同城基于构建实时数仓实践
导读
随着公司用户增长业务快速发展,陆续孵化出 部落、同镇、C 端会员、游戏等非常多的业务板块。与此同时产品及运营对实时数据需求逐渐增多,帮助他们更快的做出决策,更好的进行产品迭代,实时数仓的建设变得越发重要起来。本文主要介绍用户增长业务基于 Flink 构建实时数仓的实践之路。
实时数仓1.0介绍
如下图是早期的实时计算架构,实时数据需求较少,架构简单,数据链路少,一路到底的开发模式能很快满足业务需求;
但是,随着产品和业务人员对实时数据需求的不断增多,上面的架构带来了如下的问题:
- 需求驱动的烟囱式开发,导致开发成本居高不下;
- 数据模型,数据分层缺失,导致实时计算资源成本高,数据指标一致性不可控;
- 数据使用场景变多,明细数据,多维分析,业务指标计算等,单一的需求驱动式开发模式难以应付多种需求场景;
基于以上的问题,我们开始构建 2.0 版本的实时计算架构。
实时数仓2.0介绍
一、模型
我们参考离线数仓,将实时数仓按照分层设计的思想分为三层,如上图所示,分别为:
ODS 层: 存储 APP 各业务线埋点数据以及业务端各种日志;
DW 之明细层: 分为公共层和业务层,APP 客户端收集的埋点数据通过关联维表,统一将产品,渠道,版本,类别,地域等客户端公共维度属性进行 ETL,并统一进行去重,过滤,分流等动作,这样就生成了公共行为明细表和业务行为明细表;然后业务明细表关联各自业务维表形成了业务主题明细表,比如同镇业务,部落业务等;
DW 之汇总层: 读取业务主题明细表计算出各业务主题关心的通用维度和指标,并存储到 自研的wtable中;(wtable 是 公司自研的分布式 kv、klist 存储系统 )
ADS 层: 这一层主要提供 ad-hoc 查询和实时大盘服务,其中 ad-hoc 查询是指通过 Flink 将主题明细宽表实时导入到 ClickHouse 中,为分析师和产品提供 ad-hoc 查询;而实时大盘主要是读取 wtable 中存储的DWS多维汇总指标来提供;
二、技术架构
三:实现方案
接下来我们从五个部分来讲具体的实现方案,分别为DWD构建、维表Join、Flink Sink ClickHouse、异步 Load Batch HDFS、任务与数据质量监控;
1:DWD构建
主要包括六个功能,分别为格式化、反作弊、去重、Imei-id 服务、Kafka sink exactly-once、分流。
- 格式化
主要是将原始日志按照 dwd 明细表的模型进行解析,并校验各字段的准确和完整性;
- 反作弊
主要是把线上测试渠道日志以及实时反作弊策略命中的 IMEI 进行过滤;
- 去重
主要是将客户端产生日志、服务端接收日志、flume 同步日志、kafka 流转日志这四个阶段产生的重复日志进行过滤,减少对业务分析和决策产生的影响;由于 APP 每日产生的用户行为日志高达数百亿条,所以如何做到高效精确去重?下面介绍两种技术方案;
\\ 方案一:redis cluster bitset 方案**
实现方式是把每条日志 hash 多次,通过管道方式向 redis 集群请求 setbit 命令,通过布隆过滤的方式,判断日志是否重复。流程图如下所示:
此方案缺点是:重度依赖第三方 redis,需要解决 qps 高,redis 内存倾斜,redis 资源占用大,以及整体架构的稳定性和扩展性等问题。
方案二:flink state 状态去重方案
实现方式是使用 RocksDBStateBackend,开启增量检查点,存储大状态。根据日志数据量和日志 hash 冲突测试,设计日志去重的 key。
time 可以根据自己的去重方案设计,比如 ‘20200601’ 或者 ‘2020060112’ 等动态时间戳。使用 ValueState ,可以判断 distinctKey 是否存在。不存在代表不是重复日志,存在代表是重复日志。并且设置状态的过期时间为 24 小时。
流程图如下所示:
此方案基于 flink 强大的 state 功能,架构简单可靠,程序处理速度与稳定性良好。
最终我们采用方案二来实现日志去重的功能;
- **Imei-id 服务 **
主要实现如下两个功能:
- 一是根据 IMEI 实时判断新老用户;
- 二是把 IMEI 映射成一个正整数,支持使用 BitMap 计算用户留存率;
下面分四步详细介绍实现方案;
1)生成唯一 ID 方案对比:
2) 技术方案如下图所示:根据上图的方案比对,最终选择了 wtable 方案。wtable可以根据 KV 存储结构实时为每个 Imei 生成唯一数字;
3) Operator 处理流程:
如上图所示,利用 flink 状态,避免每条日志都去请求 wtable,提高处理速度。
4) 线程安全问题:
Imei 生成唯一数字 ID 服务,在一个 flink 任务中是线程安全的,可以保证 imei 对应唯一 ID;若有多个 flink 任务同时实现此服务则是不安全的。解决方案有如下两个:
方案一: 运用分布式锁,保证多进程之间数据安全。由于日志数据量比较大,需要频繁加锁和释放锁,导致 Flink 程序处理逻辑比较复杂,程序处理性能低等问题;
方案二: 根据 flink 的多流合并 api,把多个 kafka 数据源 Data stream 合并一个后,通过 key by imei 的方式来处理日志并生成 ID,保证了线程安全,并且架构和代码实现逻辑简单;目前我们采用的是此方案。
- Kafka sink exactly-once
为了保证每条数据仅仅被处理一次,通过 Flink 两阶段提交语义来实现端到端一致性;
1)预提交阶段
2)提交阶段
3)代码实现
1:开启 check point,设置 CheckpointingMode.EXACTLY_ONCE;
2:输出 kafka 版本需要 0.11 及以上;
3:enableCheckpointing(1000 60 5),checkpoint时间间隔为5分钟
4)手动恢复任务
1:如果程序需要重启或代码变更等情形下,从系统指定的savepoint目录进行状态恢复;
2:操作示例
- 分流
该功能主要目的是将APP公共行为日志按业务进行拆分,降低业务读kafka流量,节省Flink计算资源。具体实现过程为将公共行为明细日志关联埋点报备维表,将关联上的业务行为日志按照事先申请的topic进行分发。
2:维表 join
在实时数仓的建设过程中底层实时宽表的构建可谓是重中之重,如何保证实时宽表低延迟、高完整的产出是保证数据质量的关键,为此我们整理了 DataStream 层面常用的三种维表 join 方式供大家参考;
1)在 RichFlatMapFunction 的 open 方法中读取外部数据源,定时触发更新,例如加载 mysql 维表数据,在 open 方法中先读取指定维表信息,再将数据加载至内存,每次关联是取内存中匹配对应的维表数据;
优点:
- 实现方案简单直接
缺点:
- 维表更新时机不易把控可能存在 join 延迟
- 需要评估维表数据量级以避免内存溢出
2)将维表数据加载到热存储中,如 redis、wtabe、hbase 等。通过第三方热存储来缓存维表数据,在数据关联时直接用异步 IO 的方式从热存储获取数据并关联;
优点:
- 维表量级不受限与内存可加载大量数据
- 维表更新延迟较低(受限于维表数据写入热存储)
缺点:
- 需要额外热存储资源且占用一定网络开销
3)将维表分为全量维表和增量维表,程序启动时加载全量维表到内存,采用 BroadcastState 将增量维表数据转换为流式数据,实时 Join 主体流与维表流;
优点:
- 流式 Join 不依赖热存储实时更新维表数据
缺点:
- 需要评估维表量级避免内存溢出
3:Flink Sink ClickHouse
如下图是Flink 接入ClickHouse的流程图 ,我们基于jdbc实现了高可靠的ClickHouse Sink,依据DataTime或DataSize实现了流数据低延迟导入CK;
4:异步 Load Batch HDFS
1:流程图
2:实现方案
**(1)**采用10分钟一个分区,按照 dt,hour,minute 层级目录划分;
**(2)** hdfs sink 的方案
- 如果写入的 hdfs 版本小于 2.7,则选择 BucketingSink;
- 如果写入的 hdfs 版本大于等于 2.7,则选择 StreamingFileSink 或者 BucketingSink;
- 当前平台的 hadoop 版本为 2.6,故采用了 BucketingSink 方案;
(3)日志实时写入 hdfs 文件的方案
- 目前是每分钟数据落地到 hdfs 上;
- 根据日志量与 hadoop 设置的块大小,目前采用的是 256M 落地到 hdfs 上;
- 代码示例:
sink.setBatchRolloverInterval(1 * 60 * 1000L);
sink.setBatchSize(1024 * 1024 * 256L)
(4)日志落地 hdfs 时间分区的方案选择
- 根据日志 EventTime 落地,数据写入 EventTime 对应的时间分区目录下,如图:
优点:数据分区准确,延迟的数据也会归位到真实的时间分区下。
缺点:若有延迟数据,每次查询历史分区,数据计算结果会发生变化。
- 根据日志 ProcessingTime 落地,数据写入当前系统时间分区目录下,如图:
\\ 优点:\\ 没有日志回溯问题,每次查询历史分区,数据计算结果不变。
缺点: 如果发生数据延迟,数据只会追加到当前处理时间的分区目录下,不能真实反映数据分布情况。
- 最终方案
经过线上任务实际测试,在正常情况下数据延迟率比较低,通常在秒级别,所以采用两种时间结合的方案。即在 每天的 0 点 0 分-0 点 5 分时间段,采用 EventTime 时间处理,尽可能保证延迟数据不出现跨天的问题。其他时间段采用 ProcessingTime 时间处理,保证数据多次计算结果一致。
(5) Hdfs sink exactly-once
- 开启 check point,设置 CheckpointingMode.EXACTLY_ONCE;
- hdfs sink 输出的格式需要指定为 text 格式;
- 若需要重启 flink 任务,需要指定上一次 savepoint 的 hdfs 文件路径;
- 由于 hadoop 2.6 版本不支持 truncate,flink 任务重启后需要人工干预处理 hdfs 的文件,才能保证 hdfs sink 端到端一致性。
- 编写脚本,处理 flink 任务重启后生成的 hdfs 文件;
(6)实时 hive 表与分区
- 创建外部表,设置 location 为日志写入 hdfs 的路径;
- 通过定时任务动态建立 dt、hour、minute 的分区;
\\ 5:任务与数据质量监控 **
1:实时任务监控, 在大数据平台自建的 Flink 开发平台 Wstream 系统中,针对任务提供了 QPS、GC、CPU、Lag 延迟、内存使用等多维度的监控指标配置,一旦触发规则
- 原文作者:知识铺
- 原文链接:https://geek.zshipu.com/post/%E4%BA%92%E8%81%94%E7%BD%91/%E5%90%8C%E5%9F%8E%E5%9F%BA%E4%BA%8E%E6%9E%84%E5%BB%BA%E5%AE%9E%E6%97%B6%E6%95%B0%E4%BB%93%E5%AE%9E%E8%B7%B5/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。
- 免责声明:本页面内容均来源于站内编辑发布,部分信息来源互联网,并不意味着本站赞同其观点或者证实其内容的真实性,如涉及版权等问题,请立即联系客服进行更改或删除,保证您的合法权益。转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。也可以邮件至 sblig@126.com