项目作者: zhongchengxcr

项目描述 :
基于阿里巴的canal向elasticsearch中同步数据mysql数据的小工具
高级语言: Java
项目地址: git://github.com/zhongchengxcr/canal-elasticsearch.git
创建时间: 2017-11-15T08:39:17Z
项目社区:https://github.com/zhongchengxcr/canal-elasticsearch

开源协议:Apache License 2.0

关键词:
"cannal" "elasticsearch" "mysql"

下载


" class="reference-link">logo.jpg

一个基于阿里巴巴Canal,实时同步mysql数据到Elasticsearch的工具。轻量,易于配置,部署简单,支持数据回滚。
使用 Totoro 可以帮助你轻松的将mysql的数据实时同步到Elasticsearch。Totoro是基于阿里巴巴的canal,是数据库级别的
监听,对原有项目没有任何侵入,所以你无需更改项目中的任何代码就可以实现实时的数据同步。

AppVeyor

Elasticsearch是一个分布式搜索服务,提供Restful API,底层基于Lucene,
采用多shard的方式保证数据安全,并且提供自动resharding的功能,github等大型的站点也采用Elasticsearch作为其搜索服务。

对于Elasticsearch,如果要在项目中使用,第一个要解决的问题就是,如何将数据库中的数据同步到Elasticsearch,下面常见的几种中方案

  • 修改代码,修改插入数据库的代码,同时插入Elasticsearch
  • 修改代码,修改插入数据库的代码,同时放入消息队列,在消息队列的另一端进行插入Elasticsearch
  • 使用 Elasticsearch技术栈的Logstash,编写sql语句定时执行搜集数据,并插入到Elasticsearch

在上诉方案中,一三都有明显缺陷,第二种方案是目前采用最多的,但还是会对原有项目代码有侵入,并且要引入消息队列。
如果你不想引入消息队列并且想获得一个开箱即用的同步中间件,那么Totoro会令你非常满意

Totoro的方案是,基于阿里巴巴开源数据库中间件canal,监听mysql数据库,并且在mysql数据库数据发生变化的时候
发送消息给Totoro,Totoro会将数据同步到Elasticsearch,如果消费出错支持回滚,保证数据被正确消费。

工作原理

totoro主要分为4个模块: select、Transformation、consumer、channel

  • select 负责从canal中拉取数据
  • Transformation 负责将select生产的数据,进行过滤、处理、转换
  • consumer 负责消费数据,在这里就是将数据同步到elasticsearch
  • channel 是以上3个模块链接者,也是数据在totoro中的容器。select将拉取的数据放入channel,Transformation监听到select放入的数据,进行处理,处理完再放回channel,等待consumer去消费

    select、Transformation、consumer分别是3个任务,并行执行。为保证数据消费顺序与ack(每次消费一条数据要向canal进行ack)顺序,
    其中select task与consumer task 都分别只有一条线程。而Transformation task具有多条线程。

QuickStart

1.安装Canal

有关于canal的安装,请参见canal的文档,里面有很详细说明 Canal QuickStart

2.安装Totoro

  1. git clone git@github.com:zhongchengxcr/canal-elasticsearch.git
  2. cd canal-elasticsearch
  3. mvn clean compile package -P dev -Dmaven.test.skip=true

编译完成后,进入/target/totoro目录,可以看到如下结构:

  1. -rw-r--r-- 1 zhongcheng staff 2214 12 21 17:08 README.md
  2. drwxr-xr-x 5 zhongcheng staff 160 12 21 17:22 bin
  3. drwxr-xr-x 5 zhongcheng staff 160 12 21 17:08 conf
  4. drwxr-xr-x 73 zhongcheng staff 2336 12 21 17:08 lib
  5. drwxr-xr-x 4 zhongcheng staff 128 12 21 17:08 logs

3.配置修改

  1. vi conf/canal-es.properties
  2. ###一下配置需要根据自己的业务进行修改
  3. # ----------------------- canal 相关配置 -----------------------------
  4. #canal的实例名字
  5. totoro.canal.destination=totoro
  6. #cananl 服务端的模式,单机:single ,集群:cluster/
  7. totoro.canal.mode=sign
  8. #canal 地址,包括端口号
  9. totoro.canal.address=127.0.0.1:11111
  10. #过滤表达式
  11. totoro.canal.filter.patten=
  12. #如果canal模式是集群的话,则需要填写zk地址
  13. totoro.canal.zk.address=
  14. totoro.canal.username=
  15. totoro.canal.password=
  16. #此项必须配置,不配置会导致 totoro 启动不了, totoro 只会处理在此配置的表
  17. #没有在此配置的表,totoro 将会忽略,不会进行同步,格式 "database.table.id" 多个使用 ","分割
  18. #database代表数据库,table 代表 数据库中的表,id代表 table中的 id
  19. #totoro 会默认将 database 作为 es中的index,table作为es中的type ,使用db中的id作为es的id
  20. totoro.canal.table.accept=demo.cc.id
  21. # ----------------------- elasticsearch 相关配置 -----------------------------
  22. totoro.es.address=127.0.0.1:9300
  23. totoro.es.cluster.name=my-elasticsearch
  24. totoro.es.username=
  25. totoro.es.password=
  26. # ----------------------- totoro 相关配置 ----------------------------
  27. #处理信息转换的线程数量 默认 3个 , 不要配置太大,2-4 之间吧,取决于业务情况,太大并不会增加性能,反而会增加上下文切换的开销
  28. totoro.cannal.trans.thread.nums=3

4.准备启动

  1. cd ../bin
  2. ./startup.sh
  3. _____ _
  4. |_ _|___ | |_ ___ _ __ ___
  5. | | / _ \ | __|/ _ \ | '__|/ _ \
  6. | || (_) || |_| (_) || | | (_) |
  7. |_| \___/ \__|\___/ |_| \___/
  8. [Totoro 1.0-SNAPSHOT,Build 2017/12/20,Author:zhongcheng_m@yeah.net]
  9. cd to /Users/zhongcheng/IdeaProjects/canal-elasticsearch/target/totoro/bin for workaround relative path
  10. LOG CONFIGURATION : /Users/zhongcheng/IdeaProjects/canal-elasticsearch/target/totoro/bin/../conf/logback.xml
  11. sync conf : /Users/zhongcheng/IdeaProjects/canal-elasticsearch/target/totoro/bin/../conf/canal-es.properties
  12. cd to /Users/zhongcheng/IdeaProjects/canal-elasticsearch/target/totoro/bin for continue

5.查看日志

  1. cd ../logs
  2. tail -100f totoro.log
  3. [2017-12-21 17:58:27.699] [INFO] [main] [c.t.c.e.s.s.canal.CanalEmbedSelector] --- TotoroSelector init start conf :CanalConf{mode=SIGN destination='totoro' filterPatten='' address='127.0.0.1:11111' zkAddress='' userName='' accept='demo.cc.id'}
  4. [2017-12-21 17:58:27.719] [INFO] [main] [c.t.c.e.s.s.canal.CanalEmbedSelector] --- TotoroSelector init complete .......
  5. [2017-12-21 17:58:27.737] [INFO] [main] [c.t.c.e.select.selector.SelectorTask] --- Selector task init .......
  6. [2017-12-21 17:58:27.737] [INFO] [main] [c.t.c.e.select.selector.SelectorTask] --- Selector task complete .......
  7. [2017-12-21 17:58:27.748] [INFO] [main] [c.t.c.e.transform.MessageFilterChain] --- TableFilter has benn registered to message filter chain
  8. [2017-12-21 17:58:27.748] [INFO] [main] [c.t.c.e.transform.MessageFilterChain] --- SimpleMessageFilter has benn registered to message filter chain
  9. [2017-12-21 17:58:27.749] [INFO] [main] [c.t.c.es.transform.SimpleEsAdapter] --- Add accept :demo.cc.id
  10. [2017-12-21 17:58:27.750] [INFO] [main] [c.t.canal.es.transform.TransFormTask] --- TransFormTask init start .......
  11. [2017-12-21 17:58:27.758] [INFO] [main] [c.t.canal.es.transform.TransFormTask] --- TransFormTask init complete .......
  12. [2017-12-21 17:58:30.571] [INFO] [main] [c.t.c.e.c.e.i.ElasticsearchServiceImpl] --- Complete the connection to elasticsearch
  13. [2017-12-21 17:58:30.572] [INFO] [main] [c.t.canal.es.consum.es.ConsumerTask] --- Consumer task init start .......
  14. [2017-12-21 17:58:30.573] [INFO] [main] [c.t.canal.es.consum.es.ConsumerTask] --- Consumer task init complete.......
  15. [2017-12-21 17:58:30.573] [INFO] [main] [com.totoro.canal.es.TotoroLauncher] --- Totoro init complete .......
  16. [2017-12-21 17:58:30.574] [INFO] [taskName = TransFormTask] [c.t.canal.es.transform.TransFormTask] --- TransFormTask start .......
  17. [2017-12-21 17:58:30.574] [INFO] [taskName = ConsumerTask] [c.t.canal.es.consum.es.ConsumerTask] --- ConsumerTask start .......
  18. [2017-12-21 17:58:30.656] [INFO] [taskName = SelectorTask] [c.t.c.e.select.selector.SelectorTask] --- Selector task start .......

6.关闭

  1. cd ../bin
  2. ./stop.sh

存在的问题

  • 内存抖动与GC时间过高(已解决,使用对象池化)

  • Elasticsearch 客户端 transport 的 cpu与内存消耗比较多,经测试 发现 transport 启动了 20+ 线程

  • 数据转换的灵活性不够

  • 单一节点,不支持多节点部署

计划

  • 0.1.0
    将selector部分抽出单独的模块,分别支持canal,各种mq,以及自定义的selector

欢迎有想法的朋友一起参与,讨论
QQ群:688734361