1、应用场景分析
参考徐崴老师Flink项目
1.1、数据清洗【实时ETL】
1.1.1、需求分析
针对算法产生的日志数据进行清洗拆分
- 算法产生的日志数据是嵌套大JSON格式(json嵌套json),需要拆分打平
- 针对算法中的国家字段进行大区转换
- 最后把不同类型的日志数据分别进行存储
1.1.2、架构图
1.2、新建项目工程
创建一个大的项目,用Maven来维护
每个flink需求,即job创建一个 ”Module”
第一步:
第二步:
在工程项目pom.xml中添加依赖管理,在其中管理各个需求子项目的依赖版本
在依赖管理中添加工程项目相关依赖:
第三步:
在DataClean module中的pom.xml中添加依赖(不需要加入版本):
2、DataClean代码开发
开发介绍采用的是 Java 代码实现的,完整工程代码及 Scala 代码的实现详见底部 GitHub 代码地址
2.1、MyRedisSource实现
功能: 自定义 Redis Source
由于存储的是 国家大区和编码的映射关系
类似于 k-v ,所以返回 HashMap 格式比较好
在 Redis 中保存的国家和大区的关系
Redis中进行数据的初始化,数据格式:
Hash 大区 国家
hset areas; AREA_US US
hset areas; AREA_CT TW,HK
hset areas AREA_AR PK,SA,KW
hset areas AREA_IN IN
需要把大区和国家的对应关系组装成 java 的 hashmap
代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package henry.flink.customSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: Henry
* @Description: 自定义 Redis Source
* 由于存储的是 国家大区和编码的映射关系
* 类似于 k-v ,所以返回 HashMap 格式比较好
* 需要把大区和国家的对应关系组装成 java 的 hashmap
*
* @Date: Create in 2019/5/25 18:12
**/
public class MyRedisSource implements SourceFunction<HashMap<String,String>>{
private Logger logger = LoggerFactory.getLogger(MyRedisSource.class);
private final long SLEEP_MILLION = 60000 ;
private boolean isrunning = true;
private Jedis jedis = null;
public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
this.jedis = new Jedis("master", 6379);
// 存储所有国家和大区的对应关系
HashMap<String, String> keyValueMap = new HashMap<String, String>();
while (isrunning){
try{
// 每次执行前先清空,去除旧数据
keyValueMap.clear();
// 取出数据
Map<String, String> areas = jedis.hgetAll("areas");
// 进行迭代
for (Map.Entry<String, String> entry : areas.entrySet()){
String key = entry.getKey(); // 大区:AREA_AR
String value = entry.getValue(); // 国家:PK,SA,KW
String[] splits = value.split(",");
for (String split : splits){
// 这里 split 相当于key, key 是 value
keyValueMap.put(split, key); // 即 PK,AREA_AR
}
}
// 防止取到空数据
if(keyValueMap.size() > 0){
ctx.collect(keyValueMap);
}
else {
logger.warn("从Redis中获取到的数据为空!");
}
// 一分钟提取一次
Thread.sleep(SLEEP_MILLION);
}
// 捕获 Jedis 链接异常
catch (JedisConnectionException e){
// 重新获取链接
jedis = new Jedis("master", 6379);
logger.error("Redis链接异常,重新获取链接", e.getCause());
}// 捕获其他异常处理,通过日志记录
catch (Exception e){
logger.error("Source数据源异常", e.getCause());
}
}
}
/**
* 任务停止,设置 false
* */
public void cancel() {
isrunning = false;
// 这样可以只获取一次连接在while一直用
if(jedis != null){
jedis.close();
}
}
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
|
2.2、DataClean实现
主要代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
// 指定 Kafka Source
String topic = "allData";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "master:9092");
prop.setProperty("group.id", "con1");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(
topic, new SimpleStringSchema(),prop);
// 获取 Kafka 中的数据,Kakfa 数据格式如下:
// {"dt":"2019-01-01 11:11:11", "countryCode":"US","data":[{"type":"s1","score":0.3},{"type":"s1","score":0.3}]}
DataStreamSource<String> data = env.addSource(myConsumer); // 并行度根据 kafka topic partition数设定
// mapData 中存储最新的国家码和大区的映射关系
DataStream<HashMap<String,String>> mapData = env.addSource(new MyRedisSource())
.broadcast(); // 可以把数据发送到后面算子的所有并行实际例中进行计算,否则处理数据丢失数据
// 通过 connect 方法将两个数据流连接在一起,然后再flatMap
DataStream<String> resData = data.connect(mapData).flatMap(
//参数类型代表: data , mapData , 返回结果; Json
new CoFlatMapFunction<String, HashMap<String, String>, String>() {
// 存储国家和大区的映射关系
private HashMap<String, String> allMap = new HashMap<String, String>();
// flatMap1 处理 Kafka 中的数据
public void flatMap1(String value, Collector<String> out)
throws Exception {
// 原数据是 Json 格式
JSONObject jsonObject = JSONObject.parseObject(value);
String dt = jsonObject.getString("dt");
String countryCode = jsonObject.getString("countryCode");
// 获取大区
String area = allMap.get(countryCode);
// 迭代取数据,jsonArray每个数据都是一个jsonobject
JSONArray jsonArray = jsonObject.getJSONArray("data");
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject1 = jsonArray.getJSONObject(i);
System.out.println("areas : - " + area);
jsonObject1.put("area", area);
jsonObject1.put("dt", dt);
out.collect(jsonObject1.toJSONString());
}
}
// flatMap2 处理 Redis 返回的 map 类型的数据
public void flatMap2(HashMap<String, String> value, Collector<String> out)
throws Exception {
this.allMap = value;
}
});
String outTopic = "allDataClean";
Properties outprop= new Properties();
outprop.setProperty("bootstrap.servers", "master:9092");
//设置事务超时时间
outprop.setProperty("transaction.timeout.ms",60000*15+"");
FlinkKafkaProducer011<String> myproducer = new FlinkKafkaProducer011<>(outTopic,
new KeyedSerializationSchemaWrapper<String>(
new SimpleStringSchema()), outprop,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
resData.addSink(myproducer);
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
|
3、实践运行
3.1、Redis
启动redis:
- 先从一个终端启动redis服务
2. 先从一个终端启动redis客户端,并插入数据
1
2
3
4
5
6
7
8
9
10
11
|
./redis-cli
127.0.0.1:6379> hset areas AREA_US US
(integer) 1
127.0.0.1:6379> hset areas AREA_CT TW,HK
(integer) 1
127.0.0.1:6379> hset areas AREA_AR PK,SA,KW
(integer) 1
127.0.0.1:6379> hset areas AREA_IN IN
(integer) 1
127.0.0.1:6379>
12345678910
|
hgetall查看插入数据情况:
3.2、Kafka
启动kafka:
1
2
|
./kafka-server-start.sh -daemon ../config/server.properties
1
|
jps查看启动进程:
kafka创建topc:
1
2
|
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic allData
1
|
创建topic成功:
监控kafka topic:
1
2
|
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic allDataClean
1
|
3.3、启动程序
先启动 DataClean 程序,再启动生产者程序,kafka生产者产生数据如下:
最后终端观察处理输出的数据:
DataStream<HashMap<String,String» mapData = env.addSource(new MyRedisSource()).broadcast();
1
运行结果:
控制台打印结果:
3.4、Flink yarn集群启动
向yarn提交任务:
1
2
|
./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c henry.flink.DataClean /root/flinkCode/DataClean-1.0-SNAPSHOT-jar-with-dependencies.jar
1
|
任务成功运行启动:
通过 yarn UI 查看任务,并进入Flink job:
程序中设置的并行度:
启动kafka生产者:
监控topic消费情况:
1
2
|
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic allDataClean
1
|
最终终端输出结果, 同IDEA中运行结果:
转:https://blog.csdn.net/hongzhen91/article/details/90549079