A look at canal's ApplicationConfigMonitor

This article takes a look at canal's ApplicationConfigMonitor.



ApplicationConfigMonitor

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationConfigMonitor.java

<code>@Component
public class ApplicationConfigMonitor {

  private static final Logger   logger = LoggerFactory.getLogger(ApplicationConfigMonitor.class);

  @Resource
  private ContextRefresher     contextRefresher;

  @Resource
  private CanalAdapterService   canalAdapterService;

  private FileAlterationMonitor fileMonitor;

  @PostConstruct
  public void init() {
      File confDir = Util.getConfDirPath();
      try {
          FileAlterationObserver observer = new FileAlterationObserver(confDir,
              FileFilterUtils.and(FileFilterUtils.fileFileFilter(),
                  FileFilterUtils.prefixFileFilter("application"),
                  FileFilterUtils.suffixFileFilter("yml")));
          FileListener listener = new FileListener();
          observer.addListener(listener);
          fileMonitor = new FileAlterationMonitor(3000, observer);
          fileMonitor.start();

      } catch (Exception e) {
          logger.error(e.getMessage(), e);
      }
  }

  @PreDestroy
  public void destroy() {
      try {
          fileMonitor.stop();
      } catch (Exception e) {
          logger.error(e.getMessage(), e);
      }
  }

  private class FileListener extends FileAlterationListenerAdaptor {

      @Override
      public void onFileChange(File file) {
          super.onFileChange(file);
          try {
              // validate the YAML format by parsing the file
              new Yaml().loadAs(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8), Map.class);

              canalAdapterService.destroy();

              // refresh context
              contextRefresher.refresh();

              try {
                  Thread.sleep(2000);
              } catch (InterruptedException e) {
                  // ignore
              }
              canalAdapterService.init();
              logger.info("## adapter application config reloaded.");
          } catch (Exception e) {
              logger.error(e.getMessage(), e);
          }
      }
  }
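}</code>
  • In its @PostConstruct method, ApplicationConfigMonitor creates a FileAlterationObserver over the conf directory that matches only regular files whose names start with "application" and end with "yml". It registers a FileListener on the observer, wraps the observer in a FileAlterationMonitor with a 3000 ms polling interval, and calls start(). In its @PreDestroy method it calls fileMonitor.stop(). FileListener extends FileAlterationListenerAdaptor; its onFileChange method first parses the changed file with new Yaml().loadAs(...) to check that it is valid YAML, then calls canalAdapterService.destroy() and contextRefresher.refresh(), sleeps for 2 seconds, and calls canalAdapterService.init(). If parsing throws, the exception is logged and destroy() is never reached, so the running adapter is left untouched.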
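
The ordering inside onFileChange (validate first, tear down only afterwards) can be illustrated with a small JDK-only sketch. This is not canal's code: ReloadSketch, its validator, and the recorded step names are hypothetical stand-ins for the YAML parse, canalAdapterService, and contextRefresher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified, JDK-only stand-in for the validate-then-reload order used by FileListener. */
class ReloadSketch {

    private final List<String> calls = new ArrayList<>(); // records which steps ran
    private final Consumer<String> validator;              // hypothetical: throws on invalid content

    ReloadSketch(Consumer<String> validator) {
        this.validator = validator;
    }

    /** Mirrors onFileChange: an exception from validation aborts before "destroy" is reached. */
    void onChange(String content) {
        try {
            validator.accept(content); // stands in for new Yaml().loadAs(...)
            calls.add("destroy");      // stands in for canalAdapterService.destroy()
            calls.add("refresh");      // stands in for contextRefresher.refresh()
            calls.add("init");         // stands in for canalAdapterService.init()
        } catch (Exception e) {
            calls.add("rejected: " + e.getMessage());
        }
    }

    List<String> calls() {
        return calls;
    }

    /** Feeds one invalid and one valid change and returns the recorded steps. */
    static List<String> demo() {
        ReloadSketch sketch = new ReloadSketch(content -> {
            if (content.trim().isEmpty()) {
                throw new IllegalArgumentException("empty config");
            }
        });
        sketch.onChange("   ");
        sketch.onChange("server:\n  port: 8081");
        return sketch.calls();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The invalid change only records a rejection, while the valid one runs destroy, refresh, and init in that order, which is why a malformed application.yml does not take the adapter down.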

FileAlterationMonitor

commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationMonitor.java

<code>public final class FileAlterationMonitor implements Runnable {

  private final long interval;
  private final List<FileAlterationObserver> observers = new CopyOnWriteArrayList<FileAlterationObserver>();
  private Thread thread = null;
  private ThreadFactory threadFactory;
  private volatile boolean running = false;

  public FileAlterationMonitor() {
      this(10000);
  }

  public FileAlterationMonitor(long interval) {
      this.interval = interval;
  }

  public FileAlterationMonitor(long interval, FileAlterationObserver... observers) {
      this(interval);
      if (observers != null) {
          for (FileAlterationObserver observer : observers) {
              addObserver(observer);
          }
      }
  }

  //......

  public synchronized void start() throws Exception {
      if (running) {
          throw new IllegalStateException("Monitor is already running");
      }
      for (FileAlterationObserver observer : observers) {
          observer.initialize();
      }
      running = true;
      if (threadFactory != null) {
          thread = threadFactory.newThread(this);
      } else {
          thread = new Thread(this);
      }
      thread.start();
  }

  /**
    * Stop monitoring.
    *
    * @throws Exception if an error occurs initializing the observer
    */
  public synchronized void stop() throws Exception {
      stop(interval);
  }

  /**
    * Stop monitoring.
    *
    * @param stopInterval the amount of time in milliseconds to wait for the thread to finish.
    * A value of zero will wait until the thread is finished (see {@link Thread#join(long)}).
    * @throws Exception if an error occurs initializing the observer
    * @since 2.1
    */
  public synchronized void stop(long stopInterval) throws Exception {
      if (running == false) {
          throw new IllegalStateException("Monitor is not running");
      }
      running = false;
      try {
          thread.join(stopInterval);
      } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
      }
      for (FileAlterationObserver observer : observers) {
          observer.destroy();
      }
  }

  public void run() {
      while (running) {
          for (FileAlterationObserver observer : observers) {
              observer.checkAndNotify();
          }
          if (!running) {
              break;
          }
          try {
              Thread.sleep(interval);
          } catch (final InterruptedException ignored) {
          }
      }
  }    
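}</code>
  • start() initializes every observer, sets running to true, creates a Thread with the monitor itself as the Runnable (through threadFactory if one is set), and starts it. stop() sets running to false, waits for the thread with thread.join(stopInterval), and then calls destroy() on every observer. run() loops while running is true: it calls observer.checkAndNotify() on each observer and then sleeps for interval milliseconds.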
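
The start/stop/run pattern described above can be reduced to a JDK-only sketch. PollingMonitorSketch and runUntilPolls are hypothetical names; a plain Runnable stands in for the observers' checkAndNotify(), and observer initialization and destruction are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Minimal polling loop in the style of FileAlterationMonitor (not the library class). */
class PollingMonitorSketch implements Runnable {

    private final long intervalMillis;
    private final Runnable check;             // stands in for observer.checkAndNotify()
    private volatile boolean running = false; // volatile so the worker thread sees stop()
    private Thread thread;

    PollingMonitorSketch(long intervalMillis, Runnable check) {
        this.intervalMillis = intervalMillis;
        this.check = check;
    }

    synchronized void start() {
        if (running) {
            throw new IllegalStateException("Monitor is already running");
        }
        running = true;
        thread = new Thread(this);
        thread.start();
    }

    synchronized void stop() {
        if (!running) {
            throw new IllegalStateException("Monitor is not running");
        }
        running = false;
        try {
            thread.join(intervalMillis); // wait at most one interval for the loop to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        while (running) {
            check.run();
            if (!running) {
                break;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException ignored) {
                // keep looping; the running flag decides when to exit
            }
        }
    }

    /** Starts a monitor, waits until the check has run the given number of times, then stops it. */
    static int runUntilPolls(int polls) {
        AtomicInteger count = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(polls);
        PollingMonitorSketch monitor = new PollingMonitorSketch(10, () -> {
            count.incrementAndGet();
            latch.countDown();
        });
        monitor.start();
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            monitor.stop();
        }
        return count.get();
    }
}
```

Because the loop only checks the flag between polls and after each sleep, a stop request can take up to one interval to take effect, which is why stop() joins with a timeout.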

FileAlterationObserver

commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationObserver.java

<code>public class FileAlterationObserver implements Serializable {

  private final List<FileAlterationListener> listeners = new CopyOnWriteArrayList<FileAlterationListener>();
  private final FileEntry rootEntry;
  private final FileFilter fileFilter;
  private final Comparator<File> comparator;

  //......

  public void checkAndNotify() {

      /* fire onStart() */
      for (FileAlterationListener listener : listeners) {
          listener.onStart(this);
      }

      /* fire directory/file events */
      File rootFile = rootEntry.getFile();
      if (rootFile.exists()) {
          checkAndNotify(rootEntry, rootEntry.getChildren(), listFiles(rootFile));
      } else if (rootEntry.isExists()) {
          checkAndNotify(rootEntry, rootEntry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);
      } else {
          // Didn't exist and still doesn't
      }

      /* fire onStop() */
      for (FileAlterationListener listener : listeners) {
          listener.onStop(this);
      }
  }

  private void checkAndNotify(FileEntry parent, FileEntry[] previous, File[] files) {
      int c = 0;
      FileEntry[] current = files.length > 0 ? new FileEntry[files.length] : FileEntry.EMPTY_ENTRIES;
      for (FileEntry entry : previous) {
          while (c < files.length && comparator.compare(entry.getFile(), files[c]) > 0) {
              current[c] = createFileEntry(parent, files[c]);
              doCreate(current[c]);
              c++;
          }
          if (c < files.length && comparator.compare(entry.getFile(), files[c]) == 0) {
              doMatch(entry, files[c]);
              checkAndNotify(entry, entry.getChildren(), listFiles(files[c]));
              current[c] = entry;
              c++;
          } else {
              checkAndNotify(entry, entry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);
              doDelete(entry);
          }
      }
      for (; c < files.length; c++) {
          current[c] = createFileEntry(parent, files[c]);
          doCreate(current[c]);
      }
      parent.setChildren(current);
  }

  //......
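}</code>
  • checkAndNotify() fires onStart on every listener, compares the previously recorded FileEntry children with the current directory listing, and then fires onStop. The private overload walks both lists in comparator order (a NameFileComparator) and recurses into subdirectories: files that appear only in the new listing go to doCreate, files present in both go to doMatch, and entries that are no longer present go to doDelete. These methods call back the corresponding FileAlterationListener methods; doMatch notifies onFileChange or onDirectoryChange only when the entry's refreshed attributes differ from the recorded ones.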
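
The comparison in the private checkAndNotify is a merge of two sorted lists. The sketch below reproduces that walk for a single flat directory using only the JDK. SnapshotDiffSketch is a hypothetical name, file names are compared as plain strings, and one long per file stands in for the attributes that FileEntry tracks; recursion into subdirectories is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Sorted-merge diff of two snapshots, in the style of checkAndNotify (flat, no recursion). */
class SnapshotDiffSketch {

    /** Keys are file names, values are a stand-in "last modified" stamp. */
    static List<String> diff(SortedMap<String, Long> previous, SortedMap<String, Long> current) {
        List<String> events = new ArrayList<>();
        List<Map.Entry<String, Long>> files = new ArrayList<>(current.entrySet());
        int c = 0;
        for (Map.Entry<String, Long> entry : previous.entrySet()) {
            // Names that sort before the old entry exist only in the new listing.
            while (c < files.size() && entry.getKey().compareTo(files.get(c).getKey()) > 0) {
                events.add("create " + files.get(c).getKey());
                c++;
            }
            if (c < files.size() && entry.getKey().equals(files.get(c).getKey())) {
                // Same name in both snapshots: report a change only if the stamp differs.
                if (!entry.getValue().equals(files.get(c).getValue())) {
                    events.add("change " + entry.getKey());
                }
                c++;
            } else {
                events.add("delete " + entry.getKey());
            }
        }
        // Whatever is left in the new listing sorts after every old entry.
        for (; c < files.size(); c++) {
            events.add("create " + files.get(c).getKey());
        }
        return events;
    }

    /** Small fixed example: b is modified, c and e are new, d is gone. */
    static List<String> demo() {
        SortedMap<String, Long> previous = new TreeMap<>();
        previous.put("a", 1L);
        previous.put("b", 1L);
        previous.put("d", 1L);
        SortedMap<String, Long> current = new TreeMap<>();
        current.put("a", 1L);
        current.put("b", 2L);
        current.put("c", 1L);
        current.put("e", 1L);
        return diff(previous, current);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [change b, create c, delete d, create e]
    }
}
```

Both inputs must be sorted by the same ordering for the single pass to work, which is the role the comparator plays in the observer.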

Summary

ApplicationConfigMonitor creates a FileAlterationObserver in its @PostConstruct method, registers a FileListener on it, wraps the observer in a FileAlterationMonitor, and starts the monitor; in its @PreDestroy method it calls fileMonitor.stop(). FileListener extends FileAlterationListenerAdaptor, and its onFileChange method validates the YAML file and then calls canalAdapterService.destroy(), contextRefresher.refresh(), and canalAdapterService.init().

doc

  • ApplicationConfigMonitor

