1. Application Scenario Analysis

  • Data cleaning (real-time ETL)
  • Data reporting

1.1. Data Reporting

1.1.1. Architecture Diagram

[Architecture diagram: audit logs are read from Kafka, aggregated in Flink windows, written to Elasticsearch, and visualized in Kibana; records that arrive too late are routed back to Kafka.]

1.1.2. Requirements Analysis

The goal is to compute content-moderation (audit) metrics for a live-streaming / short-video platform:

  1. Count, per region, the number of records approved (published) in each 1-minute window
  2. Count, per region, the number of records rejected (taken down) in each 1-minute window
  3. Count, per region, the number of records added to the blacklist in each 1-minute window

2. DataReport Code Development

The walkthrough below uses the Java implementation; the complete project, including a Scala implementation, is available at the GitHub link at the bottom of this post.

2.1. MyAggFunction Implementation

Purpose: the window aggregation function.

package henry.flink.function;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;

/**
 * @Author: Henry
 * @Description: window aggregation function
 **/
public class MyAggFunction implements WindowFunction<Tuple3<Long, String, String>, Tuple4<String, String, String, Long>, Tuple, TimeWindow>{
    @Override
    public void apply(Tuple tuple,
                      TimeWindow window,
                      Iterable<Tuple3<Long, String, String>> input,
                      Collector<Tuple4<String, String, String, Long>> out)
            throws Exception {
        // get the grouping key fields (from keyBy)
        String type = tuple.getField(0).toString();
        String area = tuple.getField(1).toString();

        Iterator<Tuple3<Long, String, String>> it = input.iterator();

        // collect the timestamps so the latest one in the window can be used as the result time
        ArrayList<Long> arrayList = new ArrayList<>();

        long count = 0;
        while (it.hasNext()) {
            Tuple3<Long, String, String> next = it.next();
            arrayList.add(next.f0);
            count++;
        }
        System.err.println(Thread.currentThread().getId() + ", window fired, record count: " + count);

        // sort ascending (default order)
        Collections.sort(arrayList);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String time = sdf.format(new Date(arrayList.get(arrayList.size() - 1)));

        // assemble the result
        Tuple4<String, String, String, Long> res = new Tuple4<>(time, type, area, count);
        out.collect(res);
    }
}
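
A note on the design: the ArrayList above exists only so the latest timestamp in the window can be found, so the same result can be produced without buffering anything by tracking a running maximum. A minimal sketch of that variant (not the version used in the project; the class name is made up):

package henry.flink.function;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Hypothetical variant of MyAggFunction: counts records per key and window
 * while tracking only the maximum timestamp, instead of collecting and sorting all of them.
 */
public class MyAggFunctionMaxOnly implements WindowFunction<Tuple3<Long, String, String>, Tuple4<String, String, String, Long>, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple,
                      TimeWindow window,
                      Iterable<Tuple3<Long, String, String>> input,
                      Collector<Tuple4<String, String, String, Long>> out) throws Exception {
        String type = tuple.getField(0).toString();
        String area = tuple.getField(1).toString();

        long count = 0;
        long maxTimestamp = Long.MIN_VALUE;
        for (Tuple3<Long, String, String> record : input) {
            maxTimestamp = Math.max(maxTimestamp, record.f0);
            count++;
        }

        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(maxTimestamp));
        out.collect(new Tuple4<>(time, type, area, count));
    }
}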

2.2. DataReport Implementation

Main code:

/**
 * @Author: Henry
 * @Description: data report job
 *
 **/
public class DataReport {
    private static Logger logger = LoggerFactory.getLogger(DataReport.class);

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set the parallelism
        env.setParallelism(5);

        //  use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // checkpoint configuration
		...

        //  configure the Kafka source
        String topic = "auditLog";     // audit log topic
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "master:9092");
        prop.setProperty("group.id", "con1");

        FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(
                topic, new SimpleStringSchema(),prop);

       /*
       *    data read from Kafka:
       *   {"dt":"audit time {yyyy-MM-dd HH:mm:ss}", "type":"audit type","username":"auditor name","area":"region"}
       * */
        DataStreamSource<String> data = env.addSource(myConsumer);

        //   clean the data
        DataStream<Tuple3<Long, String, String>> mapData = data.map(
                new MapFunction<String, Tuple3<Long, String, String>>() {
            @Override
            public Tuple3<Long, String, String> map(String line) throws Exception {

                JSONObject jsonObject = JSON.parseObject(line);
                String dt = jsonObject.getString("dt");

                long time = 0;
                try {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    Date parse = sdf.parse(dt);
                    time = parse.getTime();
                } catch (ParseException e) {
                    // this log could also be written to other storage
                    logger.error("Failed to parse time, dt: " + dt, e);
                }
                String type = jsonObject.getString("type");
                String area = jsonObject.getString("area");

                return new Tuple3<>(time, type, area);
            }
        });

        //   filter out invalid records
        DataStream<Tuple3<Long, String, String>> filterData = mapData.filter(
                new FilterFunction<Tuple3<Long, String, String>>() {
            @Override
            public boolean filter(Tuple3<Long, String, String> value) throws Exception {
                boolean flag = true;
                if (value.f0 == 0) {    //   i.e. the time field is 0 (parse failed)
                    flag = false;
                }
                return flag;
            }
        });

        //  side output tag for records that arrive too late
        OutputTag<Tuple3<Long, String, String>> outputTag = new OutputTag<Tuple3<Long, String, String>>("late-data"){};

        /*
         *  windowed aggregation
         * */
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> resultData = filterData.assignTimestampsAndWatermarks(
                new MyWatermark())
                .keyBy(1, 2)   // key by fields 1 and 2 (type, area); field 0 is the timestamp
                .window(TumblingEventTimeWindows.of(Time.minutes(30)))  // tumbling event-time window (the requirement is per minute; use Time.minutes(1) for that)
                .allowedLateness(Time.seconds(30))  // allow records up to 30s late
                .sideOutputLateData(outputTag)  // route records that are too late to the side output
                .apply(new MyAggFunction());

        //  get the records that were too late
        DataStream<Tuple3<Long, String, String>> sideOutput = resultData.getSideOutput(outputTag);

        //  write the too-late records to Kafka
        String outTopic = "lateLog";
        Properties outprop = new Properties();
        outprop.setProperty("bootstrap.servers", "master:9092");
        //	set the transaction timeout
        outprop.setProperty("transaction.timeout.ms", 60000*15+"");

        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
                outTopic,
                new KeyedSerializationSchemaWrapper<String>(
                        new SimpleStringSchema()),
                        outprop,
                        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

        //  sink the too-late records to Kafka
        sideOutput.map(new MapFunction<Tuple3<Long, String, String>, String>() {
            @Override
            public String map(Tuple3<Long, String, String> value) throws Exception {
                return value.f0+"\t"+value.f1+"\t"+value.f2;
            }
        }).addSink(myProducer);

        /*
        *   write the computed results to Elasticsearch
        * */
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("master", 9200, "http"));

        ElasticsearchSink.Builder<Tuple4<String, String, String, Long>> esSinkBuilder = new ElasticsearchSink.Builder<
                Tuple4<String, String, String, Long>>(
                httpHosts,
                new ElasticsearchSinkFunction<Tuple4<String, String, String, Long>>() {
                    public IndexRequest createIndexRequest(Tuple4<String, String, String, Long> element) {
                        Map<String, Object> json = new HashMap<>();
                        json.put("time",element.f0);
                        json.put("type",element.f1);
                        json.put("area",element.f2);
                        json.put("count",element.f3);

                        // use time + type + area to make the document id unique
                        String id = element.f0.replace(" ","_")+"-"+element.f1+"-"+element.f2;

                        return Requests.indexRequest()
                                .index("auditindex")
                                .type("audittype")
                                .id(id)
                                .source(json);
                    }

                    @Override
                    public void process(Tuple4<String, String, String, Long> element,
                                        RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );
        //  the ES sink buffers requests; setting this to 1 flushes every single record to ES immediately
        esSinkBuilder.setBulkFlushMaxActions(1);
        resultData.addSink(esSinkBuilder.build());

        env.execute("DataReport");
    }
}
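The MyWatermark class passed to assignTimestampsAndWatermarks above is not listed in this post (it is part of the GitHub project). A minimal sketch of what such an assigner could look like with the Flink 1.x periodic-watermark API, assuming an allowed out-of-orderness of 10 seconds (the actual value and implementation in the project may differ):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MyWatermark implements AssignerWithPeriodicWatermarks<Tuple3<Long, String, String>> {

    // assumption: tolerate up to 10 seconds of out-of-order data
    private final long maxOutOfOrderness = 10000L;
    private long currentMaxTimestamp = 0L;

    @Override
    public Watermark getCurrentWatermark() {
        // the watermark trails the largest event time seen so far
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Tuple3<Long, String, String> element, long previousElementTimestamp) {
        // f0 carries the event time parsed from the "dt" field
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.f0);
        return element.f0;
    }
}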

3. Running in Practice

3.1. Start the Kafka Producer

Kafka producer message data:

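The screenshot of the producer messages is omitted here. As a hedged example, test messages matching the JSON format the job expects could be fed in with the console producer that ships with Kafka (broker address as configured in the job; the type, username and area values below are made-up placeholders):

./kafka-console-producer.sh --broker-list master:9092 --topic auditLog
>{"dt":"2019-06-19 12:00:05","type":"shelf","username":"user1","area":"AREA_US"}
>{"dt":"2019-06-19 12:00:20","type":"unshelf","username":"user2","area":"AREA_ID"}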

Create the Kafka topics:

./kafka-topics.sh  --create --topic lateLog --zookeeper localhost:2181 --partitions 5 --replication-factor 1
./kafka-topics.sh  --create --topic auditLog --zookeeper localhost:2181 --partitions 5 --replication-factor 1

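To double-check that both topics exist, they can be listed against the same ZooKeeper address:

./kafka-topics.sh --list --zookeeper localhost:2181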

3.2. Start Elasticsearch and Kibana

1. Start the services:

# start Elasticsearch in the background
./bin/elasticsearch -d

# start Kibana in the background
nohup ./bin/kibana >/dev/null 2>&1 &
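Optionally, before creating the index it is worth confirming that ES is reachable (standard cluster health endpoint, using the same host as the rest of this post):

curl 'http://master:9200/_cluster/health?pretty'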

2. Open the Kibana UI in the browser.

3. Create the index. Index creation script:

# note: if the index was created incorrectly, delete it first and then recreate it
curl -XDELETE 'localhost:9200/auditindex'

# create the index
curl -XPUT 'http://master:9200/auditindex?pretty'

# create the mapping for the type
curl -H "Content-Type: application/json" -XPOST 'http://master:9200/auditindex/audittype/_mapping?pretty' -d '
{
"audittype":{
	"properties":{
		"area":{"type":"keyword"},
		"type":{"type":"keyword"},
		"count":{"type":"long"},
		"time":{"type":"date","format": "yyyy-MM-dd HH:mm:ss"}
		}
    }
}
'
# run the index creation script
sh -x createindex.sh

[Output of running the index creation script omitted]

4. Check the data in ES:

# open this URL
http://master:9200/auditindex/_search?pretty=true

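The search result screenshot is omitted. Based on the fields the ES sink writes (time, type, area, count) and the document id built from time, type and area, one hit in the response would look roughly like this (all values below are made up):

{
  "_index" : "auditindex",
  "_type" : "audittype",
  "_id" : "2019-06-19_12:01:00-shelf-AREA_US",
  "_source" : {
    "time" : "2019-06-19 12:01:00",
    "type" : "shelf",
    "area" : "AREA_US",
    "count" : 37
  }
}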

3.3. Visualized Reports

1. Connect Kibana to ES (create an index pattern for the auditindex index).
2. View the data in Discover.

Why the displayed time is off by 8 hours:

The job writes timestamps to ES without timezone information, so they are stored in the default timezone, and Kibana additionally converts times to the browser's timezone.

Possible fixes:

  1. Set the time in the code (see the sketch after this list)
  2. Configure the timezone in the web UI (simplest)
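
A minimal sketch of option 1, which is not used in this post: format the timestamp as an ISO-8601 string with an explicit offset before writing it to ES, and change the mapping's "time" date format accordingly. The pattern, timezone and class name below are assumptions for illustration:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class TimeWithZoneExample {
    public static void main(String[] args) {
        // format an epoch-millis timestamp as ISO-8601 with an explicit +08:00 offset,
        // so ES/Kibana interpret it in the intended timezone instead of UTC
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX");
        sdf.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
        System.out.println(sdf.format(new Date()));   // e.g. 2019-06-19T12:01:00+08:00
    }
}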

Set the timezone in the Kibana web UI, then refresh and view the data again.
3. Create a pie chart to visualize the data.
4. Create a Dashboard and add the visualization chart.


GitHub project source code

Source: https://blog.csdn.net/hongzhen91/article/details/92795891