1、前言

本文讨论了京东搜索在实时流量数据分析方面,利用Apache Flink和Apache Doris进行的探索和实践。流式计算在近些年的热度与日俱增,从Google Dataflow论文的发表,到Apache Flink计算引擎逐渐站到舞台中央,再到Apache Druid等实时分析型数据库的广泛应用,流式计算引擎百花齐放。但不同的业务场景,面临着不同的问题,没有哪一种引擎是万能的。我们希望京东搜索业务在流计算的应用实践,能够给到大家一些启发,也欢迎大家多多交流,给我们提出宝贵的建议。

2、搜索业务形态

京东集团的新使命是“技术为本,致力于更高效和可持续的世界”。京东搜索作为电商平台的一个入口,为众多商家与用户提供连接的纽带。京东搜索发挥着导流的作用,给用户提供表达需求的入口;为了正确理解用户意图,将用户的需求进行高效的转化,线上同时运行着多个AB实验算法,遍及POP形态与自营形态的多个商品,而这些商品所属的品类、所在的组织架构以及品牌店铺等属性,都需要在线进行监控,以衡量转化的效果和承接的能力。

3、实时技术的挑战

目前搜索上层应用业务对实时数据的需求,主要包含三部分内容:

1、 搜索整体数据的实时分析。

2、 AB实验效果的实时监控。

3、 热搜词的Top榜单以反映舆情的变化。

这三部分数据需求,都需要进行深度的下钻,维度细化需要到SKU粒度。同时我们也承担着搜索实时数据平台的建设任务,为下游用户输出不同层次的实时流数据。

我们的用户包括搜索的运营、产品、算法以及采销人员。虽然不同用户关心的数据粒度不同、时间频率不同、维度也不同,但是我们希望能够建立统一的实时OLAP数据仓库,并提供一套安全、可靠的、灵活的实时数据服务。

目前每日新增的曝光日志达到几亿条记录,而拆分到SKU粒度的日志则要翻10倍,再细拆到AB实验的SKU粒度时,数据量则多达上百亿记录,多维数据组合下的聚合查询要求秒级响应时间,这样的数据量也给团队带来了不小的挑战。

4、实时技术架构演进

我们之前的方案是以Apache Storm引擎进行点对点的数据处理,这种方式在业务需求快速增长的阶段,可以快速的满足实时报表的需求。但是随着业务的不断发展、数据量逐渐增加以及需求逐渐多样化,弊端随之产生。例如灵活性差、数据一致性无法满足、开发效率较低、资源成本增加等。

为解决之前架构出现的问题,我们首先进行了架构升级,将storm引擎替换为Apache Flink,用以实现高吞吐、exactly once的处理语义。同时根据搜索数据的特点,将实时数据进行分层处理,构建出PV流明细层、SKU流明细层和AB实验流明细层,期望基于不同明细层的实时流,构建上层的实时OLAP层。

OLAP层的技术选型,需要满足以下几点:

1:数据延迟在分钟级,查询响应时间在秒级

2:标准SQL交互引擎,降低使用成本

3:支持join操作,方便维度增加属性信息

4:流量数据可以近似去重,但订单行要精准去重

5:高吞吐,每分钟数据量在千万级记录,每天数百亿条新增记录

6:前端业务较多,查询并发度不能太低

通过对比目前业界广泛使用的支持实时导入的OLAP引擎,我们在druid、ES、clickhouse和doris之间做了横向比较:

通过对比开源的几款实时OLAP引擎,我们发现doris和clickhouse能够满足我们的需求,但是clickhouse的并发度太低是个潜在的风险,而且clickhouse的数据导入没有事务支持,无法实现exactly once语义,对标准sql的支持也是有限的。

最终,我们选定doris作为聚合层,用于实时OLAP分析。对于流量数据,使用聚合模型建表;对于订单行,我们将聚合模型换成Uniq模型,保证同一个订单最终只会存储一条记录,从而达到订单行精准去重的目的。在flink处理时,我们也将之前的任务拆解,将反复加工的逻辑封装,每一次处理都生成新的topic流,明细层细分了不同粒度的实时流。新方案如下:

目前的技术架构中,flink的任务是非常轻的,state状态非常小,并没有使用KeyedState自定义状态,而OperatorState中只包含kafka的offset信息,这样保证任务的运行开销很小,稳定性大大提升。同时基于生产的数据明细层,我们直接使用了doris来充当聚合层的功能,将原本可以在flink中实现的窗口计算,下沉到doris中完成。利用doris的routine load消费实时数据,虽然数据在导入前是明细粒度,但是基于聚合模型,导入后自动进行异步聚合。而聚合度的高低,完全根据维度的个数与维度的基数决定。通过在base表上建立rollup,在导入时双写或多写并进行预聚合操作,这有点类似于物化视图的功能,可以将数据进行高度的汇总,以提升查询性能。

在明细层采用kafka直接对接到doris,还有一个好处就是这种方式天然的支持数据回溯。数据回溯简单说就是当遇到实时数据的乱序问题时,可以将“迟到”的数据进行重新计算,更新之前的结果。这是因为我们导入的是明细数据,延迟的数据无论何时到达都可以被写入到表中,而查询接口只需要再次进行查询即可获得最新的计算结果。最终方案的数据流图如下:

5、技术取舍

实时数据处理的技术实现,需要在低延迟、准确性和成本之间进行取舍。我们采用doris作为实时仓库的聚合层,其实也是在多方面进行了取舍。例如:

1、routine load的导入任务,为了达到更高的写入吞吐量,我们将实时导入任务的最大时间间隔设置了30s,即增加了导入延迟,换来了更大的吞吐

2、为了降低开发成本,节省计算资源,我们通过建立rollup来支持快速的查询需求,但是却增加了存储压力以及写入时的IO压力

3、PV、UV等流量指标在聚合时采用的是HLL计算,降低了精度,换来了更短的查询响应时间

以上几点取舍,是结合业务场景与需求的要求而决定的,并非绝对的情况。所以,面对实时数据大规模、无界、乱序等特点,实时流计算的选型,最终考虑的就是如何取舍。

6、Doris在大促期间的优化

上文提到我们在doris中建立了不同粒度的聚合模型,包括PV粒度、SKU粒度以及AB实验粒度。我们这里以每日生产数据量最大的曝光AB实验模型为例,阐述在doris中如何支持大促期间每日新增百亿条记录的查询的。

AB实验的效果监控,业务上需要10分钟、30分钟、60分钟以及全天累计等四个时间段,同时需要根据渠道、平台和一二三级品类等维度进行下钻分析,观测的指标则包含曝光PV、UV、曝光SKU件次、点击PV、点击UV等基础指标,以及CTR等衍生指标。

在数据建模阶段,我们将曝光实时数据建立聚合模型,其中K空间包含日期字段、分钟粒度的时间字段、渠道、平台、一二三级品类等,V空间则包含上述的指标列,其中UV和PV进行HLL近似计算,而SKU件次则采用SUM函数,每到来一条记录则加1。由于AB实验数据都是�