来源: 何巨放 贝壳找房 稿

1. 背景

Flink作为新一代的实时计算引擎在贝壳越来越多的应用场景下被应用,比如实时指标、实时ETL、实时监控等。目前贝壳内部线上运行中Flink任务已经超过4K,日均处理数据量万亿级。如何提升大规模Flink任务的稳定性、资源利用率、运维效率是一个有挑战的问题。本篇文章将会讲述贝壳实时计算团队基于Flink指标在任务监控报警、资源分析与调优、运维提效以及容器化等相关的实践。

2. 指标采集与存储

2.1 指标采集

Flink框架本身提供了丰富的监控指标,任务运行指标反映了实际运行的情况,指标的采集与存储是后续监控、报警、资源调优、智能分析的基础。早期是通过官方提供的influxdb metrics reporter将指标采集到influxdb,随着任务数的增长,指标的上报、存储、查询遇到瓶颈,原因是开源的influxdb只支持单点部署,大规模的聚合查询性能存在瓶颈。后续平台实现了指标采集与存储方案的升级,整体架构如下图所示。

公司内部存在多个底层依赖Flink的平台,如日志平台、实时计算平台、实时数仓等等。采用统一的指标采集存储方案。将指标采集过滤规则统一存储在redis,过滤规则修改不需再重启任务,通过自定义的Kafka reporter,过滤后将指标信息统一发送到kafka。

2.2 指标存储

指标采集到Kafka后,将全量指标实时写入influxdb,influxdb的写入和查询比较灵活,读写性能优秀,对于各种指标的格式没有强制限制,且底层存储为明细数据,因此适合用来提供大而全的整体监控大盘。同时将核心指标写入druid,如CPU load、内存指标、gc指标、稳定性相关指标、checkpoint等,druid预聚合能力可以降低存储数据量并且基于hdfs存储历史数据,便于存储长期的指标数据用来做资源的资源分析和智能诊断,另外druid的实时数据可用来作为报警检测。

由于单机的Influxdb存在性能瓶颈,集群模式为收费产品。因此我们对Influxdb存储进行了扩展,搭建Influxdb分布式集群来应对未来更大的数据存储压力,整体架构如下图所示:

我们通过OpenResty做集群节点的代理,通过Lua脚本来做读写的分发。通过统一的域名提供服务,根据taskid将对应指标的读写转发到不同的InfluxDB节点,并且集群支持灵活的扩容,提升整体的写入和读取性能,解决单台机器的瓶颈问题。InfluxDB集群目前包括4个节点,可以满足3k+任务的7天的详细指标存储,并且写入和查询都在毫秒级。

数据是分析的基石,有了前期的数据准备,后续的分析挖掘才更加有力。

3. 监控与报警

3.1 可视化监控

基于influxdb的全量指标,我们为所有Flink任务提供了大而全的监控大盘,包括数十个任务指标看板,方便用户了解任务运行现状的全貌。同时,为了让用户更直观的看到自己Flink任务的健康状况,降低监控的使用门槛,我们进行了任务健康分的建设,如下图所示:

一个Flink任务运行过程中的健康状况,可以通过以下几个维度来评价:

整体稳定性 。任务整体运行的情况,根据任务正常运行时长占比和任务自动重启次数来衡量。若运行时长占比过低或者频繁重启说明任务可能存在问题。

Checkpoint情况 。Checkpoint是Flink实现容错机制最核心的功能,我们根据近期Checkpoint成功率和Checkpoint平均耗时来衡量。若Checkpoint耗时过长可能有失败的风险。

算子处理效率 。算子的处理效率在实时计算中非常重要,我们通过近期背压时间占比和input buffer内存使用比例来衡量。反压持续过高,说明有算子存在瓶颈,存在数据延迟风险。

资源利用率 。资源利用率包括内存和CPU的使用情况,资源的不足和浪费都是不健康的,我们通过JVM堆内存的使用率和TaskManager的CPU使用率来衡量。CPU利用率反应了CPU资源的利用情况,过高说明资源不足或程序计算有瓶颈,例如效率过低的正则会导致CPU过高;过低说明CPU资源没有充分利用,存在资源浪费的情况。

GC情况 。使用TaskManager Full GC的次数来衡量。若Full GC次数过高说明存在OOM等风险。

监控看板的用户不仅包括熟悉Flink原理的大数据工程师,这些用户可以通过查看详细的监控大盘进行任务调优诊断,也包括可能对Flink原理不太熟悉的Flink SQL用户和通过可视化模版配置任务的各种研发人员,因此简单易懂且直观的健康分大盘也十分重要。

3.2 报警检测

指标报警旨在提供统一的指标查询、分布式报警检测、报警回调、规则管理、任务信息注册等服务。整体架构图如下图所示:

Flink指标根据指标重要程度分为核心指标与非核心指标。核心指标存储为druid,全量指标存入InfluxDB,因此指标报警根据指标类型进行区分,查询不同的数据源。

Flink任务指标报警根据报警等级分为:

基础报警 。任务运行后默认添加基础报警配置,比如Checkpoint成功率,任务正常运行时长占比,GC次数及反压时长占比等。

自定义报警 。支持用户针对Flink原生指标和自定义指标进行个性化的报警配置。

平台级报警 。支持平台级角度的报警配置,比如近10分钟任务重启次数不达标的任务数,近30分钟Checkpoint成功率不达标的任务数等。

其中分布式调度能力依赖了开源框架PowerJob,其拥有完善的定时策略,丰富的执行模式以及强大的故障转移和恢复能力,其调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,引入时间轮算法,实现了无锁化调度。并且支持多台部署实现水平扩展和高可用,由于篇幅原因,不再展开介绍。

3.3 日志分析

我们对Flink任务的错误日志进行统一收集、存储和分析,提高任务的运维效率。通过Log4j的Kafka appender将错误日志以Json的形式写入Kafka中,然后进行实时的ETL。主要分成两条流处理,其中一条流将错误日志的明细信息,收敛后接入日志平台,落入ES;另一条流对错误日志类型进行统计,并提取日志的关键维度,写入Druid中,为后续聚合分析与报警做数据准备。

4 资源预估

4.1 资源使用现状

Flink任务多,所使用的计算资源、存储资源规模也比较大,很多用户对Flink资源不够了解,不清楚如何设置任务资源,存在资源浪费情况。随着业务流量波动,资源需求也随着变化,难以自动满足资源需求。依赖平台同学自身经验对任务进行资源调优,人力成本高。缺乏资源细粒度使用能力,大量的小任务数据量较小,资源利用率长期比较低。

4.2 算子链路层优化

第一部分,从算子链路层层面进行资源优化。

4.2.1 Metrics增强

Flink任务算子间的传递情况我们可以通过算子输入输出速率来衡量,而算子本身的处理效率,一般需要通过单条消息处理耗时来衡量,Flink运行指标体系中没有相关指标提供,所以我们需要对Metrics做进一步的增强,即修改部分Flink源码,在Flink自身Metircs基础上,为每个算子添加单条消息处理耗时指标,算子处理耗时主要包括反序列化、处理耗时、序列化等耗费的时间。然后可以根据算子输入速率与单条消息处理耗时的比值来计算当前算子所需要最小并发。

4.2.2 去除异常情况

在日常运维中,我们发现不是所有数据延迟问题都是资源不合理导致的,所以需要先排除大部分的非资源问题,再分析任务资源的合理性。其中,比较常见且影响较大的非资源问题有如下两种:

数据倾斜 :若发生数据倾斜,很容易造成任务的延迟,而导致数据倾斜有很多原因,需要人工介入解决,所以需要提前排除。针对延迟的任务,我们通过算子每个sub_task的处理流量判断是否存在数据倾斜情况。通过对流量数据归一化后求方差来衡量是否存在倾斜。

算子效率过低 :算子内部逻辑存在问题,也难以通过调整资源来解决问题,比如复杂的正则解析,阻塞IO等。我们通过设定单条消息处理耗时阈值来排除这些效率过低的算子。

4.2.3 计算任务并发

首先获取Flink任务数据源的生产速率,基于该生产速率的90分位值来预估任务的实际吞吐。目前大部分流任务数据源都是Kafka,我们通过Kafka集群的监控信息获取Kafka生产速率和消费延迟情况。

然后Kafka元数据中的消费延迟情况和Flink自身指标分析任务是否存在延迟等问题,并将任务划分为延迟任务和非延迟任务。针对延迟任务,根据数据源生产速率和单条处理耗时计算得到每个算子需要的并发,并以算子中最大并发作为任务需要扩容的并发值。针对非延迟任务,定期计算该任务所需的最小并发,若当前配置并发远大于任务真实所需并发,则进行缩容;若计算所得真实所需并发与配置并发值相差不大,则暂不调整,因为流任务每次重启都需要一定成本。

4.3 资源指标层优化

算子链路层通过并发的调整是从大粒度上来调整任务的资源,而资源指标层的优化,则是从更小的粒度来精细化调整资源。任务核心资源主要包括内存与CPU两方面,因此利用指标数据可得到当前资源利用情况。

4.3.1 内存预估

首先由Flink内存模型可以得出Flink内存主要包括堆内内存和堆外内存,其中堆内主要包括FrameWork Heap Memory、Task Heap Memory,堆外内存主要包括Managed Memory、Framework Off-heap Memory、Network。

FrameWork为Flink框架本身所用内存,通常比较固定,network默认从总内存的固定比例计算得出,通常不是任务瓶颈。从指标观察和历史经验来说内存的瓶颈主要出现在Task Memory,其中off-heap默认流任务不使用,因此瓶颈主要出现在Task Heap Memory,这也是与真正的数据处理逻辑相关的内存,那么针对内存的资源调优主要由Task Heap Memory推算得出。

内存预估可以由《Java Performance》建议进行预估计算,核心点是获得Full GC之后年老代内存空间大小来预估整个heap所需要的内存,推荐的计算公式如下所示。

Full GC之后年老代内存空间大小可由Flink指标获得,通过观察下图的堆内内存与Full GC监控图可以得出,对于绝大多数的正常运行的Flink任务,其内存使用符合如下规律:随着新数据(如kafka原始数据)的不断摄入,新生代内存使用随着时间推移不断增高,当新生代内存不足时便会进行young gc释放内存。而长期存活的数据(如部分规则数据、缓存数据)则会进入老年代,young gc时不会释放。因为可以根据每次gc后的内存使用低谷作为年老代内存空间的大小。

若任务数据量超大,新生代内存不足,会导致数据进入老年代,导致任务频繁full gc,如下图所示。这种非正常运行的场景,可以根据单位时间full gc次数和当前内存大小预估所需的内存。

因此可以简单汇总规则:1、内存资源过大存在浪费时,可以根据年老代内存空间大小预估内存。2、内存资源不足时根据Full GC次数和当前内存大小设置内存。

4.3.2 CPU预估

CPU资源利用指标由单位数据处理量、单条数据处理复杂度有关。当CPU利用率较高时,可能是代码逻辑导致的CPU飙升,如复杂的正则表达式等;CPU利用率较低时可能是数据量较少或算子内部由阻塞时IO等,因此需要先根据平均单条数据处理耗时排除到异常情况。

根据历史CPU Load 90分位可以得到当前任务CPU的利用情况,之后可以给CPU设置一个合理的利用率区间,当CPU利用率较高时,适当调大CPU资源,反之同理。比如运行在K8S集群的任务,CPU利用率可以精细到千分之一核,通过预估CPU资源可以实现资源的精细化利用。

5. 容器化

Kubernetes(K8S)是容器集群管理系统,在资源管理、容器编排、自动化运维等方面具有一定优势,越来越多的存储、计算服务在和K8S结合。Flink on K8S可以结合K8S的优势从资源利用率、稳定性、运维效率三个方面对Flink任务进行优化。

5.1.1 资源利用率

通过使用上述资源预估机制能够实现资源调优自动化,将资源预估机制与K8S的以下优势结合来提升整体资源利用率:

  • 细粒度资源控制。比如K8S可以通过Cgroup 机制将CPU资源精细划分到千分之一核,一些数据量较小的任务,资源利用率长期比较低,因此可以将CPU限制到小于1核。
  • 动态扩缩容能力。比如通过结合K8S弹性伸缩能力动态调整任务并行度,定期检测Flink任务是否存在资源浪费或资源不足情况,然后动态调整任务使用资源。
  • 服务混合部署。可以与各种在线服务和组件进行混布,进一步提升机器资源使用率。

5.1.2 稳定性

K8S在资源隔离上更为严格,能够避免同一台机器上的任务互相影响。Flink On Yarn 模式,CPU没有隔离,某个实时任务造成某台机器 CPU 使用过高时,会对该机器其他实时任务造成影响。同时K8S有更细粒度的权限控制、弹性自愈能力,能够更好的保障业务稳定。另外,Flink on K8S基于ETCD的HA模式也在不断完善中,能够实现与ZK解耦,减少外部依赖。

5.1.3 运维效率

Flink任务数增多也导致了平台方运维成本增加,而K8S优势之一是容器管理与编排能力,能够提升部署和运维自动化能力,提升Flink任务日常运维效率,比如能够更方便的做任务生命周期管理、Flink版本升级等。另外,Flink on K8S可以支持Flink任务存储与计算分离,在集群变更时能够降低运维成本。

Flink on K8S目前存在多种架构,从0到1实现Flink上云面临着技术架构选型等问题。下面简要介绍几种Flink on K8S方案并进行对比。了解了不同方案的原理、优缺点、能够更好做技术选型。

方案1:Standalone Cluster on K8S

Standalone Cluster on K8S在Flink的早期版本中就已经支持,基本原理是在K8S集群中部署standalone集群然后提交任务。

流程:

1、通过yaml文件预先定义Flink Master deployment、taskmanager deployment、configmap等。

2、通过yaml文件部署Flink JobManager的service将jobmanager服务暴露出去。

3、通过Flink client和JobManager service将任务提交到集群。

总结:

优点:Flink任务通过多个yaml文件在K8S部署standalone集群,Flink无需做任何源码修改,流程简单易操作。

缺点:Flink感知不到K8S的存在,任务资源无法自动调整;没有任务的生命周期管理,任务管理成本较高。

方案2:Native Flink on K8S

为了降低Flink on K8S的使用门槛,同时解决Flink on K8S资源无法自动调整的问题,Flink官方推出了Native Flink on K8S。

流程:

1、Flink client通过内置的K8S client与K8S API Server直接进行通信,之后部署master deployment、jobmanager service、configmap等。

2、JobMaster 通过 K8SResourceManager 向 K8S Master 动态申请资源去创建运行 TaskManager 的 pod,然后 TaskManager再向JobMaster 进行注册。

总结:

优点:native的特点是任务提交的时候才创建 Flink 集群,并且资源可以按需申请;对不熟悉K8S的开发人员比较友好,不涉及太多的K8S概念,部署成本较低。

缺点:缺乏任务生命周期管理能力;K8S集群权限不好管理;不同的用户jar包需要分别构建镜像,需要管理任务镜像;关于K8S相关资源的配置不够灵活,很多自定义配置无法使用,导致Flink Web UI、任务日志等不好管理;没有被大规模生产环境验证。

方案3:Flink K8S Operator

Operator的核心思想是“自动化”,将某一特定场景下的专家人工运维流程通过编码实现自动化,降低运维成本。Operator包含两个核心概念。

  • 自定义资源CRD: 根据具体的业务场景实现自定义资源,CRD可以在不修改K8S源码的基础上方便的扩展K8S的功能。
  • 控制器Controller: 控制器通过内部循环不停地追踪至少一种资源和该资源的期望状态,并且负责确保其当前状态接近期望状态。

Operator通过将CRD和Controller相结合实现了声明式API,声明式API是K8S容器编排的核心,通过API对象来声明期望的状态,并且支持多client以 PATCH 的方式对 API 对象进行修改,K8S可以方便地对API对象进行管理,完成对“实际状态”和“期望状态”的调谐(Reconcile)过程。

流程:

1、在K8S集群中部署Flink Operator,包括FlinkCluster CRD和controller。

2、通过声明式API将FlinkCluster提交给K8S API Server。

3、API Server验证CRD信息并生成FlinkCluster CR存储在ETCD。

4、K8S触发FlinkCluster add事件然后分发到Flink Controller。

5、Flink Controller解析FlinkCluster CR,调用API Server创建底层资源(jobmanager deployment、jobmanager service、ingress、taskmanager deployment)。

6、Flink Controller开始调协循环(reconciliation loop),监控FlinkCluster底层资源的变化并更新相应的状态,不断的采取行动来使得期望状态和观察到的状态无限接近。

7、Job Submitter通过Rest API获取Flinkcluster状态,当Flinkcluster正常启动后,提交Flink任务,提交完成后结束job。

总结:

  • 优点:能够提供生命周期管理能力;通过声明式API管理任务和Operator管理任务,可以方便的自定义K8S资源并进行扩展,如支持远程jar、initContainers、sidecars、volumes、podAnnotations、securityContext等丰富能力;降低运维成本,如自动生成savepoint并重启任务,在任务配置变更或集群升级情况下比较方便;方案比较成熟,业内有通过Operator进行管理Flink任务的案例。
  • 缺点:非native方式,无法根据用户代码并行度自动调整资源;需要对K8S比较了解,有一定门槛。

方案对比

通过上述几种Flink on K8S方案进行概览和对比后,我们了解了其基础原理和优缺点。可以了解到Flink相对于普通后端服务来说较为复杂,包含很多需要考虑的技术细节。同时考虑到大规模Flink任务生命周期管理成本、后续的运维成本、相关方案成熟度、平台方的兼容改造成本等。综上所述,目前贝壳选择了Flink K8S Operator方案。

5.3 存储与计算分离

Flink主要是一个数据计算服务,同时也依赖着外部存储,比如checkpoint、运行日志、提交日志、外部依赖等都需要分布式文件系统存储。存储与计算分离不仅可以降低运维成本,还可以提升整体稳定性。

5.3.1 Flink的存储需求

首先回顾下Flink对于存储的需求。

  • Checkpoint 。Checkpoint是Flink保证数据一致性和错误恢复的基础,Flink支持的存储HDFS、 S3、 GFS、 NFS等,之前Flink运行在Yarn上,checkpoint保存在HDFS上,checkpoint的存储规模在10TB左右,文件数达到了30万。
  • 运行日志 。主要包括Flink jobmanager和taskmanager运行过程中不断产生的日志,目前日志规模在900TB左右。
  • 启动日志 。是指Flink任务短时间提交过程中产生的日志,日志量较少但对于排查问题必不可少,总存储需求只有50MB左右。
  • 依赖文件 。包括Flink任务提交时所需的用户jar、udf jar、或者其他自定义的依赖文件等。

5.3.2 Flink的存储与计算分离方案

Flink on Yarn的存储基本依赖hdfs,K8S若保持使用hdfs需要比较复杂的权限校验且需要添加hadoop相关依赖。

K8S存储的管理通过 PVC/PV 机制将存储如何供应的细节从其如何被使用中抽象出来,使得存储的使用比较清晰易配置,同时降低存储系统变更带来的影响。Flink作为存储需求方只需要通过PVC描述pod期望的持久化存储的属性,比如存储的大小、读写权限等,对于运行日志可以声明较大的HDD存储,对于部分checkpoint较大的任务可以声明少量的SSD存储,为不同的存储需求声明不同的PVC,便于后续的运维管理。PV和StorageClass由则K8S运维方统一管理,PV描述一个具体的Volume的属性,比如Volume的类型、挂载目录、相关的权限信息、远程存储服务器地址等,StorageClass完成PV/PVC绑定。

从上述需求回顾中可以得出结论,Flink on K8S的计算存储分离需要能够支撑较大存储量、能够支持K8S容器环境使用的分布式文件系统。通过调研并与存储团队沟通后我们选择ChubaoFS(储宝文件系统)。

ChubaoFS是为云原生应用提供稳定的,高性能,高可用,可扩展的分布式文件系统和对象存储服务,可为容器平台提供持久化存储解决方案,实现计算与存储分离。ChubaoFS的以下特性能够满足Flink存储与计算分离的需求:

可扩展元数据管理 。ChubaoFS中使用了分布式元数据子系统,以便提供高可扩展性,其元数据子系统可以认为是内存元数据存储。能够应对Flink存储需求文件数多的问题。

优秀的读写性能 。ChubaoFS的存储引擎同时提供了对大文件和小文件的随机/顺序读写的支持并作了性能优化。

支持并发访问 。不同的容器可以并发访问同一个文件,能够支持Flink任务的一些公用的依赖文件的并发访问、共享访问。

支持持久化存储 。当pod挂掉依然存在,pod后能够自动重新挂载,能够满足Flink任务自动重启、历史任务日志保存回溯等需求。

Flink根据业务分为多个namespace,每个namespace使用三个ChubaoFS卷,分别用于存储checkpoint、日志、依赖文件。任务内部checkpoint路径改为容器本地file(如:file:///checkpoint/12345),其中/checkpoint目录对应chubao mountPath。

Flink任务内部的运行日志量比较大但价值密度较低,若通过K8S常用的日志采集的方式写入外部存储系统(如ES)则会增大存储成本。因此,将日志存储系统默认输出路径也挂在上ChubaoFS卷,则可以将运行日志实时写入ChubaoFS。另外,对Operator进行改造,支持将启动日志自定义输出到ChubaoFS Subpath上,启动日志也可以在ChubaoFS查看。

通过taskid和versionid划分chubao subpath,可以minio gateway代理可以在不同路径下看到历史日志详情,如下图所示。

5.4 网络

5.4.1 性能优化

K8S的网络插件通常会带来一定的性能损耗,比如一些基于VXLAN“隧道”机制的网络方案,性能损失都在 20%~30%左右,Flink流任务本质上是pipeline模式的数据处理任务,不同的taskmanager pod之间可能会有大量的数据传输,因此必须考虑网络性能。

我们采用的网络方案为Calico的bgp模式,Calico是一个纯三层的网络插件,容器通信过程降低了额外的封包和解包带来的性能损耗,Calico 使用边界网关协议 (BGP) 来构建帮助实现代理程序节点间通信的路由表,Calico网络可以实现更高的性能和更好网络隔离。

5.4.2 Web UI

Flink Web UI本质上是一个http服务,Flink上云之后需要考虑如何把Web UI暴露,我们将jobmanager http服务抽象为service,service为应用程序公开为网络服务的抽象方法。每个Flink任务通过ingress代理jobmanager service,并且为每个任务指定一个单独的域名,如:{任务id}.xxx.com,但是Flink任务多,每个都是不同的域名,不可能为每个域名都配置hosts,因此通过DNS域名通配符的方案进行统一管理。

5.5 生产实践中的优化

5.5.1 生命周期管理优化

在平台进行任务生命周期管理的时候,通常需要查询任务状态,比如任务启停或任务监控。Flink上云我们采取了Flink K8S Operator,其中使用到了CRD,CRD在ETCD里面是以JSON格式来存储的,而K8S的API对象是以protobuf格式存储,在资源对象数量多的时候JSON的序列化和反序列化性能会成为瓶颈。另外,随着运行在K8S上的任务增多,CR也随着增多,任务的生命周期管理需要频繁获取任务状态,目前运行在K8S上的任务上千个,频繁的查询K8S API Server会遇到性能瓶颈。

如何解决上述问题,我们采用的办法是双“Operator”模式,除了Flink K8S Operator,我们使用kubernetes-client/java新起了一个Operator,用另一个Operator的Informer去监听同一类型CRD,基于使用通过Etcd的Watch机制获取任务最新状态之后并缓存到MySQL并提供查询API,平台侧查询任务状态时的性能得到了大幅提升,也降低了K8S API Server的压力。

5.5.2 任务启动慢优化

Flink任务迁移到K8S上我们发现了部分任务启动很慢,从之前的在Yarn上1分钟内启动完成到4分钟以上才能运行起来,会影响用户体验。我们分析任务启动流程,来缩短任务启动时间。

首先从Operator层面进行优化,Operator 默认设置的readiness的检查周期较长,默认为60s,pod启动成功后到任务pod就绪可能会有时间间隔,因此将检测周期减小,减小了几十秒的启动时间。

之后,从容器内部进行优化,通过查看pod监控可以看到Flink任务启动过程中CPU利用率几乎到达了100%,需要CPU资源较多,而我们的资源预估和K8S的资源细粒度控制能力将一些长期数据量很小的任务CPU限制到了0.1核。0.1核对于一些任务启动成功后处理数据来说是足够的,但是会导致启动时间变长,那么为了能够精细化使用资源同时降低启动耗时,我们深入分析了Flink启动流程,查看为什么启动过程会耗费大量CPU资源。

通过查看进程的CPU使用占比发现,任务启动过程中耗费了很多时间在实行启动脚�