package com.ongbo.hotAnalysis
import java.sql.Timestamp
import java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
/*
 * Case class for the input records
 */
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// Case class for the windowed aggregation result
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object HotItems {
  def main(args: Array[String]): Unit = {
    // 1: Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 2: Read the data
    /* Kafka source */
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")
    properties.setProperty("group.id", "web-consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(), properties))
    // val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv")
      .map(data => {
        println("data:" + data)
        // Parse one CSV line: userId,itemId,categoryId,behavior,timestamp
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
      })
      // Timestamps are in seconds and assumed ascending, so convert to milliseconds
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // 3: Transform the data
    val processStream = dataStream
      // Keep only the "pv" (page view) events
      .filter(_.behavior.equals("pv"))
      // Key by itemId
      .keyBy(_.itemId)
      // Sliding window: 20-second size with a 10-second slide
      .timeWindow(Time.seconds(20), Time.seconds(10))
      // Pre-aggregate per window, then key by window end so each window's
      // counts can be ranked together even if records arrive out of order
      .aggregate(new CountAgg(), new WindowResult())
      .keyBy(_.windowEnd)
      .process(new TopNHotItems(10))

    // Sink: print the results
    processStream.print("processStream::")
    // dataStream.print()

    // Execute
    env.execute("hot Items Job")
  }
}
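/*
 * For local testing it can help to push a few records into the topic by hand.
 * This is a minimal sketch of a producer, assuming the same brokers as above and
 * the CSV layout userId,itemId,categoryId,behavior,timestamp (epoch seconds) that
 * the map above parses; the sample record below is illustrative only.
 */
object TestProducer {
  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Illustrative record only: userId,itemId,categoryId,behavior,timestamp
    producer.send(new ProducerRecord[String, String]("weblog", "543462,1715,1464116,pv,1511658000"))
    producer.close()
  }
}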
/* Custom pre-aggregation function: counts elements incrementally */
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {
  // Initial accumulator value
  override def createAccumulator(): Long = 0L
  // Increment the count for every incoming element
  override def add(in: UserBehavior, acc: Long): Long = acc + 1
  // Emit the final count
  override def getResult(acc: Long): Long = acc
  // Merge two partial counts (needed for merging windows)
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}
// Custom window function: wraps the pre-aggregated count into an ItemViewCount
class WindowResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    // The pre-aggregation leaves exactly one count per key and window
    out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))
  }
}
// Custom process function: ranks the items of each window
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
  private var itemState: ListState[ItemViewCount] = _

  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
  }

  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
    // Buffer every record in the state list
    itemState.add(value)
    // Register a timer that fires once the watermark passes the window end,
    // i.e. when all ItemViewCounts for this window have arrived
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  // When the timer fires, sort all buffered counts and emit the ranking
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // Pull all records out of the state into a ListBuffer
    val allItems: ListBuffer[ItemViewCount] = new ListBuffer()
    import scala.collection.JavaConverters._
    for (item <- itemState.get().asScala) {
      allItems += item
    }
    // Sort by count in descending order and keep the top N
    val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
    // Clear the state
    itemState.clear()
    // Format the ranking result
    val result: StringBuilder = new StringBuilder
    result.append("Time: ").append(new Timestamp(timestamp - 1)).append("\n")
    // Append one line per ranked item
    for (i <- sortedItems.indices) {
      val currentItem = sortedItems(i)
      result.append("No").append(i + 1).append(":")
        .append(" itemId: ").append(currentItem.itemId)
        .append(" views: ").append(currentItem.count).append("\n")
    }
    result.append("============================\n")
    // Throttle the output rate (for readability during testing)
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
/* Custom pre-aggregation function: computes an average (here over timestamps) */
class AverageAgg() extends AggregateFunction[UserBehavior, (Long, Int), Double] {
  override def createAccumulator(): (Long, Int) = (0L, 0)
  override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1 + in.timestamp, acc._2 + 1)
  // Floating-point division, so the average is not truncated to a whole number
  override def getResult(acc: (Long, Int)): Double = acc._1.toDouble / acc._2
  override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1 + acc1._1, acc._2 + acc1._2)
}
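/*
 * AverageAgg is defined above but never wired into main. This is a minimal sketch
 * of using it, assuming the same keyed, windowed stream as in main: calling
 * aggregate() with only the AggregateFunction emits its result (the average event
 * timestamp per item and window) directly as a DataStream[Double].
 */
object AverageAggUsage {
  def averagePerWindow(dataStream: DataStream[UserBehavior]): DataStream[Double] =
    dataStream
      .keyBy(_.itemId)
      .timeWindow(Time.seconds(20), Time.seconds(10))
      .aggregate(new AverageAgg())
}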