聊聊canal的BooleanMutex

本文主要研究一下canal的BooleanMutex


聊聊canal的BooleanMutex


BooleanMutex

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/utils/BooleanMutex.java

<code>public class BooleanMutex {

  private Sync sync;

  public BooleanMutex(){
      sync = new Sync();
      set(false);
  }

  public BooleanMutex(Boolean mutex){
      sync = new Sync();
      set(mutex);
  }

  /**
    * 阻塞等待Boolean为true
    *
    * @throws InterruptedException
    */
  public void get() throws InterruptedException {
      sync.innerGet();
  }

  /**
    * 阻塞等待Boolean为true,允许设置超时时间
    *
    * @param timeout
    * @param unit
    * @throws InterruptedException
    * @throws TimeoutException
    */
  public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
      sync.innerGet(unit.toNanos(timeout));
  }

  /**
    * 重新设置对应的Boolean mutex
    *
    * @param mutex

    */
  public void set(Boolean mutex) {
      if (mutex) {
          sync.innerSetTrue();
      } else {
          sync.innerSetFalse();
      }
  }

  public boolean state() {
      return sync.innerState();
  }

  //......
}/<code>
  • BooleanMutex定义了sync属性,其get方法执行的是sync.innerGet(),其set方法执行的是sync.innerSetTrue()或者sync.innerSetFalse(),其state方法返回的是sync.innerState()

Sync

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/utils/BooleanMutex.java

<code>    private final class Sync extends AbstractQueuedSynchronizer {

      private static final long serialVersionUID = 2559471934544126329L;
      /** State value representing that TRUE */
      private static final int TRUE             = 1;
      /** State value representing that FALSE */
      private static final int FALSE           = 2;

      private boolean isTrue(int state) {
          return (state & TRUE) != 0;
      }

      /**
        * 实现AQS的接口,获取共享锁的判断
        */
      protected int tryAcquireShared(int state) {
          // 如果为true,直接允许获取锁对象
          // 如果为false,进入阻塞队列,等待被唤醒
          return isTrue(getState()) ? 1 : -1;

      }

      /**
        * 实现AQS的接口,释放共享锁的判断
        */
      protected boolean tryReleaseShared(int ignore) {
          // 始终返回true,代表可以release
          return true;
      }

      boolean innerState() {
          return isTrue(getState());
      }

      void innerGet() throws InterruptedException {
          acquireSharedInterruptibly(0);
      }

      void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException {
          if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException();
      }

      void innerSetTrue() {
          for (;;) {
              int s = getState();
              if (s == TRUE) {
                  return; // 直接退出
              }
              if (compareAndSetState(s, TRUE)) {// cas更新状态,避免并发更新true操作
                  releaseShared(0);// 释放一下锁对象,唤醒一下阻塞的Thread
                  return;
              }
          }
      }

      void innerSetFalse() {
          for (;;) {
              int s = getState();
              if (s == FALSE) {
                  return; // 直接退出
              }
              if (compareAndSetState(s, FALSE)) {// cas更新状态,避免并发更新false操作
                  return;

              }
          }
      }

  }/<code>
  • Sync继承了AbstractQueuedSynchronizer,其tryReleaseShared方法始终返回true,其innerGet方法执行的是acquireSharedInterruptibly方法,其innerSetTrue方法执行compareAndSetState(s, TRUE),其innerSetFalse执行compareAndSetState(s, FALSE)

小结

BooleanMutex定义了sync属性,其get方法执行的是sync.innerGet(),其set方法执行的是sync.innerSetTrue()或者sync.innerSetFalse(),其state方法返回的是sync.innerState()

doc

  • BooleanMutex


分享到:


相關文章: