1. 簡介
Databus是一個低延遲、可靠的、支持事務的、保持一致性的數據變更抓取系統 。由LinkedIn於2013年開源。Databus通過挖掘數據庫日誌的方式,將數據庫變更實時、可靠的從數據庫拉取出來,業務可以通過定製化client實時獲取變更並進行其他業務邏輯。
Databus有以下特點:
- 數據源和消費者之間的隔離。
- 數據傳輸能保證順序性和至少一次交付的高可用性。
- 從變化流的任意時間點進行消費,包括通過bootstrap獲取所有數據。
- 分區消費
- 源一致性保存,消費不成功會一直消費直到消費成功
2. 功能&特性
- 來源獨立:Databus支持多種數據來源的變更抓取,包括Oracle和MySQL。
- 可擴展、高度可用:Databus能擴展到支持數千消費者和事務數據來源,同時保持高度可用性。
- 事務按序提交:Databus能保持來源數據庫中的事務完整性,並按照事務分組和來源的提交順尋交付變更事件。
- 低延遲、支持多種訂閱機制:數據源變更完成後,Databus能在毫秒級內將事務提交給消費者。同時,消費者使用Databus中的服務器端過濾功能,可以只獲取自己需要的特定數據。
- 無限回溯:對消費者支持無限回溯能力,例如當消費者需要產生數據的完整拷貝時,它不會對數據庫產生任何額外負擔。當消費者的數據大大落後於來源數據庫時,也可以使用該功能。
3. 使用場景舉例
BUSSINESS1 和 BUSSINESS2 是兩個不同的業務邏輯,他們的變更需要同時寫入到 DB 和 CACHE ,那麼當他們同時修改同一個數據的時候是否能保證數據的一致性呢?可以發現如果按照下圖標明的順序進行操作並不能保證數據的一致性!
還有一個問題是變更完DB之後,更新CACHE失敗怎麼辦?如果忽略的話,會造成後續讀取到CACHE中舊的數據,如果重試的話,業務代碼會寫得更加複雜。針對這些場景,如果沒有一個強一致協議是很難解決掉的。如果要業務邏輯去實現這些晦澀的一致性協議,卻又是不現實的。
現在,有了Databus,上面提到的這些一致性問題就都沒有了,並且那些冗長的雙寫邏輯也可以去掉了,如下圖所示:
4. 系統整體架構與主要組件
4.1 系統整體架構
上圖中介紹了Databus系統的構成,包括Relays、bootstrap服務和Client lib等。Bootstrap服務中包括Bootstrap Producer和Bootstrap Server。快速變化的消費者直接從Relay中取事件。如果一個消費者的數據更新大幅落後,它要的數據就不在Relay的日誌中,而是需要請求Bootstrap服務,返回的將會是自消費者上次處理變更之後的所有數據變更快照。
- Source Databases:MySQL以及Oracle數據源
- Relays:負責抓取和存儲數據庫變更,全內存存儲,也可配置使用mmap內存映射文件方式
- Schema Registry:數據庫數據類型到Databus數據類型的一個轉換表
- Bootstrap Service:一個特殊的客戶端,功能和Relays類似,負責存儲數據庫變更,主要是磁盤存儲
- Application:數據庫變更消費邏輯,從Relay中拉取變更,並消費變更
- Client Lib:提供挑選關注變更的API給消費邏輯
- Consumer Code:變更消費邏輯,可以是自身消費或者再將變更發送至下游服務
4.2 主要組件及功能
上圖系統整體架構圖畫的比較簡略,下載源碼觀察項目結構後不難發現databus的主要由以下四個組件構成:
- Databus Relay:
從源數據庫中的Databus源中讀取變化的行並序列化為Databus變化事件保存到內存緩衝區中。
監聽Databus客戶端的請求(包括引導程序的請求)並傳輸Databus數據變化事件。
- Databus Client:
在Relay上檢查新的數據變化事件和處理特定的業務邏輯的回調。
如果它們在relay後面落下太遠,到引導程序服務運行一個追溯查詢。
單獨的客戶端可以處理全部的Databus流,它們也可以作為集群的一部分而每個客戶端處理一部分流。
- Databus Bootstrap Producer:
只是一個特殊的客戶端。
檢查Relay上的新的數據變化事件。
保存數據變化事件到Mysql數據庫,Mysql數據庫用於引導程序和為了客戶端追溯數據。
- Databus Bootstrap Server:
監聽來自Databus客戶端的請求併為了引導和追溯返回一個超長的回溯的數據變化事件。
5. Databus Relay和Databus Client詳細分析
5.1 Databus Relay
5.1.1 架構與組件功能
- ContainerStatsMBean
- DbusEventsTotalStatsMBean
- DbusEventsStatisticsCollectorMBean
5.1.2 源碼分析
- ServerContainer._globalStatsThread:統計信息的線程
- OpenReplicatorEventProducer.EventProducerThread:針對mysql binlog日誌的Event生產者線程,每個source一個線程,持有_orListener,管理和數據庫的連接,將變更寫入到Event Buffer裡。
- EventProducerThread啟動後會初始化類型為OpenReplicator的日誌解析對象開始解析日誌,同時初始化類型為ORListener的_orListener開始監聽,代碼如下:
<code>@Overridepublic void run() { _eventBuffer.start(_sinceScn); _startPrevScn.set(_sinceScn); initOpenReplicator(_sinceScn); try { boolean started = false; while (!started) { try { _or.start(); started = true; } catch (Exception e) { _log.error("Failed to start OpenReplicator: " + e); _log.warn("Sleeping for 1000 ms"); Thread.sleep(1000); } } _orListener.start(); } catch (Exception e) { _log.error("failed to start open replicator: " + e.getMessage(), e); return; }}/<code>
初始化方法如下:
<code>void initOpenReplicator(long scn) {int offset = offset(scn);int logid = logid(scn);String binlogFile = String.format("%s.%06d", _binlogFilePrefix, logid);// we should use a new ORListener to drop the left events in// binlogEventQueue and the half processed transaction._orListener = new ORListener(_sourceName, logid, _log, _binlogFilePrefix, _producerThread, _tableUriToSrcIdMap,_tableUriToSrcNameMap, _schemaRegistryService, 200, 100L);_or.setBinlogFileName(binlogFile);_or.setBinlogPosition(offset);_or.setBinlogEventListener(_orListener);// must set transport and binlogParser to null to drop the old// connection environment in reinit case_or.setTransport(null);_or.setBinlogParser(null);_log.info("Connecting to OpenReplicator " + _or.getUser() + "@" + _or.getHost() + ":" + _or.getPort() + "/"+ _or.getBinlogFileName() + "#" + _or.getBinlogPosition());}/<code>
EventProducerThread._orListener:監聽數據庫變更,將變更轉換為Avro記錄,寫入到transaction裡面,最終調用_producerThread的onEndTransaction()方法將事務裡的事件寫入到Event Buffer裡,代碼如下:
<code>@Overridepublic void onEndTransaction(Transaction txn) throws DatabusException {try { addTxnToBuffer(txn);_maxSCNReaderWriter.saveMaxScn(txn.getIgnoredSourceScn() != -1 ? txn.getIgnoredSourceScn() : txn.getScn());} catch (UnsupportedKeyException e) {_log.fatal("Got UnsupportedKeyException exception while adding txn (" + txn + ") to the buffer", e);throw new DatabusException(e);} catch (EventCreationException e) {_log.fatal("Got EventCreationException exception while adding txn (" + txn + ") to the buffer", e);throw new DatabusException(e);}}/<code>
FileMaxSCNHandler負責讀寫SCN,注意在寫入文件時會將原有文件重命名為XXX.temp,原因是為了防止在更新文件的時候發生錯誤,導致SCN丟失,代碼如下:
<code>private void writeScnToFile() throws IOException {long scn = _scn.longValue();File dir = _staticConfig.getScnDir();if (!dir.exists() && !dir.mkdirs()) {throw new IOException("unable to create SCN file parent:" + dir.getAbsolutePath());}// delete the temp file if one existsFile tempScnFile = new File(_scnFileName + TEMP);if (tempScnFile.exists() && !tempScnFile.delete()) {LOG.error("unable to erase temp SCN file: " + tempScnFile.getAbsolutePath());}File scnFile = new File(_scnFileName);if (scnFile.exists() && !scnFile.renameTo(tempScnFile)) {LOG.error("unable to backup scn file");}if (!scnFile.createNewFile()) {LOG.error("unable to create new SCN file:" + scnFile.getAbsolutePath());}FileWriter writer = new FileWriter(scnFile);writer.write(Long.toString(scn));writer.write(SCN_SEPARATOR + new Date().toString());writer.flush();writer.close();LOG.debug("scn persisted: " + scn);}/<code>
以源碼例子中PersonRelayServer的主類啟動為起點,大致的啟動流程如下:
PersonRelayServer主方法 -> new DatabusRelayMain實例 -> 調用initProducers方法初始化生產者->根據配置調用addOneProducer增加生產者->new DbusEventBufferAppendable獲得Event Buffer->new EventProducerServiceProvider實例->
調用createProducer獲得OpenReplicatorEventProducer->OpenReplicatorEventProducer中包含
EventProducerThread->啟動線程開始獲取Event
5.2 Databus Client
5.2.1 架構與組件功能
5.2.2 源碼分析
執行Client的啟動腳本後會調用main方法,main方法會根據命令行參數中指定的屬性文件創建StaticConfig類,然後配置類創建dbusHttpClient實例來與Relay進行通信,參數defaultConfigBuilder為默認配置類信息,可以為空,代碼如下:
<code>public static DatabusHttpClientImpl createFromCli(String[] args, Config defaultConfigBuilder) throws Exception {Properties startupProps = ServerContainer.processCommandLineArgs(args);if (null == defaultConfigBuilder)defaultConfigBuilder = new Config();ConfigLoader<staticconfig> staticConfigLoader = new ConfigLoader<staticconfig>("databus.client.",defaultConfigBuilder);StaticConfig staticConfig = staticConfigLoader.loadConfig(startupProps);DatabusHttpClientImpl dbusHttpClient = new DatabusHttpClientImpl(staticConfig);return dbusHttpClient;}/<staticconfig>/<staticconfig>/<code>
設置要連接的Relay信息,然後通過參數defaultConfigBuilder傳遞給dbusHttpClient,代碼如下:
<code>DatabusHttpClientImpl.Config configBuilder = new DatabusHttpClientImpl.Config();configBuilder.getRuntime().getRelay("1").setHost("localhost");configBuilder.getRuntime().getRelay("1").setPort(11115);configBuilder.getRuntime().getRelay("1").setSources(PERSON_SOURCE);/<code>
啟動databus client過程如下:
<code>protected void doStart() {_controlLock.lock();try {// 綁定並開始接收來到的連接int portNum = getContainerStaticConfig().getHttpPort();_tcpChannelGroup = new DefaultChannelGroup();_httpChannelGroup = new DefaultChannelGroup();_httpServerChannel = _httpBootstrap.bind(new InetSocketAddress(portNum));InetSocketAddress actualAddress = (InetSocketAddress) _httpServerChannel.getLocalAddress();_containerPort = actualAddress.getPort();// 持久化端口號 (文件名對容器來說必須唯一)File portNumFile = new File(getHttpPortFileName());portNumFile.deleteOnExit();try {FileWriter portNumFileW = new FileWriter(portNumFile);portNumFileW.write(Integer.toString(_containerPort));portNumFileW.close();LOG.info("Saving port number in " + portNumFile.getAbsolutePath());} catch (IOException e) {throw new RuntimeException(e);}_httpChannelGroup.add(_httpServerChannel);LOG.info("Serving container " + getContainerStaticConfig().getId() + " HTTP listener on port "+ _containerPort);if (_containerStaticConfig.getTcp().isEnabled()) {int tcpPortNum = _containerStaticConfig.getTcp().getPort();_tcpServerChannel = _tcpBootstrap.bind(new InetSocketAddress(tcpPortNum));_tcpChannelGroup.add(_tcpServerChannel);LOG.info("Serving container " + getContainerStaticConfig().getId() + " TCP listener on port "+ tcpPortNum);}_nettyShutdownThread = new NettyShutdownThread();Runtime.getRuntime().addShutdownHook(_nettyShutdownThread);// 5秒後開始producer線程if (null != _jmxConnServer && _containerStaticConfig.getJmx().isRmiEnabled()) {try {_jmxShutdownThread = new JmxShutdownThread(_jmxConnServer);Runtime.getRuntime().addShutdownHook(_jmxShutdownThread);_jmxConnServer.start();LOG.info("JMX server listening on port " + _containerStaticConfig.getJmx().getJmxServicePort());} catch (IOException ioe) {if (ioe.getCause() != null && ioe.getCause() instanceof NameAlreadyBoundException) {LOG.warn("Unable to bind JMX server connector. Likely cause is that the previous instance was not cleanly shutdown: killed in Eclipse?");if (_jmxConnServer.isActive()) {LOG.warn("JMX server connector seems to be running anyway. ");} else {LOG.warn("Unable to determine if JMX server connector is running");}} else {LOG.error("Unable to start JMX server connector", ioe);}}}_globalStatsThread.start();} catch (RuntimeException ex) {LOG.error("Got runtime exception :" + ex, ex);throw ex;} finally {_controlLock.unlock();}}/<code>
6. Databus for Mysql實踐
6.1 相關解釋
6.2 數據庫環境配置
- 配置數據庫binlog_format=ROW, show variables like ‘binlog_format‘可查看日誌格式, set globle binlog_format=ROW’可設置,通過修改my.cnf文件也可以,增加或修改行binlog_format=ROW即可。
- binlog_checksum設置為空,show global variables like ‘binlog_checksum’命令可查看,set binlog_checksum=none可設置。
- 在mysql上創建名為or_test的數據庫,or_test上創建表名為person的表,定義如下:
6.3 Demo配置與運行
6.3.1 下載源碼
- Databus官網下載源碼,下載地址https://github.com/linkedin/databus.git,我們需要用到databus目錄下的databus2-example文件夾,在此基礎上改造並運行,目錄結構及介紹如下:
- database:數據庫模擬相關的腳本和工具
- databus2-example-bst-producer-pkg:bootstrap producer的屬性配置文件夾,包括bootstrap producer和log4j屬性文件,build腳本以及bootstrap producer的啟動和停止腳本。
- databus2-example-client-pkg:client的屬性配置文件夾,包括各種屬性文件和啟動和停止腳本。
- databus2-example-client:client源代碼,包含啟動主類和消費者代碼邏輯。
- databus2-example-relay-pkg:relay的屬性配置文件夾,包含監控的表的source信息和Avro schema。
- databus2-example-relay:relay的啟動主類。
- schemas_registry:存放表的avsc文件。
6.3.2 Relay端的操作
- 配置Relay屬性文件:databus2-example-relay-pkg/conf/relay-or-person.properties的內容如下配置,包括端口號,buffer存儲策略,maxScn存放地址等信息:
<code>databus.relay.container.httpPort=11115databus.relay.container.jmx.rmiEnabled=falsedatabus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORYdatabus.relay.eventBuffer.queuePolicy=OVERWRITE_ON_WRITEdatabus.relay.eventLogReader.enabled=falsedatabus.relay.eventLogWriter.enabled=falsedatabus.relay.schemaRegistry.type=FILE_SYSTEMdatabus.relay.schemaRegistry.fileSystem.schemaDir=./schemas_registrydatabus.relay.eventBuffer.maxSize=1024000000databus.relay.eventBuffer.readBufferSize=10240databus.relay.eventBuffer.scnIndexSize=10240000databus.relay.physicalSourcesConfigsPattern=../../databus2-example/databus2-example-relay-pkg/conf/sources-or-person.jsondatabus.relay.dataSources.sequenceNumbersHandler.file.scnDir=/tmp/maxScndatabus.relay.startDbPuller=true/<code>
- 配置被監控表的source信息:databus2-example-relay-pkg/conf/sources-or-person.json的內容如下配置,其中URI format:mysql://username/password@mysql_host[:mysql_port]/mysql_serverid/binlog_prefix,注意%2F為轉義字符,用戶名為root,數據庫密碼為123。
<code>{"name" : "person","id" : 1,"uri" : "mysql://root%2F123@localhost:3306/1/mysql-bin","slowSourceQueryThreshold" : 2000,"sources" :[{"id" : 40,"name" : "com.linkedin.events.example.or_test.Person","uri": "or_test.person","partitionFunction" : "constant:1"}]}/<code>
- databus2-example-relay-pkg/schemas_registry/下定義person的Avro schema文件
com.linkedin.events.example.or_test.Person.1.avsc,其中1表示版本(Databus目前沒有針對mysql提供生成Avro schema文件的工具,所以只能手工編寫)具體內容如下所示:
<code>{"name" : "Person_V1","doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST","type" : "record","meta" : "dbFieldName=person;pk=id;","namespace" : "com.linkedin.events.example.or_test","fields" : [ {"name" : "id","type" : [ "long", "null" ],"meta" : "dbFieldName=ID;dbFieldPosition=0;"}, {"name" : "firstName","type" : [ "string", "null" ],"meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=1;"}, {"name" : "lastName","type" : [ "string", "null" ],"meta" : "dbFieldName=LAST_NAME;dbFieldPosition=2;"}, {"name" : "birthDate","type" : [ "long", "null" ],"meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=3;"}, {"name" : "deleted","type" : [ "string", "null" ],"meta" : "dbFieldName=DELETED;dbFieldPosition=4;"} ]}/<code>
- 註冊Avro schema到index.schemas_registry文件,databus2-example-relay-pkg/schemas_registry/index.schemas_registry文件中添加行com.linkedin.events.example.or_test.Person.1.avsc ,每定義一個Avro schema都需要添加進去,relay運行時會到此文件中查找表對應的定義的Avro schema。
6.3.3 Client端的操作
- 配置Client屬性文件:databus2-example-client-pkg/conf/client-person.properties的內容如下配置,包括端口號,buffer存儲策略,checkpoint持久化等信息:
<code>databus.relay.container.httpPort=11125databus.relay.container.jmx.rmiEnabled=falsedatabus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORYdatabus.relay.eventBuffer.queuePolicy=BLOCK_ON_WRITEdatabus.relay.schemaRegistry.type=FILE_SYSTEMdatabus.relay.eventBuffer.maxSize=10240000databus.relay.eventBuffer.readBufferSize=1024000databus.relay.eventBuffer.scnIndexSize=1024000databus.client.connectionDefaults.pullerRetries.initSleep=1databus.client.checkpointPersistence.fileSystem.rootDirectory=./personclient-checkpointsdatabus.client.checkpointPersistence.clearBeforeUse=falsedatabus.client.connectionDefaults.enablePullerMessageQueueLogging=true/<code>
- databus2-example-client/src/main/java下的PersonConsumer類是消費邏輯回調代碼,主要是取出每一個event後依次打印每個字段的名值對,主要代碼如下:
<code>private ConsumerCallbackResult processEvent(DbusEvent event, DbusEventDecoder eventDecoder) {GenericRecord decodedEvent = eventDecoder.getGenericRecord(event, null);try {Utf8 firstName = (Utf8) decodedEvent.get("firstName");Utf8 lastName = (Utf8) decodedEvent.get("lastName");Long birthDate = (Long) decodedEvent.get("birthDate");Utf8 deleted = (Utf8) decodedEvent.get("deleted");LOG.info("firstName: " + firstName.toString() + ", lastName: " + lastName.toString() + ", birthDate: "+ birthDate + ", deleted: " + deleted.toString());} catch (Exception e) {LOG.error("error decoding event ", e);return ConsumerCallbackResult.ERROR;}return ConsumerCallbackResult.SUCCESS;}/<code>
- databus2-example-client/src/main/java下的PersonClient類是relay的啟動主類,主要是設置啟動Client的配置信息,將消費者實例註冊到監聽器中,後續可對其進行回調,主要代碼如下:
<code>public static void main(String[] args) throws Exception {DatabusHttpClientImpl.Config configBuilder = new DatabusHttpClientImpl.Config(); // Try to connect to a relay on localhostconfigBuilder.getRuntime().getRelay("1").setHost("localhost");configBuilder.getRuntime().getRelay("1").setPort(11115);configBuilder.getRuntime().getRelay("1").setSources(PERSON_SOURCE); // Instantiate a client using command-line parameters if anyDatabusHttpClientImpl client = DatabusHttpClientImpl.createFromCli(args, configBuilder); // register callbacksPersonConsumer personConsumer = new PersonConsumer();client.registerDatabusStreamListener(personConsumer, null, PERSON_SOURCE);client.registerDatabusBootstrapListener(personConsumer, null, PERSON_SOURCE); // fire off the Databus clientclient.startAndBlock();}/<code>
6.3.4 build-啟動-測試
- cd build/databus2-example-relay-pkg/distributions
- tar -zxvf databus2-example-relay-pkg.tar.gz解壓
- 執行啟動腳本 ./bin/start-example-relay.sh or_person -Y ./conf/sources-or-person.json
- 執行命令 curl -s http://localhost:11115/sources返回如下內容說明啟動成功:
- 啟動Client:
- cd build/databus2-example-client-pkg/distributions
- tar -zxvf databus2-example-client-pkg.tar.gz解壓
- 執行啟動腳本 ./bin/start-example-client.sh person
- 執行命令 curl http://localhost:11115/relayStats/outbound/http/clients返回如下內容說明啟動成功:
測試:
Relay和Client啟動成功後,就已經開始對person表進行數據變更捕獲了,現在向person表插入一條如下記錄:
databus2-example-relay-pkg/distributions/logs下的relay.log記錄如下:
databus2-example-client-pkg/distributions/logs下的client.log記錄如下:
可以看到已經可以抓取到改變的數據了!
7. 總結
遇到的問題:
需要進一步實驗:
- 使用bootstrap produces和bootstrap servers模式來進行大批量事件的獲取
- 配置多個relay進行事件抓取
- 結合zookeeper來配置客戶端集群進行消費
專注於技術熱點大數據,人工智能,JAVA、Python、 C 、GO、Javascript等語言最新前言技術,及業務痛點問題分析,請關注【編程我最懂】共同交流學習。
閱讀更多 編程我最懂 的文章