Project author: jiamx

Project description:
User behavior log analysis system based on Flink
Primary language: Java
Project URL: git://github.com/jiamx/flink-log-analysis.git
Created: 2020-08-30T09:06:48Z
Project community: https://github.com/jiamx/flink-log-analysis

License:



flink-log-analysis

Project architecture

Build a real-time user behavior log analysis system from scratch with Flink. The basic architecture is shown below:

Code structure

First we set up a forum platform and analyze the click logs its users generate. Flume collects the Apache access logs and pushes them to Kafka; Flink then analyzes and processes the logs in real time and writes the results to MySQL, where a front-end application visualizes them. This article computes the following three metrics:

  • Hot sections, i.e. the sections with the most visits
  • Hot articles, i.e. the posts with the most visits
  • The total number of section and article visits per client

Build a forum platform with Discuz

Install XAMPP

  • Download
    wget https://www.apachefriends.org/xampp-files/5.6.33/xampp-linux-x64-5.6.33-0-installer.run
  • Install
    # Make the installer executable
    chmod u+x xampp-linux-x64-5.6.33-0-installer.run
    # Run the installer
    ./xampp-linux-x64-5.6.33-0-installer.run
  • Configure environment variables

    Add the following to ~/.bash_profile:

    export XAMPP=/opt/lampp/
    export PATH=$PATH:$XAMPP:$XAMPP/bin
  • Reload the environment variables
    source ~/.bash_profile
  • Start XAMPP
    xampp restart
  • Change the MySQL root password and privileges
    # Change the root password to 123qwe
    update mysql.user set password=PASSWORD('123qwe') where user='root';
    flush privileges;
    # Grant the root user remote login privileges
    grant all privileges on *.* to 'root'@'%' identified by '123qwe' with grant option;
    flush privileges;

Install Discuz

  • Download Discuz
    wget http://download.comsenz.com/DiscuzX/3.2/Discuz_X3.2_SC_UTF8.zip
  • Install
    # Remove the existing web application
    rm -rf /opt/lampp/htdocs/*
    unzip Discuz_X3.2_SC_UTF8.zip -d /opt/lampp/htdocs/
    cd /opt/lampp/htdocs/
    mv upload/* ./
    # Adjust directory permissions
    chmod 777 -R /opt/lampp/htdocs/config/
    chmod 777 -R /opt/lampp/htdocs/data/
    chmod 777 -R /opt/lampp/htdocs/uc_client/
    chmod 777 -R /opt/lampp/htdocs/uc_server/

Discuz basics

  • Create custom sections
  • Open the Discuz admin console: http://kms-4/admin.php
  • Click the Forum menu at the top
  • Create the sections you need as prompted; parent and child sections are both supported

Discuz database tables for posts and sections

    -- Log in to the ultrax database
    mysql -uroot -p123qwe ultrax
    -- Table containing the mapping between post id and title
    -- tid, subject (post id, title)
    select tid, subject from pre_forum_post limit 10;
    -- fid, name (section id, section name)
    select fid, name from pre_forum_forum limit 40;

After posts have been added to the various sections, it looks like this:

Modify the log format

  • View the access log
    # Default log location
    /opt/lampp/logs/access_log
    # Tail the log in real time
    tail -f /opt/lampp/logs/access_log
  • Modify the log format

The Apache configuration file is httpd.conf; its full path is /opt/lampp/etc/httpd.conf. The default log type is the common format, which has 7 fields. To capture more information we switch to the combined format, which has 9 fields. The change is as follows:

    # Enable the combined log format
    CustomLog "logs/access_log" combined

  • Reload the configuration
    xampp reload

Apache log format

    192.168.10.1 - - [30/Aug/2020:15:53:15 +0800] "GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1" 200 30647 "http://kms-4/forum.php" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36"

The log format above has 9 fields, separated by spaces. The meaning of each field is as follows:

    192.168.10.1 ## (1) Client IP address
    - ## (2) Client identity; this field is "-"
    - ## (3) Client userid; this field is "-"
    [30/Aug/2020:15:53:15 +0800] ## (4) Time at which the server finished processing the request
    "GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1" ## (5) Request method, requested resource and protocol
    200 ## (6) Status code returned to the client; 200 means success
    30647 ## (7) Number of bytes returned to the client, excluding response headers; "-" if nothing was returned
    "http://kms-4/forum.php" ## (8) Referer header
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36" ## (9) Client browser (User-Agent) information

The log format above can be matched with the following regular expression:

    (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) (\S+) (\S+) (\[.+?\]) (\"(.*?)\") (\d{3}) (\S+) (\"(.*?)\") (\"(.*?)\")
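As a quick sanity check, the following minimal sketch applies this regular expression to the sample log line above (the class name LogRegexTest is ours, purely for illustration). Note that each quoted field contributes two capture groups, an outer one with the quotes and an inner one without, so the request, status code and byte count end up in groups 6, 7 and 8:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LogRegexTest {
        public static void main(String[] args) {
            // The regex from above, with backslashes and quotes escaped for a Java string literal
            String regex = "(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) (\\S+) (\\S+) (\\[.+?\\]) (\\\"(.*?)\\\") (\\d{3}) (\\S+) (\\\"(.*?)\\\") (\\\"(.*?)\\\")";
            String sample = "192.168.10.1 - - [30/Aug/2020:15:53:15 +0800] "
                    + "\"GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1\" 200 30647 "
                    + "\"http://kms-4/forum.php\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36\"";
            Matcher m = Pattern.compile(regex).matcher(sample);
            if (m.find()) {
                System.out.println("client ip : " + m.group(1)); // 192.168.10.1
                System.out.println("datetime  : " + m.group(4)); // [30/Aug/2020:15:53:15 +0800]
                System.out.println("request   : " + m.group(6)); // GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1
                System.out.println("status    : " + m.group(7)); // 200
            }
        }
    }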

Integrating Flume with Kafka

Flume is used to collect the generated Apache logs and push them to Kafka. A Flume agent must be started to collect the logs; its configuration file is as follows:

    # The agent is named a1
    a1.sources = source1
    a1.channels = channel1
    a1.sinks = sink1
    # Configure the source
    a1.sources.source1.type = TAILDIR
    a1.sources.source1.filegroups = f1
    a1.sources.source1.filegroups.f1 = /opt/lampp/logs/access_log
    a1.sources.source1.fileHeader = false
    # Configure the sink
    a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.sink1.brokerList = kms-2:9092,kms-3:9092,kms-4:9092
    a1.sinks.sink1.topic = user_access_logs
    a1.sinks.sink1.kafka.flumeBatchSize = 20
    a1.sinks.sink1.kafka.producer.acks = 1
    a1.sinks.sink1.kafka.producer.linger.ms = 1
    a1.sinks.sink1.kafka.producer.compression.type = snappy
    # Configure the channel
    a1.channels.channel1.type = file
    a1.channels.channel1.checkpointDir = /home/kms/data/flume_data/checkpoint
    a1.channels.channel1.dataDirs = /home/kms/data/flume_data/data
    # Bind the source and the sink to the channel
    a1.sources.source1.channels = channel1
    a1.sinks.sink1.channel = channel1

Key points:

What advantages does the Taildir Source have over the Exec Source and the Spooling Directory Source?

Taildir Source: supports resuming from the last read position and multiple directories. Before Flume 1.6 you had to implement a custom source that recorded the read position of each file to get this behavior.

Exec Source: can collect data in real time, but data is lost if Flume is not running or the shell command fails.

Spooling Directory Source: monitors a directory, but does not support resuming from where it left off.

Note that the configuration above pushes the raw logs to Kafka as-is. Alternatively, we can write a custom Flume interceptor to pre-filter the raw logs, and also to route different logs to different Kafka topics; a sketch of such an interceptor is shown below.
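The following is a minimal sketch of such an interceptor, not part of this project: the class name and the "keep only GET requests" rule are purely illustrative, and it assumes the Flume 1.9 client API is on the classpath. It would be wired into the agent configuration through the source's interceptors properties (a1.sources.source1.interceptors = i1 and a1.sources.source1.interceptors.i1.type = <fully qualified class name>$Builder). To route events to different topics, an interceptor like this could also set the event header that the Kafka sink uses for topic override.

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    // Illustrative interceptor: drop every event whose body is not a GET request
    public class AccessLogFilterInterceptor implements Interceptor {

        @Override
        public void initialize() { }

        @Override
        public Event intercept(Event event) {
            String line = new String(event.getBody(), StandardCharsets.UTF_8);
            // Returning null drops the event
            return line.contains("\"GET ") ? event : null;
        }

        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> kept = new ArrayList<>();
            for (Event event : events) {
                Event result = intercept(event);
                if (result != null) {
                    kept.add(result);
                }
            }
            return kept;
        }

        @Override
        public void close() { }

        // Flume instantiates interceptors through a nested Builder class
        public static class Builder implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new AccessLogFilterInterceptor();
            }

            @Override
            public void configure(Context context) { }
        }
    }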

Start the Flume agent

Wrap the startup command in a shell script, start-log-collection.sh, with the following content:

    #!/bin/bash
    echo "start log agent !!!"
    /opt/modules/apache-flume-1.9.0-bin/bin/flume-ng agent --conf-file /opt/modules/apache-flume-1.9.0-bin/conf/log_collection.conf --name a1 -Dflume.root.logger=INFO,console

Check the log data pushed to Kafka

Wrap the console-consumer command in a shell script, kafka-consumer.sh, with the following content:

    #!/bin/bash
    echo "kafka consumer "
    bin/kafka-console-consumer.sh --bootstrap-server kms-2.apache.com:9092,kms-3.apache.com:9092,kms-4.apache.com:9092 --topic $1 --from-beginning

Consume the data in Kafka with the following command:

    [kms@kms-2 kafka_2.11-2.1.0]$ ./kafka-consumer.sh user_access_logs

Log analysis and processing flow

Create the MySQL database and target tables

    -- Create and use the statistics database (the name matches the JDBC URLs used later)
    CREATE DATABASE IF NOT EXISTS statistics;
    USE statistics;
    -- Per-client access statistics
    CREATE TABLE `client_ip_access` (
      `client_ip` char(50) NOT NULL COMMENT 'client IP',
      `client_access_cnt` bigint(20) NOT NULL COMMENT 'number of visits',
      `statistic_time` text NOT NULL COMMENT 'statistics time',
      PRIMARY KEY (`client_ip`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    -- Hot-article statistics
    CREATE TABLE `hot_article` (
      `article_id` int(10) NOT NULL COMMENT 'article id',
      `subject` varchar(80) NOT NULL COMMENT 'article title',
      `article_pv` bigint(20) NOT NULL COMMENT 'number of visits',
      `statistic_time` text NOT NULL COMMENT 'statistics time',
      PRIMARY KEY (`article_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    -- Hot-section statistics
    CREATE TABLE `hot_section` (
      `section_id` int(10) NOT NULL COMMENT 'section id',
      `name` char(50) NOT NULL COMMENT 'section name',
      `section_pv` bigint(20) NOT NULL COMMENT 'number of visits',
      `statistic_time` text NOT NULL COMMENT 'statistics time',
      PRIMARY KEY (`section_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

The AccessLogRecord class

This class encapsulates the fields contained in a log record; there are 9 in total.

    import lombok.Data;

    /**
     * Uses Lombok.
     * Wrapper class for a raw log record.
     */
    @Data
    public class AccessLogRecord {
        public String clientIpAddress;  // client IP address
        public String clientIdentity;   // client identity, this field is `-`
        public String remoteUser;       // user id, this field is `-`
        public String dateTime;         // date, in the form [day/month/year:hour:minute:second zone]
        public String request;          // request line, e.g. `GET /foo ...`
        public String httpStatusCode;   // status code, e.g. 200, 404
        public String bytesSent;        // number of bytes sent, may be `-`
        public String referer;          // referer, i.e. the page the request came from
        public String userAgent;        // browser and operating system information
    }

The LogParse class

This class parses the logs: it matches each line against the regular expression and extracts the individual fields from the lines that match (a small usage sketch follows the class).

    import java.io.Serializable;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class LogParse implements Serializable {

        // Build the regular expression
        private String regex = "(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}) (\\S+) (\\S+) (\\[.+?\\]) (\\\"(.*?)\\\") (\\d{3}) (\\S+) (\\\"(.*?)\\\") (\\\"(.*?)\\\")";
        private Pattern p = Pattern.compile(regex);

        /*
         * Build the access-log wrapper object.
         * Each quoted field contributes two capture groups (the outer one keeps the quotes,
         * the inner one does not), so the request, referer and user agent are taken from the
         * inner groups 6, 10 and 12, and the status code and byte count from groups 7 and 8.
         */
        public AccessLogRecord buildAccessLogRecord(Matcher matcher) {
            AccessLogRecord record = new AccessLogRecord();
            record.setClientIpAddress(matcher.group(1));
            record.setClientIdentity(matcher.group(2));
            record.setRemoteUser(matcher.group(3));
            record.setDateTime(matcher.group(4));
            record.setRequest(matcher.group(6));
            record.setHttpStatusCode(matcher.group(7));
            record.setBytesSent(matcher.group(8));
            record.setReferer(matcher.group(10));
            record.setUserAgent(matcher.group(12));
            return record;
        }

        /**
         * @param record a single Apache combined log line
         * @return the parsed line wrapped in an AccessLogRecord, or null if it does not match
         */
        public AccessLogRecord parseRecord(String record) {
            Matcher matcher = p.matcher(record);
            if (matcher.find()) {
                return buildAccessLogRecord(matcher);
            }
            return null;
        }

        /**
         * @param request the request line as a string, e.g. "GET /the-uri-here HTTP/1.1"
         * @return a Tuple3 (requestType, uri, httpVersion); requestType is the HTTP method, e.g. GET or POST
         */
        public Tuple3<String, String, String> parseRequestField(String request) {
            // The request line has the form "GET /test.php HTTP/1.1"; split it on spaces
            String[] arr = request.split(" ");
            if (arr.length == 3) {
                return Tuple3.of(arr[0], arr[1], arr[2]);
            } else {
                return null;
            }
        }

        /**
         * Convert the English date in the Apache log into the target format.
         *
         * @param dateTime the date string from the Apache log, e.g. "[21/Jul/2009:02:48:13 -0700]"
         * @return the date formatted as yyyy-MM-dd HH:mm:ss, or "" if it cannot be parsed
         */
        public String parseDateField(String dateTime) throws ParseException {
            // Input (English) date format
            String inputFormat = "dd/MMM/yyyy:HH:mm:ss";
            // Output date format
            String outPutFormat = "yyyy-MM-dd HH:mm:ss";
            String dateRegex = "\\[(.*?) .+]";
            Pattern datePattern = Pattern.compile(dateRegex);
            Matcher dateMatcher = datePattern.matcher(dateTime);
            if (dateMatcher.find()) {
                String dateString = dateMatcher.group(1);
                SimpleDateFormat dateInputFormat = new SimpleDateFormat(inputFormat, Locale.ENGLISH);
                Date date = dateInputFormat.parse(dateString);
                SimpleDateFormat dateOutFormat = new SimpleDateFormat(outPutFormat);
                return dateOutFormat.format(date);
            } else {
                return "";
            }
        }

        /**
         * Parse the request, i.e. the URL of the visited page, e.g.
         * "GET /about/forum.php?mod=viewthread&tid=5&extra=page%3D1 HTTP/1.1",
         * and extract fid (the section id) and tid (the article id).
         *
         * @param request the request line
         * @return a Tuple2 (sectionId, articleId); an element is "" if it is not present
         */
        public Tuple2<String, String> parseSectionIdAndArticleId(String request) {
            // The number following "forumdisplay&fid=" is the section id
            String sectionIdRegex = "(\\?mod=forumdisplay&fid=)(\\d+)";
            Pattern sectionPattern = Pattern.compile(sectionIdRegex);
            // The number following "viewthread&tid=" is the article id
            String articleIdRegex = "(\\?mod=viewthread&tid=)(\\d+)";
            Pattern articlePattern = Pattern.compile(articleIdRegex);
            String[] arr = request.split(" ");
            String sectionId = "";
            String articleId = "";
            if (arr.length == 3) {
                Matcher sectionMatcher = sectionPattern.matcher(arr[1]);
                Matcher articleMatcher = articlePattern.matcher(arr[1]);
                sectionId = sectionMatcher.find() ? sectionMatcher.group(2) : "";
                articleId = articleMatcher.find() ? articleMatcher.group(2) : "";
            }
            return Tuple2.of(sectionId, articleId);
        }
    }
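As a quick check of the parsing logic, a minimal sketch like the following (the class name LogParseTest is ours, purely for illustration) runs the sample log line from earlier through LogParse:

    public class LogParseTest {
        public static void main(String[] args) throws Exception {
            String line = "192.168.10.1 - - [30/Aug/2020:15:53:15 +0800] "
                    + "\"GET /forum.php?mod=forumdisplay&fid=43 HTTP/1.1\" 200 30647 "
                    + "\"http://kms-4/forum.php\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64)\"";
            LogParse parse = new LogParse();
            AccessLogRecord record = parse.parseRecord(line);
            System.out.println(record.getClientIpAddress());                           // 192.168.10.1
            System.out.println(parse.parseDateField(record.getDateTime()));            // 2020-08-30 15:53:15
            System.out.println(parse.parseSectionIdAndArticleId(record.getRequest())); // section id 43, empty article id
        }
    }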

The LogAnalysis class

This class contains the core log-processing logic.

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    import static org.apache.flink.table.api.Expressions.$;

    public class LogAnalysis {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
            // Enable checkpointing; the interval is in milliseconds
            senv.enableCheckpointing(5000L);
            // Choose the state backend
            // Local testing
            // senv.setStateBackend(new FsStateBackend("file:///E://checkpoint"));
            // Cluster mode
            senv.setStateBackend(new FsStateBackend("hdfs://kms-1:8020/flink-checkpoints"));
            // Restart strategy
            senv.setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(3, Time.of(2, TimeUnit.SECONDS)));
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv, settings);
            // Kafka configuration
            Properties props = new Properties();
            // Kafka broker addresses
            props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
            // Consumer group
            props.put("group.id", "log_consumer");
            // Key deserializer for Kafka messages
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Value deserializer for Kafka messages
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");
            FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>(
                    "user_access_logs",
                    new SimpleStringSchema(),
                    props);
            DataStreamSource<String> logSource = senv.addSource(kafkaConsumer);
            // Keep only the valid log records
            DataStream<AccessLogRecord> availableAccessLog = LogAnalysis.getAvailableAccessLog(logSource);
            // Extract [clientIP, accessDate, sectionId, articleId]
            DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog = LogAnalysis.getFieldFromLog(availableAccessLog);
            // Create a temporary view named logs from the DataStream
            // and add a computed column, proctime, used for the temporal table JOIN
            tEnv.createTemporaryView("logs",
                    fieldFromLog,
                    $("clientIP"),
                    $("accessDate"),
                    $("sectionId"),
                    $("articleId"),
                    $("proctime").proctime());
            // Requirement 1: hot sections
            LogAnalysis.getHotSection(tEnv);
            // Requirement 2: hot articles
            LogAnalysis.getHotArticle(tEnv);
            // Requirement 3: total section and article visits per client IP
            LogAnalysis.getClientAccess(tEnv);
            senv.execute("log-analysis");
        }

        /**
         * Total section and article visits per client IP.
         *
         * @param tEnv
         */
        private static void getClientAccess(StreamTableEnvironment tEnv) {
            // Sink table
            // [client_ip, client_access_cnt, statistic_time]
            // [client IP, number of visits, statistics time]
            String client_ip_access_ddl = "" +
                    "CREATE TABLE client_ip_access (\n" +
                    " client_ip STRING,\n" +
                    " client_access_cnt BIGINT,\n" +
                    " statistic_time STRING,\n" +
                    " PRIMARY KEY (client_ip) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    " 'connector' = 'jdbc',\n" +
                    " 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" +
                    " 'table-name' = 'client_ip_access', \n" +
                    " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                    " 'username' = 'root',\n" +
                    " 'password' = '123qwe'\n" +
                    ") ";
            tEnv.executeSql(client_ip_access_ddl);
            String client_ip_access_sql = "" +
                    "INSERT INTO client_ip_access\n" +
                    "SELECT\n" +
                    " clientIP,\n" +
                    " count(1) AS access_cnt,\n" +
                    " FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n" +
                    "FROM\n" +
                    " logs \n" +
                    "WHERE\n" +
                    " articleId <> 0 \n" +
                    " OR sectionId <> 0 \n" +
                    "GROUP BY\n" +
                    " clientIP ";
            tEnv.executeSql(client_ip_access_sql);
        }

        /**
         * Hot articles.
         *
         * @param tEnv
         */
        private static void getHotArticle(StreamTableEnvironment tEnv) {
            // JDBC source
            // Table mapping post id to title: [tid, subject] are the article id and title
            String pre_forum_post_ddl = "" +
                    "CREATE TABLE pre_forum_post (\n" +
                    " tid INT,\n" +
                    " subject STRING,\n" +
                    " PRIMARY KEY (tid) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    " 'connector' = 'jdbc',\n" +
                    " 'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n" +
                    " 'table-name' = 'pre_forum_post', \n" +
                    " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                    " 'username' = 'root',\n" +
                    " 'password' = '123qwe'\n" +
                    ")";
            // Create the pre_forum_post source table
            tEnv.executeSql(pre_forum_post_ddl);
            // Create the MySQL sink table
            // [article_id, subject, article_pv, statistic_time]
            // [article id, title, number of visits, statistics time]
            String hot_article_ddl = "" +
                    "CREATE TABLE hot_article (\n" +
                    " article_id INT,\n" +
                    " subject STRING,\n" +
                    " article_pv BIGINT,\n" +
                    " statistic_time STRING,\n" +
                    " PRIMARY KEY (article_id) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    " 'connector' = 'jdbc',\n" +
                    " 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" +
                    " 'table-name' = 'hot_article', \n" +
                    " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                    " 'username' = 'root',\n" +
                    " 'password' = '123qwe'\n" +
                    ")";
            tEnv.executeSql(hot_article_ddl);
            // Insert the results into the MySQL target table
            String hot_article_sql = "" +
                    "INSERT INTO hot_article\n" +
                    "SELECT \n" +
                    " a.articleId,\n" +
                    " b.subject,\n" +
                    " count(1) as article_pv,\n" +
                    " FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time\n" +
                    "FROM logs a \n" +
                    " JOIN pre_forum_post FOR SYSTEM_TIME AS OF a.proctime as b ON a.articleId = b.tid\n" +
                    "WHERE a.articleId <> 0\n" +
                    "GROUP BY a.articleId,b.subject\n" +
                    "ORDER BY count(1) desc\n" +
                    "LIMIT 10";
            tEnv.executeSql(hot_article_sql);
        }

        /**
         * Hot sections.
         *
         * @param tEnv
         */
        public static void getHotSection(StreamTableEnvironment tEnv) {
            // Table mapping section id to name: [fid, name] are the section id and section name
            String pre_forum_forum_ddl = "" +
                    "CREATE TABLE pre_forum_forum (\n" +
                    " fid INT,\n" +
                    " name STRING,\n" +
                    " PRIMARY KEY (fid) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    " 'connector' = 'jdbc',\n" +
                    " 'url' = 'jdbc:mysql://kms-4:3306/ultrax',\n" +
                    " 'table-name' = 'pre_forum_forum', \n" +
                    " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                    " 'username' = 'root',\n" +
                    " 'password' = '123qwe',\n" +
                    " 'lookup.cache.ttl' = '10',\n" +
                    " 'lookup.cache.max-rows' = '1000'" +
                    ")";
            // Create the pre_forum_forum source table
            tEnv.executeSql(pre_forum_forum_ddl);
            // Create the MySQL sink table
            // [section_id, name, section_pv, statistic_time]
            // [section id, section name, number of visits, statistics time]
            String hot_section_ddl = "" +
                    "CREATE TABLE hot_section (\n" +
                    " section_id INT,\n" +
                    " name STRING,\n" +
                    " section_pv BIGINT,\n" +
                    " statistic_time STRING,\n" +
                    " PRIMARY KEY (section_id) NOT ENFORCED \n" +
                    ") WITH (\n" +
                    " 'connector' = 'jdbc',\n" +
                    " 'url' = 'jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8',\n" +
                    " 'table-name' = 'hot_section', \n" +
                    " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                    " 'username' = 'root',\n" +
                    " 'password' = '123qwe'\n" +
                    ")";
            // Create the sink table hot_section
            tEnv.executeSql(hot_section_ddl);
            // Hot-section statistics:
            // join the log stream with the MySQL dimension table
            // to look up the section name
            String hot_section_sql = "" +
                    "INSERT INTO hot_section\n" +
                    "SELECT\n" +
                    " a.sectionId,\n" +
                    " b.name,\n" +
                    " count(1) as section_pv,\n" +
                    " FROM_UNIXTIME(UNIX_TIMESTAMP()) AS statistic_time \n" +
                    "FROM\n" +
                    " logs a\n" +
                    " JOIN pre_forum_forum FOR SYSTEM_TIME AS OF a.proctime as b ON a.sectionId = b.fid \n" +
                    "WHERE\n" +
                    " a.sectionId <> 0 \n" +
                    "GROUP BY a.sectionId, b.name\n" +
                    "ORDER BY count(1) desc\n" +
                    "LIMIT 10";
            // Execute the insert
            tEnv.executeSql(hot_section_sql);
        }

        /**
         * Extract [clientIP, accessDate, sectionId, articleId],
         * i.e. client IP, access date, section id and article id.
         *
         * @param logRecord
         * @return
         */
        public static DataStream<Tuple4<String, String, Integer, Integer>> getFieldFromLog(DataStream<AccessLogRecord> logRecord) {
            DataStream<Tuple4<String, String, Integer, Integer>> fieldFromLog = logRecord.map(new MapFunction<AccessLogRecord, Tuple4<String, String, Integer, Integer>>() {
                @Override
                public Tuple4<String, String, Integer, Integer> map(AccessLogRecord accessLogRecord) throws Exception {
                    LogParse parse = new LogParse();
                    String clientIpAddress = accessLogRecord.getClientIpAddress();
                    String dateTime = accessLogRecord.getDateTime();
                    String request = accessLogRecord.getRequest();
                    String formatDate = parse.parseDateField(dateTime);
                    Tuple2<String, String> sectionIdAndArticleId = parse.parseSectionIdAndArticleId(request);
                    if (formatDate.isEmpty() || sectionIdAndArticleId.equals(Tuple2.of("", ""))) {
                        return new Tuple4<String, String, Integer, Integer>("0.0.0.0", "0000-00-00 00:00:00", 0, 0);
                    }
                    Integer sectionId = sectionIdAndArticleId.f0.isEmpty() ? 0 : Integer.parseInt(sectionIdAndArticleId.f0);
                    Integer articleId = sectionIdAndArticleId.f1.isEmpty() ? 0 : Integer.parseInt(sectionIdAndArticleId.f1);
                    return new Tuple4<>(clientIpAddress, formatDate, sectionId, articleId);
                }
            });
            return fieldFromLog;
        }

        /**
         * Keep only the usable log records.
         *
         * @param accessLog
         * @return
         */
        public static DataStream<AccessLogRecord> getAvailableAccessLog(DataStream<String> accessLog) {
            final LogParse logParse = new LogParse();
            // Parse the raw logs into AccessLogRecord objects
            DataStream<AccessLogRecord> filterDS = accessLog.map(new MapFunction<String, AccessLogRecord>() {
                @Override
                public AccessLogRecord map(String log) throws Exception {
                    return logParse.parseRecord(log);
                }
            }).filter(new FilterFunction<AccessLogRecord>() {
                // Drop records that could not be parsed
                @Override
                public boolean filter(AccessLogRecord accessLogRecord) throws Exception {
                    return !(accessLogRecord == null);
                }
            }).filter(new FilterFunction<AccessLogRecord>() {
                // Drop records whose status code is not 200, i.e. keep only successful requests
                @Override
                public boolean filter(AccessLogRecord accessLogRecord) throws Exception {
                    return "200".equals(accessLogRecord.getHttpStatusCode());
                }
            });
            return filterDS;
        }
    }

Package the code above and upload it to the cluster. Before running the submit command, place the Hadoop dependency jar flink-shaded-hadoop-2-uber-2.7.5-10.0.jar in the lib folder of the Flink installation directory: the state backend is configured on HDFS, and the Flink release does not bundle the Hadoop dependencies.

Otherwise, the job fails with the following error:

    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.

Submit to the cluster

Write the submit script

    #!/bin/bash
    /opt/modules/flink-1.11.1/bin/flink run -m kms-1:8081 \
    -c com.jmx.analysis.LogAnalysis \
    /opt/softwares/com.jmx-1.0-SNAPSHOT.jar

After submitting, open the Flink web UI to check the job:

Now browse the forum, click on sections and posts, and watch the result tables in the database change (a small query sketch follows):
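For example, a minimal JDBC sketch like the following (the class name is illustrative; it assumes the MySQL Connector/J driver is on the classpath and uses the credentials configured earlier) prints the current contents of the hot_section result table:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class CheckResults {
        public static void main(String[] args) throws Exception {
            // The URL and credentials match the sink tables defined in LogAnalysis
            String url = "jdbc:mysql://kms-4:3306/statistics?useUnicode=true&characterEncoding=utf-8";
            try (Connection conn = DriverManager.getConnection(url, "root", "123qwe");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(
                         "SELECT section_id, name, section_pv, statistic_time FROM hot_section ORDER BY section_pv DESC")) {
                while (rs.next()) {
                    System.out.printf("%d\t%s\t%d\t%s%n",
                            rs.getInt("section_id"), rs.getString("name"),
                            rs.getLong("section_pv"), rs.getString("statistic_time"));
                }
            }
        }
    }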

Summary

This article walked through building a user behavior log analysis system from scratch. First, a forum platform was set up with Discuz; the logs it produces are collected with Flume and pushed to Kafka; Flink then analyzes and processes them in real time; finally, the results are written to MySQL for use in visualization.