如何把mysql中的数据同步到elasticsearch中?

低调的牛肉


对于ES,我并没有在实际项目中应用(自己研究过,没有实战过);我们项目使用的是MongoDB;由于项目的特殊性,我们研究了很多关于A->B的数据同步方案,包括DB2/Mysql到MongoDB,MongoDB到MongoDB等等。

MySQL数据同步到ES的方案

把MySQL的数据实时同步到ES,这样可以实现在ES中低延迟的检索,有些公司的项目做了分库,可以单独搭建一套ES来放全量的数据(或全量数据的部分字段,达到全量检索的效果),常用的数据同步实现方案有这些:

  • MySQL Binlog:MySQL的Binlog日志可以用于数据库的主从复制、数据恢复,也可以将MySQL的数据同步给ES;这里需要注意,Binlog的日志模式只能使用ROW模式(另外两种模式是STATEMENT和MIXED);解析Binlog日志中的内容,执行ES Document API,这样就可以将数据同步到ES中;

  • MySQL dump:如果是新建的项目,使用Binlog做数据同步是没有问题的,但如果MySQL已经运行了一段时间,项目架构中后增加的ES,那么历史数据的迁移就要额外处理了,因为Binlog可能已经被覆盖了。这时候历史数据的同步,可以使用mysqldump对现有数据导出,之后再使用Binlog的方式;

  • 开源工具:前两种方式都是在数据库日志这个级别做文章,我们还可以使用一些开源工具,比如go-mysql-elasticsearch;它可以帮助我们完成第一次全量数据同步,后续增量数据同步的工作(底层也是解析Binlog日志);又或者mypipe,它支持将Binlog日志内容解析后推送到Kafka,如果要写入到ES中,还需要写额外的代码从Kafka中消费数据写入ES。


我们项目中的实现方案

上面提过,我们项目中是将关系型数据库DB2/Mysql中的数据同步到MongoDB中,Mysql尚且还能使用Binlog日志,DB2想要把变化的数据实时通知出来,实现起来比较困难(写程序部署到DB2服务器上,要和数据库做关联配置,当数据发生变化,程序发送MQ通知给外围系统),先不说性能是否能保证,就是“自己写一个通知程序部署到数据库所在服务器上(有侵入)”,这一点至少在我们公司是无法做到的。

  • 我们采用了一个非常low的方式来解决这个问题,就是Java程序扫描DB2表中的时间戳,读取最近变化的数据,加工到MongoDB中;

  • 虽然实现方案比较low,但是效果还不错,因为我们在从关系型数据到MongoDB的数据同步过程中,可以自有地做数据加工;相当于按照一定的数据维度,比如按照客户维度,把几十张表做关联,加工后行程一个document保存到MongoDB中;再对外提供服务的时候,查询效率提升的很明显,因为表关联已经提前完成了(接口响应大多数时候都是毫秒级,超过50ms就算慢了)。


  • 缺点也很明显,数据从关系型数据库到MongoDB的延迟很高,我们项目几经优化,也需要20分钟左右;所以一定要结合着业务场景考虑是否使用这种方案。

我将持续分享Java开发、架构设计、程序员职业发展等方面的见解,希望能得到你的关注。


会点代码的大叔


1 数据迁移核心

只要是数据迁移就一定需要处理两个类型的数据:增量数据和历史数据。无论数据存储介质是什么,都必须处理这两类数据。我们以MySQL迁移数据至Elasticsearch为例,介绍两种比较常见的方案。


2 双写方案

(1) 处理增量数据

抽象出一个数据访问代理层,在操作针对MySQL中增改删的同时,也操作一份至ES,保持最新业务数据同步。完成后将部署代码上线运行。

增:直接插入ES

删:ES有该数据则删除,没有则不处理

改:ES有该数据则修改,没有则不处理

记录下MySQL迁移最小主键假设为100,所有主键大于等于100可以认为已经同步。

(2) 处理历史数据

编写代码脚本,将MySQL主键所有小于100的数据插入至ES。由于运行着双写程序,即使插入过后有修改,也可以同步至ES。


3 Binlog方案

(1) 处理增量数据

可以简单把Binlog看成是数据库操作日志,互联网公司中间件部门一般会做DRC中间件,把Binlog消息作为Kafka消息发出来,业务方去处理这个消息。这里我们监听DRC消息操作ES,根据消息体可以知道是增、删、改哪一类操作。

记录下MySQL迁移最小主键假设为100,所有主键大于等于100可以认为已经同步。

(2) 处理历史数据

这个方法与双写方案相同。


4 如何选择ES API

关于Elasticsearch多说一点,在选择ES API时一定要选择RestClient,即通过执行ES脚本的方式执行。

这种方式比较清晰,而且是ES官方推荐的方法。其它API例如TransportClient已经被官方放弃。


敬请关注

请点击关注按钮【IT徐胖子】会持续为大家奉献互联网和技术干货内容,感谢支持


IT徐胖子


近年来接触了比较多的有同步需求的项目,文件同步以及各种主流和非主流数据库之间的同步。要把数据从 MySQL 同步到 Elasticsearch,可以通过以下办法实现。

自实现

数据同步的关键的就是提取变化数据,MySQL 中捕获数据变化的方式有:

  • 触发器:简单直接,使用触发器把变化记录的主键插入到一个中间表中,程序定时扫描提取数据

  • Binlog:基于数据库二进制日志,日志中记录了数据的增删改操作,一般都是使用程序模拟 Slave 接收并解析日志,从而获取数据

  • 冗余字段:可以在同步表中添加一个更新时间字段,定时扫描并提取大于某个时间点的数据

自实现是需要成本的,当然了,也有很多开源工具可以使用。

使用其他工具

  • logstash-input-jdbc:这是 logstash 官方提供的一个插件,支持全量同步和增量同步,原理也比较简单,就是定时执行SQL,可使用上述使用**冗余字段**的方法。地址:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
  • go-mysql-elasticsearch:基于 Go 开发的,使用 Binlog 进行同步的第三方开源工具。地址:https://github.com/siddontang/go-mysql-elasticsearch
  • elasticsearch-jdbc:第三方工具,基于 SQL 进行全量和增量的同步。地址:https://github.com/jprante/elasticsearch-jdbc

推荐使用 logstash-input-jdbc,比较易用且稳定。


顿悟源码


logstash,应该没拼错吧


分享到:


相關文章: