首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >网站日志实时分析之Flink处理实时热门和PVUV统计

网站日志实时分析之Flink处理实时热门和PVUV统计

作者头像
王知无-import_bigdata
发布于 2020-08-12 07:54:51
发布于 2020-08-12 07:54:51
1.6K00
代码可运行
举报
运行总次数:0
代码可运行

实时热门统计

操作步骤:

  • 先从Kafka读取消费数据
  • 使用map算子对数据进行预处理
  • 过滤数据,只留住pv数据
  • 使用timewindow,每隔10秒创建一个20秒的window
  • 然后将窗口自定义预聚合,并且兹定于窗口函数,按指定输入输出case操作数据
  • 上面操作时候返回的是DataStream,那么就根据timestampEnd进行keyby
  • 使用底层API操作,对每个时间窗口内的数据进行排序,取top
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.ongbo.hotAnalysis

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/*
*定义输入数据的样例类
 */
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)
//定义窗口聚合结果样例类
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    //1:创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //设置为事件事件
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //2:读取数据

    /*kafka源*/
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")
    properties.setProperty("group.id","web-consumer-group")
    properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset","latest")
    val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(),properties))
//    val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/HotItemAnalysis/src/main/resources/UserBehavior.csv")
      .map(data =>{
        System.out.println("data:"+data)
        val dataArray = data.split(",")
//        if(dataArray(0).equals("ij"))
        UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)

      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    //3:transform处理数据
    val processStream = dataStream
      //筛选出埋点pv数据
      .filter(_.behavior.equals("pv"))
      //先对itemID进行分组
      .keyBy(_.itemId)
      //然后设置timeWindow,size为1小时,步长为5分钟的滑动窗口
      .timeWindow(Time.seconds(20), Time.seconds(10))
      //窗口聚合,按道理说应该不用窗口聚合,但是因为达到的数据可能时间顺序会扰乱,所以聚合后要keyby
      .aggregate(new CountAgg(), new WindowResult())
      .keyBy(_.windowEnd)      //按照窗口分组

      .process(new TopNHotItems(10))


    //sink:输出数据
    processStream.print("processStream::")
//    dataStream.print()
    //执行
    env.execute("hot Items Job")



  }
}

/*自定义预聚合函数*/
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{
  //累加器初始值
  override def createAccumulator(): Long = 0
  //每来一次就加一
  override def add(in: UserBehavior, acc: Long): Long = acc+1
  //
  override def getResult(acc: Long): Long = acc

  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

//自定义窗口函数,输出ItemViewCount
class WindowResult() extends WindowFunction[Long,ItemViewCount, Long, TimeWindow]{
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =  {
    out.collect(ItemViewCount(key,window.getEnd,input.iterator.next()))
  }
}

//自定义处理函数
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
  private var itemState: ListState[ItemViewCount] = _

  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))

  }
  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
    //把每条数据存入状态列表
    itemState.add(value)
    //注册一个定时器
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }
  //定时器触发时,对所有的数据排序,并输出结果
  override def onTimer(timestamp: Long, ctx: _root_.org.apache.flink.streaming.api.functions.KeyedProcessFunction[Long, _root_.com.ongbo.hotAnalysis.ItemViewCount, _root_.scala.Predef.String]#OnTimerContext, out: _root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {
    //将所有state中的数据取出,放到一个list Buffer中
    val allItems: ListBuffer[ItemViewCount] = new ListBuffer()
    import scala.collection.JavaConversions._
    for(item <- itemState.get()){
      allItems += item
    }

    //按照点计量count大小排序,sortBy默认是升序,并且取前三个
    val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topsize)

    //清空状态
    itemState.clear()

    //格式化输出排名结果
    val result : StringBuilder = new StringBuilder
    result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
    //输出每一个商品信息
    for(i<- sortedItems.indices){
      val currentItem = sortedItems(i)
      result.append("No").append(i+1).append(":")
        .append("  商品ID:").append(currentItem.itemId)
        .append("  浏览量:").append(currentItem.count).append("\n")
    }
    result.append("============================\n")
    //控制输出频率
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}
/*自定义预聚合函数计算平均数*/
class AverageAgg() extends AggregateFunction[UserBehavior, (Long,Int), Double]{
  override def createAccumulator(): (Long, Int) = (0L,0)

  override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1+in.timestamp, acc._2+1)

  override def getResult(acc: (Long, Int)): Double = acc._1 /acc._2

  override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1+acc1._1, acc._2+acc1._2)
}

实时PV统计

这里按道理应该也要从Kafka读取数据的,但是这里暂时先从本地读,因为当时本地网络的原因,暂时不在服务器上创建数据,而直接用本地的。
这个很简单,直接创建滚动窗口,从而能够计算一个小时的PV,然后每隔一个小时更新一次。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.ongbo.NetWorkFlow_Analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/*
*定义输入数据的样例类
 */
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int,behavior: String, timestamp: Long)

object PageVies {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    //用相对路径定义数据集
    val resource = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile(resource.getPath)
      .map(data =>{
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.behavior.equals("pv"))
      .map(data => ("pv", 1))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .sum(1)
    dataStream.print("pv count")
    env.execute("PV")
  }
}

实时UV统计:布隆过滤器

我们统计UV需要注意,很多重复的user会占用到内存,所以我们采用布隆过滤器优化,减少Flink缓存user从而降低性能。而且将数据count保存在Redis,可以给后端使用的。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.ongbo.NetWorkFlow_Analysis

import com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

object UvWithBloom {
 def main(args: Array[String]): Unit = {
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.setParallelism(1)

   //用相对路径定义数据集
   val resource = getClass.getResource("/UserBehavior.csv")
   val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv")
     .map(data =>{
       val dataArray = data.split(",")
       UserBehavior(dataArray(0).trim.toLong,dataArray(1).trim.toLong,dataArray(2).trim.toInt,dataArray(3).trim,dataArray(4).trim.toLong)
     })
     .assignAscendingTimestamps(_.timestamp * 1000L)
     .filter(_.behavior.equals("pv"))
     .map( data => ("dummyKey",data.userId))
     .keyBy(_._1)
     .timeWindow(Time.hours(1))
     .trigger(new MyTrigger())
     .process(new UvCountWithBloom())

   dataStream.print()
   env.execute()
 }
}


//自定义窗口触发器
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{
  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    //每来一条数据就直接触发窗口操作,并清空所有状态
    TriggerResult.FIRE_AND_PURGE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String, TimeWindow] {
  // 定义Redis连接
  lazy val jedis = new Jedis("114.116.219.97",5000)
  //29位,也就是64M
  lazy val bloom = new Bloom(1 << 29)
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    //位图的存储方式 , key是windowwen,value是位图
    val storeKey = context.window.getEnd.toString
    var count = 0L
    //把每个窗口的count值,也存入Redis表里,存放内容位(windowEnd,uccount),所以要先从Redis中读取


    if(jedis.hget("count",storeKey) != null){
//      System.out.println(v)
      count = jedis.hget("count",storeKey).toLong
    }
    //用布隆过滤器判断当前用户是否已经存在
    val userId = elements.last._2.toString
    val offset = bloom.hash(userId, 61)
    //定义一个标志位,判断Redis位图中有没有这一位
    val isExist = jedis.getbit(storeKey, offset)
    if(!isExist){
      //如果不存在位图对应位置变成1,count+1
      jedis.setbit(storeKey,offset,true)
      jedis.hset("count",storeKey,(count+1).toString)
      out.collect(UvCount(storeKey.toLong,count+1))
    }else{
      out.collect(UvCount(storeKey.toLong,count))
    }
  }
}

class Bloom(size: Long) extends Serializable{
  //位图大小
  private val cap = if(size>0) size else 1 << 27

  //定义Hash函数
  def hash(value: String, seed: Int) : Long = {
    var result:Long = 0L
    for(i <- 0 until value.length){
      result = result * seed + value.charAt(i)
    }
    result & (cap-1)
  }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-08-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
基于 flink 的电商用户行为数据分析【5】| 基于埋点日志数据的网络流量统计
在《基于flink的电商用户行为数据分析【3】| 实时流量统计》这篇文章中,博主为大家介绍了基于服务器 log 的热门页面浏览量统计。 最后通过运行结果的验证,我们发现,从 web 服务器 log 中得到的 url,往往更多的是请求某个资源地址(/*.js、/*.css),如果要针对页面进行统计往往还需要进行过滤。而在实际电商应用中,相比每个单独页面的访问量,我们可能更加关心整个电商网站的网络流量。这个指标,除了合并之前每个页面的统计结果之外,还可以通过统计埋点日志数据中的“pv”行为来得到…
大数据梦想家
2021/01/27
1.2K0
基于 flink 的电商用户行为数据分析【5】| 基于埋点日志数据的网络流量统计
基于flink的电商用户行为数据分析【2】| 实时热门商品统计
在上一期内容中,菌哥已经为大家介绍了电商用户行为数据分析的主要功能和模块介绍。本期内容,我们需要介绍的是实时热门商品统计模块的功能开发。
大数据梦想家
2021/01/27
2.1K0
基于flink的电商用户行为数据分析【2】| 实时热门商品统计
Flink双流处理:实时对账实现
更多内容详见:https://github.com/pierre94/flink-notes
皮皮熊
2020/03/10
4.3K1
Flink双流处理:实时对账实现
基于 flink 的电商用户行为数据分析【8】| 订单支付实时监控
本篇是flink 的「电商用户行为数据分析」的第 8 篇文章,为大家带来的是市场营销商业指标统计分析之订单支付实时监控的内容!通过本期内容,我们可以实现通过使用CEP和Process Function来实现订单支付实时监控的功能,还能学会通过connect 和 join来实现flink双流join的功能,可谓干货满满!受益的朋友记得三连支持一下 ~
大数据梦想家
2021/01/27
3.2K0
基于 flink 的电商用户行为数据分析【8】| 订单支付实时监控
零基础学Flink:实时热销榜Top5(案例)
如前文所预告的一样,今天我们来分析一下,如何通过flink完成实时热销榜单Top5的计算,本文案例,需要使用前文一些内容,如果不了解的同学,请移步《零基础学Flink:Join两个流》。
麒思妙想
2020/07/10
6390
Flink最难知识点再解析 | 时间/窗口/水印/迟到数据处理
时间、窗口、水印、迟到数据这四个知识点几乎是Flink这个框架最难点。我之前发了很多文章来解释。很多同学仍然理解不了。
王知无-import_bigdata
2020/02/24
5.2K3
5分钟Flink - 时间语义和Watermark
在 Flink 的流式处理中,绝大部分的业务都会使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。
Python编程爱好者
2020/09/08
7430
5分钟Flink - 时间语义和Watermark
基于 flink 的电商用户行为数据分析【7】| 页面广告分析
本篇是flink 的「电商用户行为数据分析」的第 7 篇文章,为大家带来的是市场营销商业指标统计分析之页面广告分析的内容。通过本期内容,我们可以实现页面广告点击量统计和黑名单过滤的功能。
大数据梦想家
2021/01/27
8200
基于 flink 的电商用户行为数据分析【7】| 页面广告分析
全网最详细4W字Flink入门笔记(下)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
BookSea
2023/10/16
1.1K0
全网最详细4W字Flink入门笔记(下)
基于flink的电商用户行为数据分析【3】| 实时流量统计
前言 在上一期内容中,菌哥已经为大家介绍了实时热门商品统计模块的功能开发的过程(?基于flink的电商用户行为数据分析【2】| 实时热门商品统计)。本期文章,我们要学习的是实时流量统
大数据梦想家
2021/01/27
2.4K0
基于flink的电商用户行为数据分析【3】| 实时流量统计
彻底搞清 Flink 中的 Window 机制
在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。
大数据老哥
2021/11/04
1.3K0
Flink基于EventTime和WaterMark处理乱序事件和晚到的数据
在实际的业务中,我们经常会遇到数据迟到的情况,这个时候基于窗口进行计算的结果就不对了,Flink中watermark就是为了解决这个问题的,理解watermark之前,先来说一下flink中的三个与流数据相关的概念,ProcessTime、EventTime、IngestionTime,不然很难理解watermark是怎么回事.
王知无-import_bigdata
2019/06/20
4K0
Flink基于EventTime和WaterMark处理乱序事件和晚到的数据
Flink窗口全解析:三种时间窗口、窗口处理函数使用及案例
我们经常需要在一个时间窗口维度上对数据进行聚合,窗口是流处理应用中经常需要解决的问题。Flink的窗口算子为我们提供了方便易用的API,我们可以将数据流切分成一个个窗口,对窗口内的数据进行处理。本文将介绍如何在Flink上进行窗口的计算。
PP鲁
2020/02/17
8.6K0
Flink窗口全解析:三种时间窗口、窗口处理函数使用及案例
全网最详细4W字Flink入门笔记(中)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
BookSea
2023/07/21
6400
附代码|Flink实时计算TopN
在上一章代码中使用了timeWindow,使得我们可以操作Flink流中的一个时间段内的数据,这就引出了Flink中的"窗口"概念:在大多数场景下,数据流都是"无限的",因引我们无法等待数据流终止后才进行一些统计计算,而通常的需求是对一段时间或是一定范围内的数据进行分析。
小晨说数据
2022/03/10
1.4K0
附代码|Flink实时计算TopN
Flink处理函数实战之四:窗口处理
本文是《Flink处理函数实战》系列的第四篇,内容是学习以下两个窗口相关的处理函数:
程序员欣宸
2021/04/19
1.8K0
Flink处理函数实战之四:窗口处理
2021年最新最全Flink系列教程__Flink高级API(四)
并添加Watermark来解决一定程度上的数据延迟和数据乱序(最多延时 3 秒)问题。
Maynor
2021/12/07
3730
2021年最新最全Flink系列教程__Flink高级API(四)
Flink CEP 原理和案例详解
(1)定义 复合事件处理(Complex Event Processing,CEP)是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。 (2)特征 CEP的特征如下: 目标:从有序的简单事件流中发现一些高阶特征; 输入:一个或多个简单事件构成的事件流; 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件; 输出:满足规则的复杂事件。
王知无-import_bigdata
2020/08/20
8.2K0
Flink CEP 原理和案例详解
【Flink】基于 Flink 的流式数据实时去重
在实时计算 PV 信息时,用户短时间内重复点击并不会增加点击次数,基于此需求,我们需要对流式数据进行实时去重。
阿泽 Crz
2020/09/28
9.9K1
基于flink的电商用户行为数据分析【4】| 恶意登录监控
前言 在上一期内容中,菌哥已经为大家介绍了实时热门商品统计模块的功能开发的过程(?基于flink的电商用户行为数据分析【3】| 实时流量统计)。本期文章,我们需要学习的是恶意登录监控模
大数据梦想家
2021/01/27
1.1K0
基于flink的电商用户行为数据分析【4】| 恶意登录监控
推荐阅读
相关推荐
基于 flink 的电商用户行为数据分析【5】| 基于埋点日志数据的网络流量统计
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验