用 canal 監控 binlog 並實現Mysql定製同步數據的功能

點擊上方 "程序員小樂"關注, 星標或置頂一起成長

第一時間與你相約


每日英文

Don't make decisions when angry, don't say the promise at happy time.

請記住,別在生氣的時候做決定,別在開心的時候說承諾。


每日掏心話

所謂心事,不過是不如己意,那就是我執著於自己描畫的理想,一有落差,即生煩惱。

鏈接:cnblogs.com/hebaibai/p/10911899.html

用 canal 監控 binlog 並實現Mysql定製同步數據的功能

程序員小樂(ID:study_tech)第 831 次推文 圖片來自百度


往日回顧:為什麼Spring MVC能自動轉換json/xml,你研究過它背後的原理嗎?


正文


業務背景

寫任何工具都不能脫離實際業務的背景。開始這個項目的時候是因為現有的項目中數據分佈太零碎,零零散散的分佈在好幾個數據庫中,沒有統一的數據庫來收集這些數據。這種情況下想做一個大而全的會員中心繫統比較困難。(這邊是一個以互聯網保險為中心的項目,保單,會員等數據很零散的儲存在好幾個項目之中,並且項目之間的數據基本上是隔離的)。

現有的項目數據庫是在騰訊雲中儲存,雖然騰訊提供了數據同步功能,但是這樣必須要表結構相同才行,並不符合我們的需求。所以需要自行開發。

需求


  • 需要能靈活配置。

  • 實時數據10分鐘內希望可以完成同步。

  • 來源數據與目標數據可能結構,字段名稱不同。

  • 增刪改都可以同步。


技術選擇

這個任務交給了我和另外一個同事來做。

同事的

同事希望可以通過ETL工具Kettle來做,這個東西我沒有研究過,是同事自己在研究。具體過程不是很清楚,但是最後是通過在mysql中設置更新,修改,刪除的觸發器,然後在Kettle中做了一個定時任務,實現了數據同步的功能,初步測試符合需求。但是必須要在數據庫中設置觸發器,並且會有一個臨時表,這一點我個人不是很喜歡。

我的

我是本著能自己寫就自己寫的原則,準備自己寫一個。剛開始使用的是定時任務比較兩個庫的數據差別,然後再同步數據。但是經過一定的數據測試後,發現在數據量大的時候,定時任務中的上一個任務沒有執行完畢,下一個任務就又開始了。這樣造成了兩邊數據不一致。最終這個方案廢棄了。

後來通過研究,發現mysql的數據操作會記錄在binlog中,這時就有了新的方案。可以通過逐行獲取binlog信息,經過解析數據後,同步在目標庫中。建議看下我們這篇文章:那些年被面試官懟的 MySQL 索引、圖解 MySQL 索引:B-樹、B+樹,看這篇文章就對了!

既然有了方案,那麼就開始做吧。

開始嘗試1

首先要打開數據庫的binlog功能,這一步比較簡單,修改mysql的配置文件:/etc/mysql/mysql.conf.d/mysqld.cnf,添加:

server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size = 100M
binlog_format = ROW

然後重啟mysql 就好了,具體每個參數的意思,搜索一下就好了。這時候隨意的對某一個數據庫中的表做一下增刪改,對應的日誌就會記錄在/var/log/mysql/這個文件夾下了。我們看一下這個文件夾裡的東西:

用 canal 監控 binlog 並實現Mysql定製同步數據的功能

這裡的文件是沒有辦法正常查看的,需要使用mysql提供的命令來查看,命令是這個樣子的:

1、查看

mysqlbinlog mysql-bin.000002

2、指定位置查看

mysqlbinlog --start-position="120" --stop-position="332" mysql-bin.000002

因為我們現在的binlog_format指定的格式是ROW(就在上面寫的,還記得嗎?),所謂binlog文件的內容沒有辦法正常查看,因為他是這個樣子的:

用 canal 監控 binlog 並實現Mysql定製同步數據的功能

這時,我們需要對輸出進行解碼

mysqlbinlog --base64-output=decode-rows -v mysql-bin.000001

這時候,顯示的結果就變成了:

用 canal 監控 binlog 並實現Mysql定製同步數據的功能

雖然還不是正常的sql,但是好賴是有一定的格式了。

but自己來做解析的話還是很麻煩,so~放棄這種操作。

繼續嘗試2

經過再次研究後,發現數據庫中執行sql也是可以查看binlog的。主要有如下幾條命令:

--重置binlog
reset master;

--查看binlog的配置
show variables like '%binlog%';

--查看所有的binlog
show binary logs;

--查看正在寫入的binlog
show master status;

--查看指定binlog文件
show binlog events in 'mysql-bin.000001';

--查看指定binlog文件,並指定位置
show binlog events in 'mysql-bin.000001' from [pos] limit [顯示多少條];

按照上面的命令執行結果為:

用 canal 監控 binlog 並實現Mysql定製同步數據的功能

發現sql還是不能正常顯示。這裡的原因應該是binlog_format配置的原因。將其修改為 binlog_format=Mixed後,完美解決。經過數據庫中一通增刪改後,顯示的sql類似這樣:

use `pay`; /* ApplicationName=DataGrip 2018.2.5 */ UPDATE `pay`.`p_pay_log` t SET t.`mark_0` = 'sdfsdf' WHERE t.`id` LIKE '342' ESCAPE '#'

現在似乎已經可以開始寫數據同步了,只要在啟動的時候獲取當正在使用的是哪一個日誌文件,記錄binlog的位置,然後一點一點向下執行,解析sql就好了。但是在這個過程中,我發現阿里巴巴有一款開源的軟件可以用。就是標題上說道的:canal。看了一下網站上的介紹,簡直美滋滋。

它的文檔和代碼地址在這裡:https://github.com/alibaba/canal,大家可以看一下。現在就準備用這個來完成我所需要的功能。

正式開始寫

首先看一下介紹,canal是需要單獨運行一個服務的,這個服務具體的配置還是比較簡單的。它的作用我自己理解就是監控binlog,然後根據自己的需要獲取binlog中一定量的數據。這個數據是經過處理的,可以比較方便的知道里面的具體信息。比如那些數據發生了變動,每列數據的列名是什麼,變動前和變動後的值是啥之類的。那麼開始。

1.我的想法

1)項目啟動的時候,開啟canal的鏈接,以及初始化一些配置。

@Bean
public CanalConnector canalConnector() {
CanalConnector connector = CanalConnectors.newSingleConnector(
//對應canal服務的鏈接
new InetSocketAddress(canalConf.getIp(), canalConf.getPort()),
//鏈接的目標,這裡對應canal服務中的配置,需要查閱文檔
canalConf.getDestination(),
//不知道是什麼用戶,使用“”
canalConf.getUser(),
//不知道是什麼密碼,使用“”
canalConf.getPassword()
);
return connector;
}

2)先開啟一個線程,裡面寫一個死循環,用於從canal的服務中獲取binlog中的消息。這個消息類是:com.alibaba.otter.canal.protocol.Message。

Message message = connector.getWithoutAck(100);

  • connector:canal鏈接的實例化對象。

  • connector.getWithoutAck(100):從連接中獲取100條binlog中的數據。


3)取出Message中的事件集合,就是binlog中的每一條數據。將類型為增刪改的數據取出,之後每一條數據放在一個線程中,用線程池去執行它。

List<entry> entries = message.getEntries();

message.getEntries():從鏈接中獲取的數據集合,每一條代表1條binlog數據
/<entry>

4)在每一個線程中,取出Entry中的數據,根據其類型拼接各種sql,並執行。

Header header = entry.getHeader();
//獲取發生變化的表名稱,可能會沒有
String tableName = header.getTableName();

//獲取發生變化的數據庫名稱,可能會沒有
String schemaName = header.getSchemaName();

//獲取事件類型
EventType eventType = rowChange.getEventType();
/**
這裡我們只是用其中的三種類型:
EventType.DELETE 刪除
EventType.INSERT 插入
EventType.UPDATE 更新
*/
//獲取發生變化的數據
RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

//遍歷其中的數據
int rowDatasCount = rowChange.getRowDatasCount();


for (int i = 0; i < rowDatasCount; i++) {
//每一行中的數據
RowData rowData = rowChange.getRowDatas(i);
}

//獲取修改前的數據
List<column> before = rowData.getBeforeColumnsList();

//獲取修改後的數據
List<column> after = rowData.getAfterColumnsList();
/<column>/<column>

Column中有一系列方法,比如是否發生修改,時候為key,是否是null等,就不在細說了。擴展:阿里Canal框架(數據同步中間件)初步實踐

2.萬事具備,可以開始寫了

1)這裡先寫一個線程,用於不停的從canal服務中獲取消息,然後創建新的線程並讓其處理其中的數據。代碼如下:

@Override
public void run() {
while (true) {
//主要用於在鏈接失敗後用於再次嘗試重新鏈接
try {
if (!run) {

//打開鏈接,並設置 run=true
startCanal();
}
} catch (Exception e) {

System.err.println("連接失敗,嘗試重新鏈接。。。");
threadSleep(3 * 1000);
}


System.err.println("鏈接成功。。。");
//不停的從CanalConnector中獲取消息
try {
while (run) {

//獲取一定數量的消息,這裡為線程池數量×3
Message message = connector.getWithoutAck(batchSize * 3);
long id = message.getId();

//處理獲取到的消息
process(message);
connector.ack(id);
}
} catch (Exception e) {
System.err.println(e.getMessage());
} finally {
//如果發生異常,最終關閉連接,並設置run=false
stopCanal();
}
}

}


void process(Message message) {
List<entry> entries = message.getEntries();
if (entries.size() <= 0) {
return;
}
log.info("process message.entries.size:{}", entries.size());
for (Entry entry : entries) {
Header header = entry.getHeader();
String tableName = header.getTableName();
String schemaName = header.getSchemaName();

//這裡判斷是否可以取出數據庫名稱和表名稱,如果不行,跳過循環
if (StringUtils.isAllBlank(tableName, schemaName)) {
continue;
}



//創建新的線程,並執行
jobList.stream()
.filter(job -> job.isMatches(tableName, schemaName))
.forEach(job -> executorService.execute(job.newTask(entry)));
}
}
/<entry>

這裡的jobList是我自己定義List,代碼如下:

package com.hebaibai.miner.job;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;

import static com.alibaba.otter.canal.protocol.CanalEntry.Entry;

@Slf4j
@Data
public abstract class Job {


/**
* 數據庫鏈接
*/
protected JdbcTemplate jdbcTemplate;

/**
* 額外配置
*/
protected JSONObject prop;

/**
* 校驗目標是否為合適的數據庫和表
*
* @param table
* @param database
* @return
*/


abstract public boolean isMatches(String table, String database);

/**
* 實例化一個Runnable
*
* @param entry
* @return
*/
abstract public Runnable newTask(final Entry entry);


/**
* 獲取RowChange
*
* @param entry
* @return
*/
protected CanalEntry.RowChange getRowChange(Entry entry) {
try {
return CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
}

}

jobList裡面放的是Job的實現類。

3.寫一個Job的實現類,並用於同步表,並轉換字段名稱。

因為需求中要求兩個同步的數據中可能字段名稱不一致,所以我寫了一個josn用來配置兩個表的字段對應關係:

//省略其他配置
"prop": {
//來源數據庫
"database": "pay",
//來源表


"table": "p_pay_msg",
//目標表(目標庫在其他地方配置)
"target": "member",
//字段對應關係
//key :來源表的字段名
//value:目標表的字段名
"mapping": {
"id": "id",
"mch_code": "mCode",
"send_type": "mName",
"order_id": "phone",
"created_time": "create_time",
"creator": "remark"
}
}
//省略其他配置

下面是全部的代碼,主要做的就是取出變動的數據,按照對應的字段名重新拼裝sql,然後執行就好了,不多解釋。擴展:基於canal進行日誌的訂閱和轉換

package com.hebaibai.miner.job;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static com.alibaba.otter.canal.protocol.CanalEntry.*;

/**
* 單表同步,表的字段名稱可以不同,類型需要一致
* 表中需要有id字段
*/
@SuppressWarnings("ALL")


@Slf4j
public class TableSyncJob extends Job {


/**
* 用於校驗是否適用於當前的配置
*
* @param table
* @param database
* @return
*/
@Override
public boolean isMatches(String table, String database) {
return prop.getString("database").equals(database) &&
prop.getString("table").equals(table);
}

/**
* 返回一個新的Runnable
*
* @param entry
* @return
*/
@Override
public Runnable newTask(final Entry entry) {
return () -> {
RowChange rowChange = super.getRowChange(entry);
if (rowChange == null) {
return;
}
EventType eventType = rowChange.getEventType();
int rowDatasCount = rowChange.getRowDatasCount();
for (int i = 0; i < rowDatasCount; i++) {
RowData rowData = rowChange.getRowDatas(i);
if (eventType == EventType.DELETE) {
delete(rowData.getBeforeColumnsList());
}
if (eventType == EventType.INSERT) {
insert(rowData.getAfterColumnsList());
}
if (eventType == EventType.UPDATE) {
update(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
}
}
};
}

/**

* 修改後的數據
*
* @param after
*/
private void insert(List<column> after) {
//找到改動的數據
List<column> collect = after.stream().filter(column -> column.getUpdated() || column.getIsKey()).collect(Collectors.toList());
//根據表映射關係拼裝更新sql
JSONObject mapping = prop.getJSONObject("mapping");
String target = prop.getString("target");
List<string> columnNames = new ArrayList<>();
List<string> columnValues = new ArrayList<>();
for (int i = 0; i < collect.size(); i++) {
Column column = collect.get(i);
if (!mapping.containsKey(column.getName())) {
continue;
}
String name = mapping.getString(column.getName());
columnNames.add(name);
if (column.getIsNull()) {
columnValues.add("null");
} else {
columnValues.add("'" + column.getValue() + "'");
}
}
StringBuilder sql = new StringBuilder();
sql.append("REPLACE INTO ").append(target).append("( ")
.append(StringUtils.join(columnNames, ", "))
.append(") VALUES ( ")
.append(StringUtils.join(columnValues, ", "))
.append(");");
String sqlStr = sql.toString();
log.debug(sqlStr);
jdbcTemplate.execute(sqlStr);
}

/**
* 更新數據
*
* @param before 原始數據
* @param after 更新後的數據
*/
private void update(List<column> before, List<column> after) {
//找到改動的數據
List<column> updataCols = after.stream().filter(column -> column.getUpdated()).collect(Collectors.toList());
//找到之前的數據中的keys

List<column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
//沒有key,執行更新替換
if (keyCols.size() == 0) {
return;
}
//根據表映射關係拼裝更新sql
JSONObject mapping = prop.getJSONObject("mapping");
String target = prop.getString("target");
//待更新數據
List<string> updatas = new ArrayList<>();
for (int i = 0; i < updataCols.size(); i++) {
Column updataCol = updataCols.get(i);
if (!mapping.containsKey(updataCol.getName())) {
continue;
}
String name = mapping.getString(updataCol.getName());
if (updataCol.getIsNull()) {
updatas.add("`" + name + "` = null");
} else {
updatas.add("`" + name + "` = '" + updataCol.getValue() + "'");
}
}
//如果沒有要修改的數據,返回
if (updatas.size() == 0) {
return;
}
//keys
List<string> keys = new ArrayList<>();
for (Column keyCol : keyCols) {
String name = mapping.getString(keyCol.getName());
keys.add("`" + name + "` = '" + keyCol.getValue() + "'");
}
StringBuilder sql = new StringBuilder();
sql.append("UPDATE ").append(target).append(" SET ");
sql.append(StringUtils.join(updatas, ", "));
sql.append(" WHERE ");
sql.append(StringUtils.join(keys, "AND "));
String sqlStr = sql.toString();
log.debug(sqlStr);
jdbcTemplate.execute(sqlStr);
}

/**
* 刪除數據
*
* @param before

*/
private void delete(List<column> before) {
//找到改動的數據
List<column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
if (keyCols.size() == 0) {
return;
}
//根據表映射關係拼裝更新sql
JSONObject mapping = prop.getJSONObject("mapping");
String target = prop.getString("target");
StringBuilder sql = new StringBuilder();
sql.append("DELETE FROM `").append(target).append("` WHERE ");
List<string> where = new ArrayList<>();
for (Column column : keyCols) {
String name = mapping.getString(column.getName());
where.add(name + " = '" + column.getValue() + "' ");
}
sql.append(StringUtils.join(where, "and "));
String sqlStr = sql.toString();
log.debug(sqlStr);
jdbcTemplate.execute(sqlStr);
}
}
/<string>/<column>/<column>/<string>/<string>/<column>/<column>/<column>/<column>/<string>/<string>/<column>/<column>

項目源碼


完整代碼在後臺回覆:同步,即可獲取。


歡迎在留言區留下你的觀點,一起討論提高。如果今天的文章讓你有新的啟發,學習能力的提升上有新的認識,歡迎轉發分享給更多人。


猜你還想看


阿里、騰訊、百度、華為、京東最新面試題彙集

手把手教你 Netty 實現自定義協議!

一個簡單的例子帶你理解HashMap

JDK 中定時器是如何實現的

關注訂閱號「程序員小樂」,收看更多精彩內容
嘿,你在看嗎?



分享到:


相關文章: