Real-Time Hot Items Statistics

Steps:

  • First, consume the source data from Kafka
  • Preprocess each record with a map operator
  • Filter the data, keeping only the pv records
  • Key the stream by itemId, then use timeWindow to open a 20-second sliding window that fires every 10 seconds
  • On that window, apply a custom pre-aggregate function together with a custom window function, mapping the specified input case class to the output case class
  • The result is a DataStream again, so keyBy it on windowEnd
  • Use the low-level process API to sort the records of each time window and take the top N (a minimal skeleton of the whole pipeline is sketched below; the full implementation follows)
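
A minimal sketch of that pipeline shape; here env is the execution environment, and kafkaConsumer and parseUserBehavior are hypothetical placeholders for the source and parsing logic that the full, runnable code below defines in place:

// Sketch only: kafkaConsumer and parseUserBehavior stand in for the source
// and parsing logic defined in the full implementation that follows.
val topN = env
  .addSource(kafkaConsumer)                        // read from Kafka
  .map(line => parseUserBehavior(line))            // preprocess into UserBehavior
  .assignAscendingTimestamps(_.timestamp * 1000L)  // event-time timestamps
  .filter(_.behavior == "pv")                      // keep only pv records
  .keyBy(_.itemId)                                 // group by item
  .timeWindow(Time.seconds(20), Time.seconds(10))  // 20 s windows, sliding every 10 s
  .aggregate(new CountAgg(), new WindowResult())   // pre-aggregate + window function
  .keyBy(_.windowEnd)                              // regroup by window end
  .process(new TopNHotItems(10))                   // sort each window and take the top N
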
package com.ongbo.hotAnalysis

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/*
 * Case class for the input records
 */
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int, behavior: String, timestamp: Long)
// Case class for the windowed aggregation result
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    // 1: create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 2: read the data

    /* Kafka source */
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")
    properties.setProperty("group.id","web-consumer-group")
    properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset","latest")
    val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(),properties))
//    val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv")
      .map(data =>{
        System.out.println("data:"+data)
        val dataArray = data.split(",")
//        if(dataArray(0).equals("ij"))
        UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)

      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // 3: transform the data
    val processStream = dataStream
      // keep only the pv tracking events
      .filter(_.behavior.equals("pv"))
      // group by itemId first
      .keyBy(_.itemId)
      // sliding window: 20-second size, sliding every 10 seconds
      .timeWindow(Time.seconds(20), Time.seconds(10))
      // window aggregation: strictly speaking this could be skipped, but incoming data may be out of order, so aggregate per window and then keyBy the result
      .aggregate(new CountAgg(), new WindowResult())
      .keyBy(_.windowEnd)      // group by window end

      .process(new TopNHotItems(10))

    // sink: print the result
    processStream.print("processStream::")
//    dataStream.print()
    // execute the job
    env.execute("hot Items Job")

  }
}

/* Custom pre-aggregate function: counts elements in the window */
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{
  // initial accumulator value
  override def createAccumulator(): Long = 0
  // add one for every incoming element
  override def add(in: UserBehavior, acc: Long): Long = acc + 1
  // return the final count
  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

// Custom window function: wraps the count into an ItemViewCount
class WindowResult() extends WindowFunction[Long,ItemViewCount, Long, TimeWindow]{
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =  {
    out.collect(ItemViewCount(key,window.getEnd,input.iterator.next()))
  }
}

// Custom keyed process function: ranks the top N items per window
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
  private var itemState: ListState[ItemViewCount] = _

  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))

  }
  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
    // store each incoming record in the state list
    itemState.add(value)
    // register an event-time timer for just after the window closes
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }
  // when the timer fires, sort all buffered records and emit the result
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // pull all records out of the state into a ListBuffer
    val allItems: ListBuffer[ItemViewCount] = new ListBuffer()
    import scala.collection.JavaConversions._
    for(item <- itemState.get()){
      allItems += item
    }

    // sort by count in descending order (sortBy is ascending by default, hence the reverse ordering) and take the top N
    val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topsize)

    // clear the state
    itemState.clear()

    // format the ranking result
    val result: StringBuilder = new StringBuilder
    result.append("Time: ").append(new Timestamp(timestamp - 1)).append("\n")
    // output the info of each ranked item
    for(i<- sortedItems.indices){
      val currentItem = sortedItems(i)
      result.append("No").append(i+1).append(":")
        .append("  商品ID:").append(currentItem.itemId)
        .append("  浏览量:").append(currentItem.count).append("\n")
    }
    result.append("============================\n")
    // throttle the output rate
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
/* Custom pre-aggregate function that computes an average (extra example, not used by the job above) */
class AverageAgg() extends AggregateFunction[UserBehavior, (Long,Int), Double]{
  override def createAccumulator(): (Long, Int) = (0L,0)

  override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1+in.timestamp, acc._2+1)

  override def getResult(acc: (Long, Int)): Double = acc._1.toDouble / acc._2

  override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1, acc._2+acc1._2)
}

Real-Time PV Statistics

In principle this job should also read its data from Kafka, but for now it reads from a local file: because of local network problems at the time, the data was not produced to the servers, so the local copy is used directly.
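
For reference, a hedged sketch of what the Kafka variant might look like, simply reusing the consumer setup from the hot-items job above (same brokers, topic, and group id; the corresponding Properties, FlinkKafkaConsumer, and SimpleStringSchema imports from that job are needed as well):

// Sketch only: wires the PV job to the same Kafka source used by HotItems.
val properties = new Properties()
properties.setProperty("bootstrap.servers", "114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")
properties.setProperty("group.id", "web-consumer-group")
val dataStream = env
  .addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(), properties))
  .map(data => {
    val dataArray = data.split(",")
    UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
  })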

The job itself is simple: a one-hour tumbling window computes the hourly PV, and the result is updated once per hour.

package com.ongbo.NetWorkFlow_Analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/*
 * Case class for the input records
 */
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)

object PageVies {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // locate the dataset via a relative (classpath) path
    val resource = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile(resource.getPath)
      .map(data =>{
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.behavior.equals("pv"))
      .map(data => ("pv", 1))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .sum(1)
    dataStream.print("pv count")
    env.execute("PV")
  }
}

Real-Time UV Statistics: Bloom Filter

When counting UV, note that keeping every distinct user in memory is expensive, so we optimize with a Bloom filter: Flink no longer has to cache the full user set, which would otherwise degrade performance. The per-window counts are stored in Redis so that backend services can consume them.
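
The code below refers to a UvCount result case class and a Bloom helper that are not shown in this excerpt; a minimal sketch of what they might look like, with field and method shapes inferred from how they are used in UvCountWithBloom:

// Sketch only: shapes inferred from the usage further down.
case class UvCount(windowEnd: Long, uvCount: Long)

// Tiny Bloom-filter helper backed by a Redis bitmap: cap is the size of the
// bit array (a power of two) and hash() maps a user id to an offset in it.
class Bloom(size: Long) extends Serializable {
  private val cap = if (size > 0) size else 1 << 27
  def hash(value: String, seed: Int): Long = {
    var result = 0L
    for (c <- value) {
      result = result * seed + c
    }
    // mask with cap - 1 so the offset stays inside the bitmap
    result & (cap - 1)
  }
}
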
package com.ongbo.NetWorkFlow_Analysis

import com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

object UvWithBloom {
 def main(args: Array[String]): Unit = {
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.setParallelism(1)

   // locate the dataset via a relative (classpath) path (the line below still reads an absolute local path)
   val resource = getClass.getResource("/UserBehavior.csv")
   val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv")
     .map(data =>{
       val dataArray = data.split(",")
       UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)
     })
     .assignAscendingTimestamps(_.timestamp * 1000L)
     .filter(_.behavior.equals("pv"))
     .map( data => ("dummyKey",data.userId))
     .keyBy(_._1)
     .timeWindow(Time.hours(1))
     .trigger(new MyTrigger())
     .process(new UvCountWithBloom())

   dataStream.print()
   env.execute()
 }
}

// Custom window trigger
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{
  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    // fire the window for every incoming element and purge all window state
    TriggerResult.FIRE_AND_PURGE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String, TimeWindow] {
  // Redis connection
  lazy val jedis = new Jedis("114.116.219.97",5000)
  // 2^29 bits, i.e. a 64 MB bitmap
  lazy val bloom = new Bloom(1 << 29)
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    // bitmap storage: the Redis key is the windowEnd, the value is the bitmap
    val storeKey = context.window.getEnd.toString
    var count = 0L
    // each window's count is also stored in a Redis hash as (windowEnd, uvCount), so read the current value first

    if(jedis.hget("count",storeKey) != null){
//      System.out.println(v)
      count = jedis.hget("count",storeKey).toLong
    }
    // use the Bloom filter to check whether the current user has already been seen
    val userId = elements.last._2.toString
    val offset = bloom.hash(userId, 61)
    // flag: whether this bit is already set in the Redis bitmap
    val isExist = jedis.getbit