A Look at canal's ClientIdentity

This article takes a brief look at canal's ClientIdentity.



ClientIdentity

canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/ClientIdentity.java

<code>public class ClientIdentity implements Serializable {

    private static final long serialVersionUID = -8262100681930834834L;
    private String            destination;
    private short             clientId;
    private String            filter;

    public ClientIdentity(){

    }

    public ClientIdentity(String destination, short clientId){
        this.clientId = clientId;
        this.destination = destination;
    }

    public ClientIdentity(String destination, short clientId, String filter){
        this.clientId = clientId;
        this.destination = destination;
        this.filter = filter;
    }

    public Boolean hasFilter() {
        if (filter == null) {
            return false;
        }
        return StringUtils.isNotBlank(filter);
    }

    //......
}</code>
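  • ClientIdentity defines three fields: destination, clientId, and filter. Its hasFilter method returns true only when filter is neither null nor blank.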
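
To make the behavior of hasFilter concrete, here is a minimal, self-contained sketch. ClientIdentitySketch is a hypothetical stand-in written for this article, not the canal class itself; it replaces StringUtils.isNotBlank with an equivalent hand-written whitespace check so it runs without any dependencies. The destination "example", the clientId 1001, and the filter string are illustrative values only.

```java
import java.io.Serializable;

// Hypothetical stand-in for canal's ClientIdentity, for illustration only.
final class ClientIdentitySketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String destination;
    private final short  clientId;
    private final String filter;

    ClientIdentitySketch(String destination, short clientId) {
        this(destination, clientId, null);
    }

    ClientIdentitySketch(String destination, short clientId, String filter) {
        this.destination = destination;
        this.clientId = clientId;
        this.filter = filter;
    }

    // Mirrors the null check followed by a not-blank check:
    // true only if the filter has at least one non-whitespace character.
    boolean hasFilter() {
        if (filter == null) {
            return false;
        }
        for (int i = 0; i < filter.length(); i++) {
            if (!Character.isWhitespace(filter.charAt(i))) {
                return true;
            }
        }
        return false;
    }

    String getDestination() { return destination; }

    short getClientId() { return clientId; }

    String getFilter() { return filter; }

    public static void main(String[] args) {
        ClientIdentitySketch plain    = new ClientIdentitySketch("example", (short) 1001);
        ClientIdentitySketch filtered = new ClientIdentitySketch("example", (short) 1001, "test\\..*");
        ClientIdentitySketch blank    = new ClientIdentitySketch("example", (short) 1001, "   ");

        System.out.println(plain.hasFilter());    // false: no filter was set
        System.out.println(filtered.hasFilter()); // true: non-blank filter
        System.out.println(blank.hasFilter());    // false: whitespace only
    }
}
```

Note that a whitespace-only filter is treated the same as a missing one, which is the point of checking for blankness rather than only for null.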

CanalServerWithEmbedded

canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

<code>public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {

    private static final Logger        logger  = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
    private Map<String, CanalInstance> canalInstances;
    // private Map lastRollbackPostions;
    private CanalInstanceGenerator     canalInstanceGenerator;
    private int                        metricsPort;
    private CanalMetricsService        metrics = NopCanalMetricsService.NOP;
    private String                     user;
    private String                     passwd;

    //......

    @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }

        canalInstance.getMetaManager().subscribe(clientIdentity); // register the subscription with the meta manager

        Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (position == null) {
            position = canalInstance.getEventStore().getFirstPosition();// fall back to the first position in the store
            if (position != null) {
                canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update the cursor
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
        } else {
            logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
        }

        // notify that the subscription has changed
        canalInstance.subscribeChange(clientIdentity);
    }

    /**
     * Cancel a subscription
     */
    @Override
    public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        canalInstance.getMetaManager().unsubscribe(clientIdentity); // remove the subscription from the meta manager

        logger.info("unsubscribe successfully, {}", clientIdentity);
    }

    /**
     * List all subscriptions
     */
    public List<ClientIdentity> listAllSubscribe(String destination) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(destination);
        return canalInstance.getMetaManager().listAllSubscribeInfo(destination);
    }

    //......

}</code>
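  • CanalServerWithEmbedded provides the subscribe, unsubscribe, and listAllSubscribe methods. subscribe takes a clientIdentity, starts the meta manager if it is not already running, registers the subscription via canalInstance.getMetaManager().subscribe(clientIdentity), and then looks up the client's position with canalInstance.getMetaManager().getCursor(clientIdentity). If that position is null, it falls back to canalInstance.getEventStore().getFirstPosition() and, when the result is non-null, saves it with canalInstance.getMetaManager().updateCursor(clientIdentity, position). Finally it calls canalInstance.subscribeChange(clientIdentity). unsubscribe calls canalInstance.getMetaManager().unsubscribe(clientIdentity), and listAllSubscribe calls canalInstance.getMetaManager().listAllSubscribeInfo(destination).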
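
The cursor handling in subscribe can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not canal's implementation: SubscribeSketch is a hypothetical class, a plain String stands in for ClientIdentity, a Long stands in for Position, and an in-memory list and map stand in for the meta manager. The excerpt does not show what happens to the cursor on unsubscribe, so the sketch simply leaves it untouched.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory illustration of the subscribe flow described above.
final class SubscribeSketch {

    // Stand-ins for the meta manager's state (assumption: kept in memory).
    private final List<String>      subscribers = new ArrayList<>();
    private final Map<String, Long> cursors     = new HashMap<>();

    // Stand-in for the event store's first position; null means the store is empty.
    private final Long firstPosition;

    SubscribeSketch(Long firstPosition) {
        this.firstPosition = firstPosition;
    }

    // Registers the client and returns the position it will start from.
    Long subscribe(String client) {
        if (!subscribers.contains(client)) {
            subscribers.add(client);
        }
        Long position = cursors.get(client);
        if (position == null) {
            // No saved cursor: fall back to the first position in the store.
            position = firstPosition;
            if (position != null) {
                cursors.put(client, position);
            }
        }
        return position;
    }

    void updateCursor(String client, long position) {
        cursors.put(client, position);
    }

    // Removes the subscription; cursor handling here is an assumption of this sketch.
    void unsubscribe(String client) {
        subscribers.remove(client);
    }

    List<String> listAllSubscribe() {
        return new ArrayList<>(subscribers);
    }

    public static void main(String[] args) {
        SubscribeSketch server = new SubscribeSketch(100L);
        System.out.println(server.subscribe("client-1")); // 100: first position in the store
        server.updateCursor("client-1", 250L);
        System.out.println(server.subscribe("client-1")); // 250: last saved cursor wins
        System.out.println(server.listAllSubscribe());    // [client-1]

        SubscribeSketch emptyStore = new SubscribeSketch(null);
        System.out.println(emptyStore.subscribe("client-1")); // null: nothing to start from yet
    }
}
```

The design point worth noticing is that a returning client resumes from its saved cursor, while a first-time client is pinned to whatever the store currently holds, and nothing is saved when the store is empty.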

Summary

ClientIdentity defines the destination, clientId, and filter fields. CanalServerWithEmbedded provides the subscribe, unsubscribe, and listAllSubscribe methods: subscribe and unsubscribe take a clientIdentity parameter, while listAllSubscribe returns a list of ClientIdentity.
