项目作者: thestyleofme

项目描述 :
基于DataX的通用数据同步微服务,一个Restful接口搞定所有通用数据同步
高级语言: Java
项目地址: git://github.com/thestyleofme/common-datax.git
创建时间: 2019-04-28T17:20:36Z
项目社区:https://github.com/thestyleofme/common-datax

开源协议:

下载


DEPRECATED!!!不在维护!!!

已优化重构并迁移至datax-admin

datax-admin传送门


common-datax

基于阿里DataX开发一个通用导数的微服务,可以开发前台页面,根据reader和writer自动进行数据同步

本项目只限于同步数据源量很少的时候使用,若是数据源很多的情况,请参考下面的设计思路


由于阿里DataX有一些缺点:

  • 不够自动化
  • 需要手写json
  • 需要手动运行job

搬砖的时间很宝贵,所以:

  • 提供通用数据抽取restful接口
  • HDFS自动创库创表创分区
  • 利用freemarker模板自动创建json文件
  • 自动python执行job
  • 集成Azkaban进行调度管理

例如:mysql到hive

选择mysql需要同步的表、字段等信息,输入导入到hive的库表分区等信息,不需提前在hive进行创库创表创分区,自动根据要导的mysql表以及字段类型进行创建hive库表分区,然后利用freemarker去生成json文件,使用Azkaban进行调度执行,自动创建项目、上传zip、执行流一系列操作,可在Azkaban页面进行查看。当然也提供了可直接远程python执行。

上述设计使用策略实现,只有几个数据源之间相互同步还好,如hive/mysql/oracle三个,策略模式还是不错的,但若是数据源很多的时候,策略模式不是很方便,写的类也成幂次方增加,为了优化开发易维护,只有放弃策略模式,用以下方式,代码我就不推了,有点懒。

设计思路(跟策略模式对比即目前的项目) 后续有时间我推一下新版的设计实现:

  • 摒弃freemarker,DTO直接使用Map映射,Map里传reader、writer、setting的信息
  • 定义WriterService/ReaderService接口,该接口方法处理reader/writer部分的json信息
  • 一个reader/一个writer对应一个类进行处理(使用反射),专门生成reader/writer部分的json,最后加上setting部分生成成一个完整DataX的Job类

伪代码

  1. DataxSyncDTO
  2. /**
  3. * 同步信息,包含以下三个key
  4. * @see BaseReader 子类
  5. * @see BaseWriter 子类
  6. * @see Job.Setting
  7. */
  8. private Map<String, Object> sync;
  9. ReaderService/WriterService
  10. 如可以实现MysqlReaderService/MysqlWriterService
  11. public interface ReaderService<T extends BaseReader> {
  12. /**
  13. * 解析reader
  14. *
  15. * @param tenantId 租户id
  16. * @param datasourceId 数据源ID
  17. * @param reader json
  18. * @return json
  19. */
  20. String parseReader(Long tenantId, Long datasourceId, String reader);
  21. }
  22. 根据名称使用反射找到具体的实现类,序列化出具体的reader/writer部分json
  23. 最后组合成dataxjson
  24. 最终的datax json映射类
  25. public class Job {
  26. private Setting setting;
  27. private List<Content> content;
  28. public static class Setting {
  29. private Speed speed;
  30. private ErrorLimit errorLimit;
  31. }
  32. public static class Speed {
  33. private String record;
  34. private String channel;
  35. private String speedByte;
  36. }
  37. public static class ErrorLimit {
  38. private String record;
  39. private String percentage;
  40. }
  41. public static class Content {
  42. private Reader reader;
  43. private Writer writer;
  44. }
  45. public static class Reader {
  46. private String name;
  47. private Object parameter;
  48. }
  49. public static class Writer {
  50. private String name;
  51. private Object parameter;
  52. }
  53. }

done:

  • oracle、mysql、hive两两互相同步
  • 本地csv文件导入到hive,支持分区
  • 使用Azkaban去执行python脚本进行抽数
  • 一个restful接口,可以实现所有的同步

todo:

  • 创表记录导数的历史
  • json文件下载
  • Azkaban定时调度等
  • 数据源,mysql、hive的数据源维护,下次要导数时,不用传那么多服务器信息
  • groovy脚本

说明

修改配置文件application-template.yml

  1. 数据源修改,根据自己项目情况进行调整

不要修改数据源名称,只需修改为自己的username、password、url即可

  1. datax的信息修改
    1. # 这里只要是路径,后面都加上/
    2. datax:
    3. home: ${DATAX_HOME:/usr/local/DataX/target/datax/datax/}
    4. host: ${DATAX_HOST:datax01}
    5. port: 22
    6. # 要操作hdfs,用户要有权限
    7. username: ${DATAX_USERNAME:hadoop}
    8. password: ${DATAX_PASSWORD:hadoop}
    9. uploadDicPath: ${DATAX_JSON_FILE_HOME:/home/hadoop/datax/}
  2. azkaban的url, 也可以不用azkaban,本项目默认使用azkaban进行调度
    1. azkaban:
    2. host: ${AZKABAN_HOST:http://192.168.43.221:8081}
    3. username: ${AZKABAN_USERNAME:azkaban}
    4. password: ${AZKABAN_PASSWORD:azkaban}

    指定启动配置

可以重命名application-template.yml为application-dev.yml,application.yml指定生效的配置文件

  1. spring:
  2. profiles:
  3. active: ${SPRING_PROFILES_ACTIVE:dev}

swagger地址

> http://localhost:10024/swagger-ui.html

使用示例

这里的mysql2Hive表明是mysql同步到hive,可以更换为mysql2Mysql、hive2Hive、oracle2Hive等,驼峰命名。

1. mysql2hive example

这里是mysql数据导入到hive,支持分区
>

POST http://localhost:10024//v1/datax-syncs/execute

Body示例

  1. {
  2. "syncName": "mysql2hive_test_0625_where",
  3. "syncDescription": "mysql2hive_test_0625_where",
  4. "sourceDatasourceType": "mysql",
  5. "sourceDatasourceId": "1",
  6. "writeDatasourceType": "hadoop_hive_2",
  7. "writeDatasourceId": "1",
  8. "jsonFileName": "mysql2hive_test_0625_where.json",
  9. "mysql2Hive": {
  10. "setting": {
  11. "speed": {
  12. "channel": 3
  13. },
  14. "errorLimit": {
  15. "record": 0,
  16. "percentage": 0.02
  17. }
  18. },
  19. "reader": {
  20. "splitPk": "",
  21. "username": "root",
  22. "password": "root",
  23. "column": [
  24. "id",
  25. "username"
  26. ],
  27. "connection": [{
  28. "table": [
  29. "userinfo"
  30. ],
  31. "jdbcUrl": [
  32. "jdbc:mysql://hadoop04:3306/common_datax?useUnicode=true&characterEncoding=utf-8&useSSL=false"
  33. ]
  34. }],
  35. "where": "2 > 1"
  36. },
  37. "writer": {
  38. "defaultFS": "hdfs://hadoop04:9000",
  39. "fileType": "text",
  40. "path": "/user/hive/warehouse/test.db/userinfo",
  41. "fileName": "userinfo",
  42. "column": [
  43. {
  44. "name": "id",
  45. "type": "BIGINT"
  46. },
  47. {
  48. "name": "username",
  49. "type": "STRING"
  50. }
  51. ],
  52. "writeMode": "append",
  53. "fieldDelimiter": "\t",
  54. "compress": "",
  55. "hadoopConfig": {
  56. },
  57. "haveKerberos": false,
  58. "kerberosKeytabFilePath": "",
  59. "kerberosPrincipal": ""
  60. }
  61. }
  62. }

path可以更换为分区的hdfs路径,不需提前创建分区,自动创建,例如:

  1. "path": "/user/hive/warehouse/test.db/userinfo_dts/dt1=A1/dt2=B2"

这里会在hive里自动创建userinfo_dts分区表,有两个分区字段,然后会将数据导入到这里的dt1=A1,dt2=B2分区下