基于DataX的通用数据同步微服务,一个Restful接口搞定所有通用数据同步
已优化重构并迁移至datax-admin
基于阿里DataX开发一个通用导数的微服务,可以开发前台页面,根据reader和writer自动进行数据同步
本项目只限于同步数据源量很少的时候使用,若是数据源很多的情况,请参考下面的设计思路
由于阿里DataX有一些缺点:
搬砖的时间很宝贵,所以:
例如:mysql到hive
选择mysql需要同步的表、字段等信息,输入导入到hive的库表分区等信息,不需提前在hive进行创库创表创分区,自动根据要导的mysql表以及字段类型进行创建hive库表分区,然后利用freemarker去生成json文件,使用Azkaban进行调度执行,自动创建项目、上传zip、执行流一系列操作,可在Azkaban页面进行查看。当然也提供了可直接远程python执行。
上述设计使用策略实现,只有几个数据源之间相互同步还好,如hive/mysql/oracle三个,策略模式还是不错的,但若是数据源很多的时候,策略模式不是很方便,写的类也成幂次方增加,为了优化开发易维护,只有放弃策略模式,用以下方式,代码我就不推了,有点懒。
伪代码
DataxSyncDTO
/**
* 同步信息,包含以下三个key
* @see BaseReader 子类
* @see BaseWriter 子类
* @see Job.Setting
*/
private Map<String, Object> sync;
ReaderService/WriterService
如可以实现MysqlReaderService/MysqlWriterService
public interface ReaderService<T extends BaseReader> {
/**
* 解析reader
*
* @param tenantId 租户id
* @param datasourceId 数据源ID
* @param reader json
* @return json
*/
String parseReader(Long tenantId, Long datasourceId, String reader);
}
根据名称使用反射找到具体的实现类,序列化出具体的reader/writer部分json
最后组合成datax的json
最终的datax json映射类
public class Job {
private Setting setting;
private List<Content> content;
public static class Setting {
private Speed speed;
private ErrorLimit errorLimit;
}
public static class Speed {
private String record;
private String channel;
private String speedByte;
}
public static class ErrorLimit {
private String record;
private String percentage;
}
public static class Content {
private Reader reader;
private Writer writer;
}
public static class Reader {
private String name;
private Object parameter;
}
public static class Writer {
private String name;
private Object parameter;
}
}
不要修改数据源名称,只需修改为自己的username、password、url即可
# 这里只要是路径,后面都加上/
datax:
home: ${DATAX_HOME:/usr/local/DataX/target/datax/datax/}
host: ${DATAX_HOST:datax01}
port: 22
# 要操作hdfs,用户要有权限
username: ${DATAX_USERNAME:hadoop}
password: ${DATAX_PASSWORD:hadoop}
uploadDicPath: ${DATAX_JSON_FILE_HOME:/home/hadoop/datax/}
azkaban:
host: ${AZKABAN_HOST
//192.168.43.221:8081}
username: ${AZKABAN_USERNAME:azkaban}
password: ${AZKABAN_PASSWORD:azkaban}
可以重命名application-template.yml为application-dev.yml,application.yml指定生效的配置文件
spring:
profiles:
active: ${SPRING_PROFILES_ACTIVE:dev}
这里的mysql2Hive表明是mysql同步到hive,可以更换为mysql2Mysql、hive2Hive、oracle2Hive等,驼峰命名。
这里是mysql数据导入到hive,支持分区
>
POST http://localhost:10024//v1/datax-syncs/execute
Body示例
{
"syncName": "mysql2hive_test_0625_where",
"syncDescription": "mysql2hive_test_0625_where",
"sourceDatasourceType": "mysql",
"sourceDatasourceId": "1",
"writeDatasourceType": "hadoop_hive_2",
"writeDatasourceId": "1",
"jsonFileName": "mysql2hive_test_0625_where.json",
"mysql2Hive": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"reader": {
"splitPk": "",
"username": "root",
"password": "root",
"column": [
"id",
"username"
],
"connection": [{
"table": [
"userinfo"
],
"jdbcUrl": [
"jdbc:mysql://hadoop04:3306/common_datax?useUnicode=true&characterEncoding=utf-8&useSSL=false"
]
}],
"where": "2 > 1"
},
"writer": {
"defaultFS": "hdfs://hadoop04:9000",
"fileType": "text",
"path": "/user/hive/warehouse/test.db/userinfo",
"fileName": "userinfo",
"column": [
{
"name": "id",
"type": "BIGINT"
},
{
"name": "username",
"type": "STRING"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"compress": "",
"hadoopConfig": {
},
"haveKerberos": false,
"kerberosKeytabFilePath": "",
"kerberosPrincipal": ""
}
}
}
path可以更换为分区的hdfs路径,不需提前创建分区,自动创建,例如:
"path": "/user/hive/warehouse/test.db/userinfo_dts/dt1=A1/dt2=B2"
这里会在hive里自动创建userinfo_dts分区表,有两个分区字段,然后会将数据导入到这里的dt1=A1,dt2=B2分区下