利用Canal完成Mysql數據同步Redis

流程

Canal的原理是模擬Slave向Master發送請求,Canal解析binlog,但不將解析結果持久化,而是保存在內存中,每次有客戶端讀取一次消息,就刪除該消息。這裡所說的客戶端,就需要我們寫一個連接Canal的程序,持續從Canal獲取數據。

程序寫MySQL, 解析binlog,數據放入隊列寫Redis
讀Redis

利用Canal完成Mysql數據同步Redis

步驟
一、配置Canal
參考https://github.com/alibaba/canal
【mysql配置】
1,配置參數

<code>[mysqld]  
log-bin=mysql-bin #添加這一行就ok
binlog-format=ROW #選擇row模式
server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複 /<code>

2,在mysql中 配置canal數據庫管理用戶,配置相應權限(repication權限)

<code>CREATE USER canal IDENTIFIED BY 'canal';      
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES; /<code>

【canal下載和配置】
1,下載canal https://github.com/alibaba/canal/releases
2,解壓

<code>mkdir /tmp/canal  
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal /<code>

3,修改配置文件

vi conf/example/instance.properties

<code>#################################################  
## mysql serverId
canal.instance.mysql.slaveId = 1234

# position info,需要改成自己的數據庫信息
canal.instance.master.address = 127.0.0.1:3306

canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

# username/password,需要改成自己的數據庫信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\\\..*

################################################# /<code>

【canal啟動和關閉】
1,啟動

<code>shbin/startup.sh/<code>

2,查看日誌

<code>vi logs/canal/canal.log /<code>
<code>2018-12-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.  
2018-12-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]  
2018-12-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
/<code>

具體instance的日誌:

<code>vi logs/example/example.log  /<code>
<code>2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]  
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful.... /<code>

3,關閉

<code>shbin/stop.sh/<code> 

注意:
1,這裡只需要配置好參數後,就可以直接運行
2,Canal沒有解析後的文件,不會持久化

二、創建客戶端
參考https://github.com/alibaba/canal/wiki/ClientExample

其中一個是連接canal並操作的類,一個是redis的工具類,使用maven主要是依賴包的下載很方便。

利用Canal完成Mysql數據同步Redis

pom.xml

<code><project>  
<modelversion>4.0.0/<modelversion>
<groupid>com.alibaba.otter/<groupid>
<artifactid>canal.sample/<artifactid>
<version>0.0.1-SNAPSHOT/<version>
<dependencies>
<dependency>
<groupid>com.alibaba.otter/<groupid>
<artifactid>canal.client/<artifactid>
<version>1.0.12/<version>
/<dependency>
<dependency>
<groupid>org.springframework/<groupid>
<artifactid>spring-test/<artifactid>
<version>3.1.2.RELEASE/<version>
<scope>test/<scope>
/<dependency>
<dependency>
<groupid>redis.clients/<groupid>
<artifactid>jedis/<artifactid>
<version>2.4.2/<version>
/<dependency>
/<dependencies>
<build>
/<project> /<code>

2,ClientSample代碼
這裡主要做兩個工作,一個是循環從Canal上取數據,一個是將數據更新至Redis

<code>package canal.sample;  

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;

public class ClientSample {

public static void main(String args[]) {

// 創建鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
printEntry(message.getEntries());
}

connector.ack(batchId); // 提交確認
// connector.rollback(batchId); // 處理失敗, 回滾數據
}

} finally {
connector.disconnect();
}
}

private static void printEntry( List<entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}

RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}

EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));

for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}

private static void printColumn( List<column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}

private static void redisInsert( List<column> columns){
JSONObject json=new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if(columns.size()>0){
RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
}
}

private static void redisUpdate( List<column> columns){
JSONObject json=new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if(columns.size()>0){
RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
}
}


private static void redisDelete( List<column> columns){
JSONObject json=new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if(columns.size()>0){
RedisUtil.delKey("user:"+ columns.get(0).getValue());
}
}
} /<column>/<column>/<column>/<column>/<entry>/<code>

3,RedisUtil代碼

<code>package canal.sample;  

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {

// Redis服務器IP
private static String ADDR = "10.1.2.190";

// Redis的端口號
private static int PORT = 6379;

// 訪問密碼
private static String AUTH = "admin";

// 可用連接實例的最大數目,默認值為8;
// 如果賦值為-1,則表示不限制;如果pool已經分配了maxActive個jedis實例,則此時pool的狀態為exhausted(耗盡)。
private static int MAX_ACTIVE = 1024;

// 控制一個pool最多有多少個狀態為idle(空閒的)的jedis實例,默認值也是8。
private static int MAX_IDLE = 200;

// 等待可用連接的最大時間,單位毫秒,默認值為-1,表示永不超時。如果超過等待時間,則直接拋出JedisConnectionException;

private static int MAX_WAIT = 10000;

// 過期時間
protected static int expireTime = 660 * 660 *24;

// 連接池
protected static JedisPool pool;

/**
* 靜態代碼,只在初次調用一次
*/
static {
JedisPoolConfig config = new JedisPoolConfig();
//最大連接數
config.setMaxTotal(MAX_ACTIVE);
//最多空閒實例
config.setMaxIdle(MAX_IDLE);
//超時時間
config.setMaxWaitMillis(MAX_WAIT);
//
config.setTestOnBorrow(false);
pool = new JedisPool(config, ADDR, PORT, 1000);
}

/**
* 獲取jedis實例
*/
protected static synchronized Jedis getJedis() {
Jedis jedis = null;
try {
jedis = pool.getResource();
} catch (Exception e) {
e.printStackTrace();
if (jedis != null) {
pool.returnBrokenResource(jedis);
}
}
return jedis;
}

/**
* 釋放jedis資源
* @param jedis
* @param isBroken
*/
protected static void closeResource(Jedis jedis, boolean isBroken) {

try {
if (isBroken) {
pool.returnBrokenResource(jedis);
} else {
pool.returnResource(jedis);
}
} catch (Exception e) {

}
}

/**
* 是否存在key
* @param key
*/
public static boolean existKey(String key) {
Jedis jedis = null;
boolean isBroken = false;
try {
jedis = getJedis();
jedis.select(0);
return jedis.exists(key);
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
return false;
}

/**
* 刪除key
* @param key
*/
public static void delKey(String key) {
Jedis jedis = null;
boolean isBroken = false;
try {
jedis = getJedis();
jedis.select(0);
jedis.del(key);
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
}

/**
* 取得key的值

* @param key
*/
public static String stringGet(String key) {
Jedis jedis = null;
boolean isBroken = false;
String lastVal = null;
try {
jedis = getJedis();
jedis.select(0);
lastVal = jedis.get(key);
jedis.expire(key, expireTime);
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
return lastVal;
}

/**
* 添加string數據
* @param key
* @param value
*/
public static String stringSet(String key, String value) {
Jedis jedis = null;
boolean isBroken = false;
String lastVal = null;
try {
jedis = getJedis();
jedis.select(0);
lastVal = jedis.set(key, value);
jedis.expire(key, expireTime);
} catch (Exception e) {
e.printStackTrace();
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
return lastVal;
}

/**
* 添加hash數據
* @param key
* @param field
* @param value
*/
public static void hashSet(String key, String field, String value) {
boolean isBroken = false;

Jedis jedis = null;
try {
jedis = getJedis();
if (jedis != null) {
jedis.select(0);
jedis.hset(key, field, value);
jedis.expire(key, expireTime);
}
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
}
} /<code>

注意

1,客戶端的Jedis連接不同於項目裡的Jedis連接需要Spring註解,直接使用靜態方法就可以。

運行
1,運行canal服務端startup.bat / startup.sh
2,運行客戶端程序

注意:
1,雖然canal服務端解析binlog後不會把數據持久化,但canal服務端會記錄每次客戶端消費的位置(客戶端每次ack時服務端會記錄pos點)。如果數據正在更新時,canal服務端掛掉,客戶端也會跟著掛掉,mysql依然在插入數據,而redis則因為客戶端的關閉而停止更新,造成mysql和redis的數據不一致。解決辦法是,只要重啟canal服務端和客戶端就可以了,雖然canal服務端因為重啟之前解析數據清空,但因為canal服務端記錄的是客戶端最後一次獲取的pos點,canal服務端再從這個pos點開始解析,客戶端更新至redis,以達到數據的一致。


2,如果只有一個canal服務端和一個客戶端,肯定存在可用性低的問題,一種做法是用程序來監控canal服務端和客戶端,如果掛掉,再重啟;一種做法是多個canal服務端+zk,將canal服務端的配置文件放在zk,任何一個canal服務端掛掉後,切換到其他canal服務端,讀到的配置文件的內容就是一致的(還有記錄的消費pos點),保證業務的高可用,客戶端可使用相同的做法。


專注於技術熱點大數據,人工智能,JAVA、Python、 C 、GO、Javascript等語言最新前言技術,及業務痛點問題分析,請關注【編程我最懂】共同交流學習。


分享到:


相關文章: