贝壳找房 | 基于 Milvus 的向量搜索实践(一)

贝壳找房 | 基于 Milvus 的向量搜索实践(二)

摘要: 本系列文章分为三部分,第一篇主要讲基本概念、背景、选型及服务的整体架构;第二篇主要讲针对低延时、高吞吐需求,我们对Milvus部署方式的一种定制;本篇主要讲实现数据更新、保证数据一致性,以及保证服务稳定及提高资源利用率做的一些事情。

1.数据存储方案

第二篇中我们解决了部署方案的问题,接下来要考虑的是数据如果存储。在分布式部署情况下,Milvus是需要使用Mysql来存储元数据的[1]。Milvus分布式部署时,数据只会写一份,如何实现数据的分布式使用呢?基本的思路有两种:1)内部数据复制,典型的例子如elasticsearch[2],kafka[3][4];2)数据存储在共享存储上,如NFS,glusterfs,AWS EBS,GCE PD,Azure Disk等,都提供了kubernetes下的支持[5]。两种思路没有本质的区分,前者是应用自己实现了数据的存储及高可用(多副本);缺点是应用复杂度增加;优点是具有更高的灵活性。后者依赖于已有的通用的存储方案,只需要关注自身的核心功能,复杂度降低了,而且更方便在多种存储方案下切换。在云计算技术发展的今天,后者有一定的市场。Milvus选用了共享存储来存储数据。为了实现存储的统一及高可用,我们把单个Milvus集群所涉及到的所有数据存储(mysql数据文件和milvus的存储),都放到共享存储中。我们使用了glusterfs做为共享存储的具体实现。整体的存储方案如图1。

图1 使用glusterfs存储数据为了解决集群的自动创建,减少沟通维护成本以及物理资源的最大利用(Milvus是cpu密集型,glusterfs是存储密集型),我们将glusterfs同Milvus混合部署。我们参考实现了glusterfs在kubernetes下的超融合(Full Hyper-Convergence)部署,并借助heketi[7]实现了存储资源的动态分配。另外,在部署过程中,还需要注意的是glusterfs需要一个独立的磁盘/分区,你也可以使用loop设备[8];在部署过程中,因为各种原因,不可避免需要重置部署,这时你需要清除脏数据,可以参照以下命令。

# 清除逻辑卷
lvscan | awk 'system("lvremove  -y "$2 )'

# 清除卷分组
vgscan | grep group | awk -F '"' '{system("vgremove "$2)}'

glusterfs在kubernetes下的部署架构如图2所示,glusterfs服务可以分布在kubernetes的多个node上,我们可以根据存储的需求增加结点。

图2 glusterfs in kubernetes

实现了glusterfs在kubernetes的部署,我们更关心的是glusterfs本身的可用性:1)glusterfs是否可以实现数据的不丢失/高可用;2)glusterfs是否可以存储大批量数据。

由[9]可知,glusterfs有Distributed volume、Replicated volume、Distributed Replicated volume、Dispersed Glusterfs Volume、Distributed Dispersed Glusterfs Volume 5种类型的卷,其中Distributed volume可以解决数据分布存储数据,从而实现大批量数据的存储,Replicated volume通过数据的冗余来实现高可用,Distributed Replicated volume同时解决了高可用和大批量数据存储的问题,Dispersed Glusterfs Volume、Distributed Dispersed Glusterfs Volume是分别对Replicated volume、Distributed Replicated volume的优化,借助一种前向纠错码(erasure code[10])实现数据存储成本的降低。图3给出了Distributed Replicated volume类型卷的结构图。

图3 Distributed Replicated volume

最后,借助heketi[7]、以及kubernetes的StorageClass[11]、PVC[12],我们屏蔽掉了以上glusterfs卷创建、扩容、销毁的细节,比较完美解决了数据存储的问题。

2.数据更新方案

数据更新分为实时更新和批量/全量更新两种,Milvus本身是支持实时更新的,但是数据更新时需要重新创建索引,而索引构建需要消耗大量的CPU资源,从而引发服务整体的稳定性问题。综合考虑稳定性,以及业务的数据更新场景(绝大多数是T+1更新策略),我们采用了如图4所示的数据更新策略。

我们使用了A、B两组对等的资源(可以是同机房、跨机房)作为底层Milvus引擎,在引擎的外层,我们实现了读写分离,同一时刻,A、B集群只会承担读、写角色中的一个。在引擎外层,我们维护了读写角色与A、B集群的对照表;数据更新时,我们操作写集群完成数据写入、索引构建,写集群索引构建完成后,切换成角色成读集群;数据更新时出现任何问题,不影响读集群。另外,在读写集群都有正常数据(数据更新差一天)情况下,如果读集群出现问题,写集群可以随时切换成读集群,从而在实现数据更新的同时还实现了互备。由于底层资源使用对等的两份,如何没有特别的处理,不可避免会造成资源的浪费,后面内容会专门讨论解决这个问题的方案。

图4 T+1数据更新策略

3. 数据一致性保证

解决了数据更新的问题,另一个问题接踵而来:如何保证数据更新时一致性?如何做到以下三点:1)数据量不多不少;2)数据不重复;3)旧数据不会覆盖新数据。

由于我们的前提是数据全量更新,在业务数据本身不重复的情况下,不会存在数据覆盖问题,我们重点讨论前两点。

3.1 数据量不多不少

我们总体思路是,明确写入操作开始和结束(提供专门的api实现),在结束时检验数据量。数据全量写入开始时,我们清空数据,在数据全量写入结束时,判断数据写入的实际数量与预期是否一致,如果一致,我们可以确认数据数量是没有问题的。数据写入操作可以并发进行,以保证整体的写入吞吐量,但是需要使用方保证,结束写操作需要在所有写入操作之后。另外,为了兼顾数据一致性、引擎稳定性以及服务整体可用,可以设定一致性错误容忍度(比如可以容忍多少比例的数据量差异)。

3.2 数据不重复

我们假设,写入Milvus的请求返回成功,数据写入成功;请求返回失败,数据写入失败。

我们写入Milvus时,通过同步阻塞来实现数据不重复。具体地,写入时,我们设定写入超时时间大于引擎内部写入请求的处理时间,也就是留出足够时间来让引擎返回成功/失败(即感知到引擎因为各种问题引起的失败);如果失败,我们会执行一次删除操作(删除可能写入的指定数据),并进行重试(如果重试指定次数还未成功,会由数据量校验来决定是否全量更新成功)。

除了以上方案,还有两种可选的方案:

  1. 外部维护一个数据是否已经写入的标识,数据写入前进行判断,如果已经存在,就不再写入。
  2. Milvus自身支持upset(如果不存在就插入,如果存在就更新)操作。

方案1在实现同步阻塞方案效果的基础上,还兼顾了使用方与向量服务之间的可能网络异常(写入成功,但是没有返回给业务方,业务方重试,导致数据写入重复;Milvus在0.8.0下不能去重);但是,增加了额外的开销,系统的复杂度也随之增加。

方案2是一个更优秀的方案,把去重的工作外部透明了。当然,这个依赖于Milvus的版本迭代[13]。

图5展示了数据T+1全量更新的步骤:

  1. 全量写开始 - 删除Milvus中旧数据,清除内外id映射数据,扩容Milvus写实例。
  2. 批量写 - 向Milvus写实例批量写入数据,失败重试。
  3. 结束写 - 检验数据量是否符合预期。
  4. 触发异步建索引 - 调用Milvus建索引接口(数据量大时建索引接口可能会阻塞)。
  5. 异步等待 - 调用Milvus建索引接口返回(超时/完成),循环判断是否建索引成功(可以根据showCollectionInfo接口的返回判断)。
  6. 引擎预热 - 让引擎把数据加载到内存中;多partition时需要遍历所有的partition才能保证所有数据都加载。
  7. 引擎切换 - A、B引擎集群角色互换,并把对应关系持久化;对原有的读集群缩容。

图5 数据全量更新流程

4.存活检测

在Milvus0.8.0使用过程中,多次出现cpu指令异常,导致Milvus服务退出的情况;但是,由于Milvus没有暴露存活检测的接口,Milvus Pod 14 还被认为可用,还会有流量被负载均衡到,从而引发外部使用报错。

解决方式很直接,我们需要给Milvus0.8.0增加存活检测的接口,并且在kubernets下配置上对Milvus的检测。由[15]可知,kubernetes有readinessProbe、livenessProbe两者存活检测的手段,前者用于检测服务是否正常启动,后者用于检测服务正式在正常运行,如果不正常,会有相应的重启策略。

readinessProbe、livenessProbe的具体实现有exec、httpGet、tcpSocket三种;exec定时到指定容器中执行一个shell命令;httpGet定时请求容器暴露的http接口;tcpSocket是定时请求容器暴露的socket端口;三者根据指定格式的返回结果来判断服务是否正常,根据Probe配置来决定是否重启。具体的配置可以参考[15]。

有了kubernetes的支持,我们剩下需要做的就是如何判断Milvus是否正常;幸运的是,Milvus虽然没有暴露kubernetes指定格式的Probe接口,但是它提供的server_status接口可以判断服务是否正常运行。接下来,我们需要做的,就是如何包装下这个接口,返回kubernetes指定的格式。

最直接、简单的方案是exec。我们给原生Milvus0.8.0版本的docker镜像增加了执行python脚本功能的能力,并把以下python脚本打包到镜像中,最后exec配置定时调用以下脚本。我们使用这个思路初步解决了问题,但是,在后续的测试验证过程中发现,当同一台机器上存在多个Milvus实例时,服务空转时就消耗了不少的cpu资源。我们由[16]可知,exec最终调用了docker的exec api[17],docker exec api在执行shell命令外,它还做了不少额外工作,从而导致对资源的消耗[18]。

from milvus import Milvus, IndexType, MetricType, Status
client = Milvus(host='localhost', port='19530')
try:
 status,msg= client.server_status(timeout=10)
except Exception as e:
 print('1')
else:

 if status.OK():
  print('0')
 else:
  print('2')

为了解决exec的问题,我们采用了图6的方案。基于以上的分析,我们把python脚本包装成一个http服务器,在容器启动时,将http服务器启动为一个常驻的进程,然后我们采用httpGet方案解决检测的问题。经过实践检验,该方案对性能和资源占用基本没有影响。

图6 httpGet存活检测方案## 5.资源伸缩

考虑到资源的充分利用(我们重点考虑cpu资源),我们有必要在不使用时,对资源进行回收。对资源的回收有手动和自动两方案,整体思路见图7。

图7 资源伸缩### 5.1 手动