基于阿里云的DTS进行的二次开发,基于SpringBoot框架进行消息格式转化、集成客户端kakfa
最近公司需要应用订阅阿里云RDS相关的binlog,基于阿里云提供的案例subscribe_exampleale演化而来,重构成了SpringBoot、并且升级了相应的jar包,避免了很多版本上面带来的问题,还新增集成了新的客户端kafka、后续会考虑redis等等。
功能点:
后续还会加强的功能点:
由于DTS集成的kafka是基于单分区的,所以同一时刻只能有一个消费者消费(也就是自身自带的消费者),在某些情况可能会产生消息堆积,导致消费延迟(如果业务直接基于dts单个消费者开发的话),这里只是将消费过来的消息进行格式化转换之后,直接放入一个全新的kafka,交给业务方自己去消费。避免binlog消费能力下降。
其他的基本和案例保持一致
另外可能需要看一下application.yml配置,将你的应用的一些环境配置上去. 如果涉及到环境切分的话
KafkaConfiguration代码中有一个环境变量的配置: dev 如果了解环境切分的可以新增一个application-dev.yml的环境里面配置ssl相关的参数
如果启动时候,发现数据没进来,请查看配置spring.dts.include-data-info
是否关注了该数据,又或者在logback.xml
文件中将日志级别调整至DEBUG,但是可能会产生大量刷屏日志,这个在调试的时候注意一下。
DtsServerApplication
: 启动整个应用
StartCallback
: Spring的容器构建完成之后回调该方法,启动dts的运行环境。(dts-kafka的构建,以及一些位点线程、找对应消费者的流程初始化
以上环境基本初始化完成。
RecordConsumerListener
: binlog回调类。
AbstractEventProcess
: 事件处理类,包括关注事件、消息格式化、业务处理、消息发送等等
DDLEventProcess
: 针对表的DDL操作进行回调处理DataEventProcess
: 针对表数据的增删改查回调处理参考类: com.elab.data.dts.model.DMLData
{
"changeFieldList":[ // 修改的字段名
"updated"
],
"databaseName":"databaseName", // 数据库名称
"fieldDataMap":{
"id":{
"dataType":"java.lang.Integer", // 数据类型
"field":"id", // 字段名称
"oldValue":"502941", // 老的字段
"value":"502941" // 当前字段
},
"updated":{
"dataType":"java.util.Date",
"field":"updated",
"oldValue":"2020-10-16 11:30:06",
"value":"2020-10-16 11:35:06"
}
},
"operation":"UPDATE", // 操作类型 UPDATE、INSERT、DELETE等等
"sourceTimestamp":1602819306, // 触发时间戳
"tableName":"table_info" // 表名
}
参考类: com.elab.data.dts.model.DDLData
{
"databaseName":"库名", // 数据库名称
"operation":"DDL", // 表示操作类型
"sourceTimestamp":1605582383, // 数据产生时间戳
"sql":"alter table 表名 modify 字段名 varchar(100) COMMENT '描述'", // 具体的执行SQL
"tableName":"表名" // 表名
}
如果有需要可以在这个基础上进行二次开发,节省更多时间。
use-config-checkpoint-name
: 为true的情况,非第一次才会参考initial-checkpoint-name的时间戳为主,达到位点偏移到具体的时间戳
spring:
dts:
initial-checkpoint-name: 1596440543 # 注意这里是秒的时间戳,为空的话默认是当前时间戳。第一次启动的时候会参考这个时间戳,后续配合use-config-checkpoint-name属性以文件或者kafka的存储位点为主,
use-config-checkpoint-name: false # 强制使用当前位点,这里使用的时候要特别注意,不是非得回滚到指定位点,不要用true,否则重启的时候会重复消费,通常用来回到特定时间点的数据进行消费
subscribe-mode-name: subscribe # subscribe表示多机主备,如果有多台,只有其中一台会消费,其他只是等待这个消费挂掉,后续补上,起到容灾作用
max-poll-records: 100
include-data-info: # 数据过滤 包含数据信息
marketing_db_prod: [ all,abc ] # marketing_db_prod : 对应的库名 [ all,abc ] 对应的表名 : all 代表所有表, abc 代表具体的表名
# 如果有多个的话可以继续添加例如(marketing_db_prod2: [ all,abc ])
exclude-data-info: # 排除具体的表 : A , B ,C 代表3个表名, marketing_db : 代表库名
marketing_db: [ A , B , C]
exclude-table-change-field: # 过滤掉binlog中发生改变的字段
all: [updated,updator] # 过滤掉所有表的发生改变的字段
c_user_info: [ updated ] # 过滤掉指定表的发生改变的字段
table-partition-map: # 表和分区进行绑定,特定的表直接绑定固定的kafka分区
content_materials_label_rlat_info: 6
仅仅是应对特殊情况,由于是单分区消费也可以理解为单机消费,所以同一时刻也只可能有一个消费者在消费,重启太麻烦了。
该数据存在内存中,重启则无效。
# GET 请求 获取当前要过滤的表名列表
http://localhost:8686/debug/register/tableName/list?token=自定义的token
# GET 请求 新增要过滤的表名
http://localhost:8686/debug/register/tableName/list?token=自定义的token&&tb=具体的表名
# GET 请求 删除要过滤的表名
http://localhost:8686/debug/register/tableName/clear?token=自定义的token&&tb=具体的表名
# GET 请求 清空所有过滤的表名
http://localhost:8686/debug/register/tableName/clear?token=自定义的token
后续还会将一些dts使用心得分享处理,会持续更新.
欢迎大家一起交流: QQ : 444368875 有问题可以随时沟通 …