作者: 邸星星

摘要:本文整理自汽车之家实时计算平台负责人邸星星在 Flink Forward Asia 2021 平台建设专场的演讲。主要内容包括:

一、应用场景

我们的应用场景与其他公司很类似,涵盖了实时指标统计、监控预警、实时数据处理、实时用户行为、实时入湖、实时数据传输这几个方面:

  • 实施指标统计包括实时的流量分析、车展大屏、818 实时大屏等,可以直接支持实时地查看重大活动的效果,方便及时调整运营策略;
  • 监控预警包括各个应用的后端日志分析报警、应用的性能监控预警、C 端用户的视频播放质量监控预警,这也是实时计算很典型的应用场景。通过定义合理的告警策略,可以在第一时间感知到核心系统的问题,并结合实时的数据分析快速定位问题;
  • 实时数据处理主要支持实时数仓建设、内容中台、电商中台等业务;
  • 实时用户行为主要根据用户在 APP 上的各种行为来记录用户的画像及特征,这部分应用的效果最终会直接提升用户体验。典型的场景就是智能推荐,我们会结合用户最近感兴趣的内容,来为用户实时推荐文章、小视频等优质资源,提升用户的使用体验;
  • 实时入湖是我们今年在平台建设方面重点发力的方向。我们落地的湖仓一体架构相比完全基于 Hive 的架构,在很多方面都有提升。目前已经在多个主题落地,提升效果也比较显著;
  • 最后是基础的实时数据传输场景,用户可以将业务库或 Kafka 中的数据便捷地分发到多种存储引擎中应对不同的业务需求,比如分发到 ES 中支持快速检索。

我们最早是使用 Storm平台,基于 Spout、Bolt 开发模型,实现了基础的实时计算开发,这个阶段是完全基于 Java 编码方式实现开发,开发门槛及学习成本都比较高。

第二阶段,我们在 18、19 年引入 Flink,并建设了 AutoStream 1.0 平台。这个阶段我们主要的目标是提效、降低开发门槛和学习成本。将之前纯 Java 开发方式转变为基于 SQL + UDF 的开发方式。由于 Flink 早期还不支持 DDL,所以我们很大一部分工作就是建设自己的 meta server,并通过 DDL 定义 source sink 组件,同时完善业务库数据的实时接入,并将公司内部常用的存储引擎集成到平台上,完成整个实时开发链路的打通。

第三阶段,我们将 Flink 升级到 1.9 版本,并将平台升级为 AutoStream 2.0 版本,支持原生 DDL,同时支持自助上传 UDF,简化了 UDF 使用流程。同时随着任务数及平台用户的增加,我们的日常 on call 时间也随之增加,所以我们上线了任务的健康评分机制,从多个方面分析任务的健康度,帮助用户了解任务可以优化的点并附带解决方案。我们还上线了在线诊断功能,支持动态修改日志级别,查看线程栈和火焰图,提升用户定位问题的效率,同时也降低了我们平台方的日常 on call 成本。

AutoStream 3.0 代表了我们今年主要的工作,首先是将 Flink 升级到 1.12 版本,这次升级带给我们的最直接的收益就是支持湖仓一体、 Native on K8S、PyFlink。同时本着降本提效的思路,新增了智能伸缩容的功能,一方面可以提升实时计算资源的利用率,另一方面也进一步降低用户优化任务资源的难度。

上图是 AutoStream 2.0 的架构,它包含了很多内容,涵盖了平台整体的功能与定位。但不可避免地存在诸多痛点。

由于实时计算离线的存储资源是混用的,离线 Hadoop 集群单独为实时计算拆出了一部分服务器并单独部署了一套 Yarn 供实时计算使用,这部分服务器的磁盘用来支持离线数据的存储,CPU 内存主要用来支持运行 Flink 任务,所以 Flink 计算资源并没有独占服务器,我们也没有对计算资源作严格的管控,所以导致有很多任务分配的资源是不合理的,通常是申请了过多的 CPU 资源但实际的利用率却比较低。

随着公司容器化建设的逐步推进,今年我们已经支持了离线和在线混部并错峰分配资源的方式。也就意味着 Hadoop 集群的 CPU 内存除了支持 Flink 实时计算,也可以支持在线业务的部署,对 Flink 计算资源管控的重要性及紧迫程度就凸显出来了。

接下来最重要的就是推动用户做资源的调优,这部分工作对用户来说存在一定难度。首先,要理解 CPU 内存和并行度的调整对任务的影响就是有成本的,而且通常修改任务资源、重启任务就需要几分钟,用户还需要持续观察是否对业务产生了影响,比如出现延迟或内存溢出等。简单来说,用户的调优成本是比较高的。

现有的基于 Hive 的数仓架构需要升级,t+1 或 h+1 的时效性已经无法满足很多业务场景的需求,我们最终选定 Flink 和 Iceberg 来构建实时湖仓一体的架构。

最后是实时计算支持的生态不够完善。我们的人工智能团队主要以 Python 语言为主,基于 SQL + UDF 的方式显然对他们不够友好,所以我们做了 PyFlink 的集成工作,解决了这一痛点。

上图是 AutoStream 3.0 的整体介绍。

基于 2.0 版本的痛点,除了功能和应用性升级之外,我们主要还做了以下几个方面的工作:

首先是加强了预算管控,上线了自动伸缩容功能,建设并落地了实时湖仓的架构,并上线了 PyFlink,支持 Python 开发实时任务。同时我们还基于 Flink + StarRocks 引擎,对实时多维分析的链路做了进一步简化。

二、预算资源管控

为什么需要做预算资源管控策略?

首先是服务器资源没有按团队做预算划分,先用先得,没有上限,任务资源利用率低,个别团队存在严重的资源浪费情况。同时没有外力的推动,大部分用户主动优化资源的意识很薄弱。

我们做的第一步就是启用预算的强控机制。与内部的资产云系统做对接初始化团队的可用预算,超出预算后任务将无法启动。还对此定义了规范,用户需要先优化团队内的低利用率任务来释放预算,原则上资源利用率低的任务数应该控制在 10% 以内,如果无法优化,可以在资产云系统上发起团队间预算调拨的流程,也就是借资源;如果还是失败,则会由平台开白名单临时支持业务。

平台规范里,我们对资源利用率低的任务也进行了定义,同时展示出低利用率的原因及解决方案。

目前我们主要是针对 CPU 使用率、内存使用率和空闲 slot 这几个核心规则来识别低利用率任务。早在 AutoStream 2.0 版本,我们就上线了 Flink 任务的健康评分机制,得到了丰富的细粒度得分数据,所以可以很容易地识别低利用率任务。

我们通过引入强控流程来严控计算资源的用量,通过制定规范来提升用户主动优化资源的意识,通过开发自动伸缩容功能降低用户的调优成本。由此达到的收益是,在实时计算业务稳步增长的前提下我们全年没有新增服务器。

为什么需要自动伸缩容功能?

  • 首先是降本、提升资源利用率;
  • 其次是降低资源调优的成本;
  • 最后是降低资源调优过程本身对任务稳定性的影响。

上图是自动伸缩容配置的页面,可以指定自动伸缩容的触发时间,比如可以指定在夜里低峰时期执行,降低伸缩容对业务的影响,支持指定 CPU 并行度、内存维度伸缩容的策略,每次执行伸缩容都会通过钉钉和邮件通知任务负责人,并且会记录伸缩容的触发原因和伸缩容之后的最新资源配置。

上图是自动伸缩容功能的整体设计。我们在 jobmanager 中增加了一个新的组件 RescaleCoordinator,它使用 ha 维护其生命周期,且与 dispatcher 之间彼此通信。RescaleCoordinator 会定期访问 AutoStream 提供的接口,AutoStream 平台会根据用户配置的伸缩容策略判断是否需要执行伸缩容。

整体的流程如下:RescaleCoordinator 获取到 leader 后会定期检查是否需要伸缩容,如果需要则向 dispatcher 通知 jobmanager 开始伸缩容。jobmaster 会向 resourcemanager 请求 taskmanager,待所有请求的 taskmanager 都准备就绪,就会将旧的 taskmanager 释放掉,然后基于新的 taskmanager 重新调度,最终把这次结果持久化到 zk 和 HDFS 上。

平台的 Flink 本就使用了 zk 和 HDFS 做 ha,所以我们不需要引入新的组件。此外,因为新的 container 是提前申请好的,又能省去 container 申请的时间,避免了因为资源不够而申请不到 slot 导致任务 recover 失败。如果是做并行度的伸缩容,需要在发起调度前修改 jobgraph 的并行度来实现。

以 CPU 内存为例,第一步是向 ResourceManager 申请 container 并为之打标记。新的 taskmanager container 通过 slot pool 向 resultmanager 请求,这一步需要在 slot pool 中维护新的资源配置,对应上图中的 CPU 2 核,内存 2GB,且需要支持回滚机制。如果这次伸缩容失败,资源设置回滚到 CPU 1 核,内存 1G。

第二步,停掉任务,删除 ExecutionGraph。

第三步,释放旧 taskmanager,重新构建 ExecutionGraph,并在标记的 taskmanager 上从保存点恢复任务。

第四步,将此次伸缩容的资源设置持久化到 zk 和 HDFS,如果 jobmanager 在这里挂掉,那么之前伸缩容的配置都会丢失,所以需要将伸缩容后的配置保存在 zk 和 HDFS 上,数据存在基于 HDFS 的 block server 中,在 zk 中会保存 block server 的 key。

最后,对伸缩容策略进行一个粗略的总结:

  • 首先是基于并行度的伸缩容:

    • 如果存在消费 Kafka 延迟且 CPU 使用率较低,很可能是 io 密集型任务,可以增加并行度;
    • 如果是存在空闲 slot 则执行缩容,避免资源浪费;
  • CPU 维度的伸缩容主要根据 CPU 使用率来判定,会根据 taskmanager 进程分配的 CPU 核数和 taskmanager 进程实际的 CPU 使用率,来计算出 CPU 使用率作为 CPU 扩缩容的关键指标;

  • 内存维度主要根据内存使用率和 GC 情况来判定是否需要扩容和缩容。

四、湖仓一体

基于 Hive 的数据仓库主要存在以下几个痛点:

  • 首先是时效性,目前基于 Hive 的数仓绝大部分是 t+1,数据产生后至少要一个小时才能在数仓中查询到。随着公司整体技术能力的提升,很多场景对数据的时效性要求越来越高,比如需要准实时的样本数据来支持模型训练,需要准实时的多维分析来帮助排查点击率下降的根因;
  • 其次是 Hive 2.0 无法支持 upsert 需求,业务库数据入仓只能 t+1 全量同步,数据修正成本很高,同时不支持 upsert 意味着存储层面无法实现批流一体;
  • 最后 Hive 的 Schema 属于写入型,一旦数据写入之后 Schema 就难以变更。

经过一番选型,我们决定选择基于 Iceberg 来构建湖仓一体架构,解决基于 Hive 的数据仓库的痛点。

Iceberg 的定位是开放的表格,不绑定某一种存储或计算引擎,同时它能提供增量快照机制,可以轻松实现准实时的数据写入和读取。Iceberg 的 v2 格式支持 acid 语义,可以满足 upsert 需求,为后续做存储层面的批流一体提供了可能性。读取型的 schema 对 schema 的变更也十分友好。目前主要的查询引擎都和 Iceberg 做了集成,读写路径上也都支持了流和批的方式,从流批一体的角度来看,也是十分友好的。

上图右侧是 Iceberg 增量快照机制的基本原理。每次针对表的 commit 操作都会产生一个新的快照,比如针对表的第一次数据写入的 commit 会生成 snapshot0 快照 (上图中的 s0),第二次写入的 commit 会生成 s1。每个快照对应一个 manifest list 对象,会指向多个 manifest file,每个 manifest file 又会指向多个 data file,也就是存储数据的文件。图中的 current metadata pointer 会为每一个 Iceberg 表指向一个最新的 metadata file,即最新生成的快照。

上图是 Iceberg 目前在我们内部的集成情况,最底层是基于 Hive Metastore 来统一 Hive 表和 Iceberg 表的元数据,基于 HDFS 来统一 Hive 表和 Iceberg 表的存储,这也是湖仓一体的基础。往上一层是表格式,即 Iceberg 对自身的定位:介于存储引擎和计算引擎之间的开放的表格式。再往上是计算引擎,目前 Flink 主要负责数据的实时入湖工作, Spark 和 Hive 作为主要的产品引擎。最上面是计算平台,Autostream 支持点击流和日志类的数据实时入湖,AutoDTS 支持关系型数据库中的数据实时入湖,离线平台与 Iceberg 做了集成,支持像使用 Hive 表一样来使用 Iceberg,在提升数据时效性的同时,尽量避免增加额外的使用成本。

下面是我们在湖仓一体架构落地过程中的一些典型的实践。

实时数据入湖方面

在 Iceberg 场景中,需要确保主键相同的数据写入到同一个 bucket 的下。由于 Flink 表的 DDL 并不支持 Iceberg 的 bucket 的定义,所以我们做的第一件事就是支持在 Flink DDL 的 property 中定义 bucket。

第二个问题是 Iceberg 表本身无法直接反映数据的写入进度,离线调度难以精确地触发下游任务,所以我们借助 Flink 良好的 watermark 机制,直接在入湖的阶段将 watermark 持久化到 Iceberg 表的元数据中,这样可以通过简单的脚本调用,就能知道 Iceberg 表的数据写入进度,从而精确地触发下游的调度任务。

第三个问题是实时入湖阶段和离线团队账号体系的打通。Flink 向 Iceberg 写入数据的时候,需要访问 HDFS 和 Hivemetastore,所以必然要和离线既有的团队账号体系打通。一个离线的 HDFS 目录只能给一个用户分配写入权限,所以在引入 Iceberg 之前,所有的 Flink 任务都是通过一个固定的 Hadoop 账号运行的,这样的好处就是方便我们做统一的资源管理,包括 checkpoint 目录的统一管理。我们通过修改 Iceberg 创建 Hadoop Filesystem 实例的代码,增加了账号代理的机制,实现使用自定义账号向 Iceberg 写入数据,同时扩展 HiveMetaStoreClient 增加代理机制来打通对 HiveMetaStore 的访问。

可用性和稳定性方面的实践

为了湖仓一体元数据的统一,我们坚持和离线数仓复用同一套 HiveMetaStore 服务,期间也遇到了很多稳定性和数据正确性的问题。

首先是访问 HiveMetaStore 异常,这是因为我们的 Hadoop 集群启用了 kerberos 机制,并且 Hive config 的过渡方法被误用,导致 Hive 客户端 kerberos 相关配置被覆盖,造成访问 HiveMetaStore 异常。我们做了相应做的修复,也已反馈给社区。

其次我们引入了基于 zk 的分布式锁用来替换默认的 HiveMetaStore 的锁。Iceberg 默认基于 HiveMetaStore 分布式锁来控制单表的并发 commit,但是存在一种情况,当 Flink 进程意外退出时,代码无法触达 unlock 逻辑,导致针对表级别的锁一直被占用,无法释放。Flink 任务被自动拉起后无法再次获取到锁,导致后续无法正常写入数据。随着入湖任务量的增加,这个问题每周都会至少出现一次,每次都需要人工介入去、手动访问 HiveMetaStore 释放锁,才能让 Flink 任务恢复正常。如果处理不及时,可能导致数据入湖延迟几个小时。

针对这一问题,在高版本的 Hive 中其实已经有了解决方案,就是针对分布式锁设置超时时间,超时之后会自动将锁释放。但我们是基于 Hive 2.0.1 版本,整体升级和拉取 patch 的成本都比较高,所以我们针对 Iceberg 做了改造,使用基于 zk 的分布式锁替换之前的锁机制,上线后这一问题也得到了根本的解决。

还有 Iceberg 表元数据文件被误删的问题,这个问题会导致出现找不到数据文件的异常,直接影响 Iceberg 表无法被访问。解决方法就是在修改 metadata_location 属性的时候,增加容错机制,优先尝试重试并检查是否修改成功,仅在确认元数据未保存成功的情况下,才会对 metadata.json 文件做删除操作。

对 v2 格式小文件合并方面的一些改进

Iceberg 实现 upsert 语义的原理是,它的 v2 格式通过引入 sequence number 并结合 position delete file 和 equality delete file 来实现的,写入思路是首先将 delete 行写在 equality delete file 中,如果 delete 行在当前的事务中 insert 过,就把 insert 行所在的文件行号和地址给 position delete file。

举个例子,假设有两个事务:

  • 第一个事务 insert 两条数据对应 ID 为 1 和 2 的数据,事务 commit 后会生成一个 datafile,此时的 sequence number 是 1。
  • 第二个事务先 insert 一条 ID 为 3 的数据,紧接着把它对应的 value 从 300 修改为 301,最后删除 ID 为 2 的数据。

这三条 SQL 产生的行为如下:首先生成了一个 data file2,里面包括了新写入的 ID 为 3 的数据即 I (3,300),还包含了 ID 为 3 的数据 update 之后的数据即 I (300,301),并把 I (3,300) 写入到 equality delete file 中。由于 ID 为 3 的数据在当前的事务中写入过,所以还会生成一个 position delete file,然后把 (3,300) 这条数据对应的 position 标记为删除。而因为 ID 为 2 的数据不是在当前事务中写入的,所以把 ID 为 2 的数据即 I (2,200) 追加到 equality delete file,并标记为删除即可。

upsert 语义在读路径上的实现思路如下:首先 position delete file 与不大于自己 sequence number 的 datafile 做 join,equality delete file 则与小于自己 sequence number 的 datafile 做 join。因为上图中,position delete file 会把 datafile2 中的第零条数据即 I (3,300) 删除, equality delete file 会把 datafile1 中 I (2,200) 删除掉,最终查询结果只有 ID 为 1 和 3 的两条数据。

由于 delete file 越来越多,查询性能也会随之降低。为了保证查询性能,我们每个小时都会对 Iceberg 表进行小文件合并。

但是在引入 sequence number 之前,针对 v1 格式的小文件合并无法保证 v2 格式数据在合并后的正确性。所以在实践的过程中,我们针对 v2 格式的小文件合并做了一些改造。

针对 v1 格式的小文件合并思路和读取思路完全一致,即将两种 delete file apply 到合适的 datafile 上,合并后删除对旧文件的引用,改为引用新生成的 datafile。根据之前 sequence number 的定义,它会一直递增,所以合并后的 datafile 对应的 sequence number 也会变大,会导致 v2 格式的数据在合并小文件场景下的冲突。

首先小文件合并过程中,修改 sequence number 会导致 Flink 实时写入的事务冲突,导致上图中事务 3 的 delete 语句失效。事务 3 将主键 3 的数据 delete,但事务 4 的小文件合并成功后会把这条数据又加回来,因为这条数据的 sequence number 变成 4,此时 delete file 的 sequence number 是 3,不会再把 sequence number 为 4 的数据删掉。

针对上述冲突,我们对 Iceberg 的小文件合并做了改造,改造的思路是合并小文件本质上并不会对最终的数据做修改,仅仅是优化文件的存储。所以在合并过程中,复用被合并的文件中最大的 sequence number 即可。

按照新的思路,事务 4 之后的 datafile 对应的新 sequence number 为 2,也就是复用了被合并的文件中的最大的 sequence number。sequence numbe2 依旧小于事务 3 的 sequence number3,所以可以保证 delete file 的语义正确性。

湖仓一体架构的落地为我们带来了不少的收益:

  • 首先,流量、内容、线索主题的数据时效性得到了大幅提升,从之前的天级/小时级提升到 10 分钟以内,数仓核心任务的 SLA 提前两个小时完成;
  • 其次,特征工程得以提效,在不改变原先架构的情况下,模型训练的实效性从天级/小时级提升到 10 分钟级;
  • 最后,业务库数据可以在我们的 AutoStream 平台上通过配置实现准实时入湖,相应的也可以通过 Spark SQL 做准实时的分析查询。同时我们也在小范围地测试将聚合任务的结果准实时入湖,通过 Flink+Iceberg 打造基于 Iceberg 的准实时的物化视图,可以大幅度提升分析的效率和体验。

引入 PyFlink,主要因为我们想把 Flink 强大的实时计算能力输出给人工智能团队。人工智能团队由于技术本身的特点,大部分开发人员都是基于 Python 语言开发,而 Python 本身的分布式和多线程支持比较弱,他们需要一个能快速上手又具备分布式计算能力的框架,来简化他们日常的程序开发和维护。我们正好也需要补全平台上对 Python 生态的支持不足,所以很自然地想到了把 PyFlink 集成到我们的平台上。

上图是 PyFlink 的基本架构。python vm 和 jvm 双向通讯的架构,实现了 Python API 和 Java API 的一一映射,用户可以通过编写 Python 代码来实现 Flink 任务的开发。

通过对比,我们最终选择了用 Kubernetes 方式来部署 PyFlink 环境。Kubernetes 除了可以较好地支持资源隔离,也可以方便地集成 Python 环境和其他机器学习的依赖。

![](https://img.6aiq.com/e/