且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

HBase源码分析之事件处理模型

更新时间:2022-04-11 19:10:46

        HBase是一个复杂的分布式非结构化数据库,它将表中的数据按照行的方向切分成一个个的Region,并在若干RegionServer上上线,依靠所在RegionServer对外提供数据读写IO服务。一开始,表中数据由于很少,只有一个Region。随着数据越来越多,一个Region已难以满足频繁的数据读写请求,所以,Region开始分裂。分裂后的两个Region又会按照一定策略选择RegionServer上线,继续对外提供数据读写服务。并且,HBase作为一个分布式数据库,肯定需要考虑负载均衡,它会按照某些策略选择若干Region,在比较繁忙的RegionServer上下线,转移到较为空闲的RegionSever上线继续提供高质量的数据读写服务。所有涉及到的这些Region的上线、下线、分裂,以及我们还没提到的合并等等流程,在HBase内部都是通过不同组件之间发送事件,然后按照一定策略调度执行的。这就是HBase的事件处理模型。

        那么,HBase的事件处理模型是如何实现的呢?本文,我们就将研究下HBase内部事件处理模型的实现。

        在HBase中有一个抽象类EventHandler,定义如下:

@InterfaceAudience.Private
public abstract class EventHandler implements Runnable, Comparable<Runnable> {
        它实现了Runnable接口,说明其子类是一个线程,而且,在它内部定义了以下成员变量:

  // type of event this object represents
  // 该对象代表的事件类型
  protected EventType eventType;

  // 服务器
  protected Server server;

  // sequence id generator for default FIFO ordering of events
  // 默认的FIFO调度的事件的序列化ID生成器
  protected static final AtomicLong seqids = new AtomicLong(0);

  // sequence id for this event
  // 该事件的序列化ID
  private final long seqid;

  // Listener to call pre- and post- processing.  May be null.
  // 监听器,可能为空
  private EventHandlerListener listener;

  // Time to wait for events to happen, should be kept short
  // 等待事件发生的时间
  protected int waitingTimeForEvents;

  // 祖先
  private final Span parent;
        监听器listener是实现了EventHandlerListener接口的实例,接口定义如下:

  /**
   * This interface provides pre- and post-process hooks for events.
   * 为事件提供事前和事后处理钩子的接口
   */
  public interface EventHandlerListener {
    /**
     * Called before any event is processed
     * 任何事件执行前被调用
     * @param event The event handler whose process method is about to be called.
     */
    void beforeProcess(EventHandler event);
    /**
     * Called after any event is processed
     * 任何事件执行后被调用
     * @param event The event handler whose process method is about to be called.
     */
    void afterProcess(EventHandler event);
  }
        接口就定义了两个方法,一个是任何事件执行前被调用的beforeProcess()方法和任何事件执行后被调用的afterProcess()方法。

        抽象类EventHandler既然实现了Runnable接口,那么其子类肯定是一个线程,而且其功能的实现,必然在核心方法run()方法内。下面,我们就看下这个run()方法,代码如下:

  /**
   * 线程实现功能的主方法,run()方法,是不是很像一个模板方法啊
   */
  public void run() {
<span style="white-space:pre">	</span>  
<span style="white-space:pre">	</span>// 开启一个TraceScope
    TraceScope chunk = Trace.startSpan(this.getClass().getSimpleName(), parent);
    
    try {
      
      // 先执行监听器的beforeProcess()方法
      if (getListener() != null) getListener().beforeProcess(this);
      
      // 接着执行process()方法
      process();
      
      // 最后执行监听器的afterProcess()方法
      if (getListener() != null) getListener().afterProcess(this);
      
    } catch(Throwable t) {
    <span style="white-space:pre">	</span>
      // 处理事件异常t
      handleException(t);
      
    } finally {
      
      // 关闭TraceScope
      chunk.close();
    }
  }
        第一感觉是不是个模板方法呢?它的主要流程就是:

        1、开启一个TraceScope;

        2、先执行监听器的beforeProcess()方法;

        3、接着执行process()方法;

        4、最后执行监听器的afterProcess()方法;

        5、关闭TraceScope。

        如果中间出现Throwable异常,则调用handleException()方法处理事件异常。

        关于监听器的beforeProcess()方法和afterProcess()方法我们在上面已经提到过了,这里不再赘述。关键是看一下process()方法,其定义如下:

  /**
   * This method is the main processing loop to be implemented by the various
   * subclasses.
   * 
   * 核心方法process()方法是一个抽象方法,子类必须实现
   * @throws IOException
   */
  public abstract void process() throws IOException;
        它是一个抽象方法,也就意味着子类必须要实现它。并且,它是子类完成事件处理核心逻辑所必须执行的方法。

        另外,还实现了诸如获取监听器、设置监听器、获取优先级、获取事件序列ID等工具方法,十分简单,不再一一介绍,代码粘贴如下,读者可自行查看:

  /**
   * This method is the main processing loop to be implemented by the various
   * subclasses.
   * 
   * 核心方法process()方法是一个抽象方法,子类必须实现
   * @throws IOException
   */
  public abstract void process() throws IOException;

  /**
   * Return the event type
   * 获取时事件类型
   * @return The event type.
   */
  public EventType getEventType() {
    return this.eventType;
  }

  /**
   * Get the priority level for this handler instance.  This uses natural
   * ordering so lower numbers are higher priority.
   * 获取handler实例的优先级,数字越低级别越高
   * 
   * <p>
   * Lowest priority is Integer.MAX_VALUE.  Highest priority is 0.
   * <p>
   * Subclasses should override this method to allow prioritizing handlers.
   * <p>
   * Handlers with the same priority are handled in FIFO order.
   * <p>
   * @return Integer.MAX_VALUE by default, override to set higher priorities
   */
  public int getPriority() {
    return Integer.MAX_VALUE;
  }

  /**
   * 获取事件的序列号ID
   * @return This events' sequence id.
   */
  public long getSeqid() {
    return this.seqid;
  }

  /**
   * Default prioritized runnable comparator which implements a FIFO ordering.
   * <p>
   * Subclasses should not override this.  Instead, if they want to implement
   * priority beyond FIFO, they should override {@link #getPriority()}.
   * 
   * 实现可比较接口的compareTo()方法,先比较优先级Priority,谁的优先级越小谁就越小。
   * 优先级相同的话,再比较事件序列号ID,谁的事件序列号ID越小谁就越小
   */
  @Override
  public int compareTo(Runnable o) {
    EventHandler eh = (EventHandler)o;
    if(getPriority() != eh.getPriority()) {
      return (getPriority() < eh.getPriority()) ? -1 : 1;
    }
    return (this.seqid < eh.seqid) ? -1 : 1;
  }

  /**
   * 获取事件的监听器
   * @return Current listener or null if none set.
   */
  public synchronized EventHandlerListener getListener() {
    return listener;
  }

  /**
   * 设置事件的监听器
   * @param listener Listener to call pre- and post- {@link #process()}.
   */
  public synchronized void setListener(EventHandlerListener listener) {
    this.listener = listener;
  }

  @Override
  public String toString() {
    return "Event #" + getSeqid() +
      " of type " + eventType +
      " (" + getInformativeName() + ")";
  }

  /**
   * Event implementations should override thie class to provide an
   * informative name about what event they are handling. For example,
   * event-specific information such as which region or server is
   * being processed should be included if possible.
   */
  public String getInformativeName() {
    return this.getClass().toString();
  }

  /**
   * 处理事件异常,可能被覆写
   * Event exception handler, may be overridden
   * @param t Throwable object
   */
  protected void handleException(Throwable t) {
    LOG.error("Caught throwable while processing event " + eventType, t);
  }

        接下来就有一个问题,继承了抽象类EventHandler的各种事件是如何被提交的?它们被提交到哪里,又是如何被调度执行的呢?别慌,下面我一一为大家解答。

        首先在HRegionServer上有一个叫做service的成员变量,定义如下:

  // Instance of the hbase executor service.
  // HBase执行服务的实例
  protected ExecutorService service;
        它是HRegionServer上执行各种事件的ExecutorService实例,而ExecutorService提供了通用的事件执行机制,它抽象了线程池、队列,EventType可以被提交,使用线程处理被添加到队列中的对象。如果要创建一个的服务, 创建该类的一个实例,并调用实例的startExecutorService()方法。当服务完成后,调用shutdown()方法。

        那么事件是如何被提交的呢?我们以Region上线为例,在HRegionServer对外提供RPC服务的RSRpcServices类的openRegion()方法中,Region上线事件OpenRegionHandler是通过以下方式被提交的,代码如下:

          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          // 如果对应Region上没有相关的操作在进行,我们可以提交一个特定的处理者
          if (region.isMetaRegion()) {
            regionServer.service.submit(new OpenMetaHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          } else {
            regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
              regionOpenInfo.getFavoredNodesList());
            regionServer.service.submit(new OpenRegionHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          }
        它就是通过HRegionServer中成员变量service的submit()方法,来提交OpenRegionHandler事件的。