如何从单个域事件处理程序失败或整个系统在事务提交后但在所有域事件处理程序处理事件之前崩溃的情况中恢复。

与软件中的大多数(如果不是全部?)问题一样,这里没有万能的解决方案。相反,您必须找到最能满足您的特定系统要求的解决方案。在这篇博文中,我们将研究一种保证最终一致性的简单(ish)方法,即使我们不时错过一个领域事件。这些示例假设我们使用的是 Spring 和 Java,但这些原则也适用于其他框架和语言。

总体思路

这种方法背后的想法是使用计划作业而不是(或除此之外)域事件来同步聚合之间甚至有界上下文之间的状态。这些作业在一天中的不同时间间隔或时间自动运行,但也可以由域事件处理程序触发。这意味着如果一切顺利,数据将在域事件触发后的*几秒钟内在整个系统中变得一致。*另一方面,如果出现问题,数据将*在下一次成功的计划运行后*变得一致。

图片

像这样的系统并不难实现,但也没有乍看之下那么简单,因为有一些警告。但是,您可以通过遵循以下准则来避免它们:

让您的工作接受所有或部分输入

当您运行计划作业时,它通常是某种适用于所有适用输入的批处理操作。但是,在这种特殊情况下,您还希望能够仅在一个特定输入或一小组输入上运行作业。这提供了很大的灵活性,如果您从一开始就以这种方式设计工作,则很容易做到。

例子

假设您有一项工作将为已发货的订单创建发票。该类将具有以下方法:

@Component
public class InvoiceCreationJob {

    @Transactional(propagation = REQUIRES_NEW)    
    public void createInvoiceForOrders(OrderId... orders) {
        //...
    }

    @Transactional(propagation = REQUIRES_NEW)    
    public void createInvoiceForAllOrders() {
        //...
    }
}
  • 第一种方法为其 ID 已作为方法参数传递的订单创建发票(当然,前提是这些订单已发货但尚未开票;稍后会详细介绍)。
  • 第二种方法是批处理作业,它为*所有*已发货但尚未开票的订单创建发票。

解耦 Job 和 Job 的触发

在设计工作时,请考虑何时以及如何触发它们:

  • 系统什么时候启动?
  • 由用户手动?
  • 响应域事件?
  • 作为预定工作?
  • 通过 JMX?
  • 在其他方面你还不知道?

我会推荐一个看起来像这样的设计:

图片

  • 作业本身是一个单独的对象,当被其他对象告知时,它会做它的事情。
  • 应用程序服务启动作业以响应人类用户操作。
  • 域事件处理程序启动作业以响应域事件。
  • 工作人员处理计划的作业执行(例如每小时一次、每十分钟一次或每天午夜一次)。

例子

让我们继续发票生成示例。我们从一个域事件处理程序开始,它会在订单发货后立即生成发票:

@DomainEventHandler
public class InvoiceGenerationOnShipmentTrigger {

    private final InvoiceCreationJob job;

    InvoiceGenerationOnShipmentTrigger(
            InvoiceCreationJob job) {
        this.job = job;
    }

    @TransactionalEventListener
    public void onInvoiceShipped(InvoiceShippedEvent evt) {
        job.createInvoiceForOrders(evt.getOrderId());
    }
}

接下来,我们要在每天午夜触发作业:

@Component
public class InvoiceGenerationWorker {

    private final InvoiceCreationJob job;

    InvoiceGenerationWorker(InvoiceCreationJob job) {
        this.job = job;
    }

    @Scheduled(cron = "0 0 0 1/1 * ? *")
    public void atMidnight() {
        job.createInvoiceForAllOrders();
    }
}

最后,我们希望管理员能够通过应用服务随时触发作业:

@Service
public class InvoiceGenerationService {

    private final InvoiceCreationJob job;

    InvoiceGenerationService(InvoiceCreationJob job) {
        this.job = job;
    }

    @Secured("ROLE_ADMIN")
    @Async
    public void createInvoiceForAllOrders() {
        job.createInvoiceForAllOrders();
    }
}

使您的工作具有幂等性

当您编写作业时,您知道系统的预期最终状态将是什么。但是,您永远不应该对此做出任何假设,也不应该对起始状态做出假设。你的工作应该总是*检查*开始状态是什么,然后弄清楚它需要采取什么行动才能达到所需的结束状态。如果系统已经处于期望的状态,你的工作*应该什么都不做*。

如果您的工作是这样实现的,您可以使用相同的输入多次运行它,并且不会产生任何副作用(例如为同一订单生成多张发票)。如果由于网络错误或断电等原因导致作业执行失败,您可以稍后重新运行它。

例子

让我们再次继续发票生成示例。在初始设计中,订单将遍历的状态如下:

图片

因此,我们通往幂等性之路的第一步是检查订单的状态,并且仅在该状态为 时才采取行动SHIPPED。对于每个发货的订单,该作业将执行以下操作:

图片

但是,这里有一个问题:如果第二笔交易由于某种原因失败了怎么办?这意味着下次作业运行时,即使状态为 ,也已经有发票SHIPPED

因此,我们通往幂等性之路的下一步是检测是否已经为特定订单创建了发票。我们可以通过向聚合中添加一个OrderId字段来做到这一点,并Invoice在创建新发票之前使用它来检查是否未创建任何发票。如果发票已经存在,则作业将继续将状态设置为INVOICED(您将需要一些数据库约束或悲观锁定来正确实现此功能,但这超出了本文的范围)

在更复杂的情况下,您可能必须添加一些中间状态(例如INVOICE_GENERATION_IN_PROGRESS),以便在发生故障时能够从中断的地方继续。

避免同时执行同一作业

由于您的工作可以通过多种不同的方式启动,因此存在在它已经运行的同时启动它的风险。如果作业是幂等的,这应该不会导致任何数据一致性问题,但最终可能会导致事务死锁或其他并发问题。这也是资源的浪费,可能会减慢系统的其他部分。

如果同一作业的同时执行很少发生,您可以选择忍受它并让幂等性处理它。但是,如果它经常发生,你应该以某种方式处理它。您采取的操作取决于用例,但通常是以下之一:

  • 让当前的工作完成,不要开始新的工作。
  • 取消当前作业,然后开始新作业。
  • 让当前工作完成,然后开始新工作。

例子

我可以只写一篇关于这个特定问题的完整博文,所以对于这个例子,我们将缩小范围并继续使用发票生成案例。

假设我们正在处理一个仅部署为单个实例的小型应用程序,因此我们无需担心相同作业同时在不同机器上运行的情况。

我们要做的第一件事是确保一次只能运行一个批处理操作实例。最简单的方法是使用AtomicBoolean(因为InvoiceCreationJob是单例):

@Component
public class InvoiceCreationJob {

    private final AtomicBoolean working 
        = new AtomicBoolean(false);

    @Transactional(propagation = REQUIRES_NEW)    
    public void createInvoiceForAllOrders() {
        if (working.compareAndSet(false, true)) {
            try {
                // Do the work
            } finally {
                working.set(false);
            }
        }
    }
}
  • 如果working变量是false, if until 设置为true并且允许开始工作。完成后,将working变量设置回false.
  • 如果working变量是true,则该方法将不执行任何操作。

这已经足够好了,因为批处理操作仅用作备用解决方案,以防我们无法正确处理某些InvoiceShippedEvents。

createInvoiceForOrders方法呢?此方法可能会同时针对不同Order的 s 运行,但这不是问题,因此我们不想阻止这种情况。

Order尽管它仍然*可能*发生,但该方法不太可能同时运行。在这种情况下,我们可以依靠操作的幂等性。这同样适用于同时运行的createInvoiceForOrders情况createInvoiceForAllOrders

明智地使用线程

当你实现一个作业时,你应该考虑它将在哪个线程中运行。无论它是如何启动的,它是否总是在它自己的线程中运行,或者在启动它的线程中,或者两者的组合?

让我们看一些例子来说明为什么这很重要:

  • 假设一个工作计划由一个工人连续执行,有五分钟的延迟。这意味着新作业应在前一个作业完成五分钟后开始。现在,如果作业在其自己的线程中运行,则工作线程会在作业线程开始时认为它已经完成。如果工作需要超过五分钟,即使旧工作仍在运行,工人也会开始新工作。在这种情况下,最好让作业在启动它的线程中运行。
  • 用户通过应用服务启动另一个作业。这也是一项长期的工作。现在,如果作业在启动它的线程内运行,它将阻塞 UI,直到作业完成。在这种情况下,最好让作业在自己的线程中运行。

现在假设上面提到的工作实际上是同一个工作。为了让您的选项保持开放,*您应该始终实现您的作业以在调用线程内运行*,然后*让调用者担心使用哪个线程*。

例子

向上滚动到演示触发作业的不同方式的示例。请注意,应用程序服务使用@Async注释,而域事件处理程序和工作人员不使用。为什么?让我们来看看每个案例:

  • 应用程序服务通过 UI 调用,并由管理员用于手动触发批处理作业。这项工作可能需要一些时间才能完成,我们不希望这会阻塞 UI。因此,我们在后台线程中异步运行它。
  • 域事件处理程序在事件到达时被调用,这是一种边缘情况:当您有少量快速返回的事件处理程序时,您可以在调用线程中运行它们。但是,如果您有大量的处理程序或其中一些需要时间来完成,您可能希望在后台线程中运行它们。
  • worker 已经被后台线程调用了,所以我们可以继续使用它。

在查询中使用上限

您的作业将执行不同的查询以确定系统的状态。需要处理的数据量取决于许多因素,例如系统的使用量或上次运行作业的时间。

正因为如此,无论何时编写查询,都应遵循良好的经验法则:任何查询都将不返回任何记录、仅返回一条记录或大量记录

除非您确定查询属于前两个类别,否则应始终为查询添加上限。您可以实现作业以批量运行,直到没有更多数据要处理,或者只处理前 N 条记录,然后等待下一次运行。

明智地安排工作

安排工作时,您通常有以下选择:

  • *固定速率*:作业以固定速率启动,例如每 60 分钟一次。
  • *固定延迟*:作业在上一次执行完成后以固定延迟启动。
  • *Cron*:作业在一天中的特定时间或一周中的某一天开始,例如每小时过去五分钟,或周日凌晨 2 点。

为您的工作选择正确的时间表可能需要一些试验和错误。如果您运行得太频繁,您最终可能会在旧作业完成之前开始新作业,从而减慢整个系统的速度并导致系统其他部分的事务死锁。如果你跑得太少,用户就会不高兴。您可能必须尝试不同的时间表才能找到最佳平衡点。

最后记得把你所有的工作作为一个整体来看待:处理相同数据的不同工作之间是否存在冲突的风险?不同的重量级作业是否同时运行?有些工作需要在其他工作之前运行吗?

架构考虑

现在,当我们熟悉了一般概念后,我们需要看看它如何适应系统架构(假设为六边形)。

我们已经知道应用程序服务和领域事件处理程序,但是工人和工作会去哪里呢?让我们从工人开始,因为它是最简单的。

工作器是一个组件,它触发操作以响应计时器生成的事件。如果需要,它可以控制自己的事务和线程。这将它与域事件处理程序一起放入*编排*器类别,因此它属于应用程序层。

但是工作本身呢?答案是:视情况而定。继续阅读。

单一有界上下文

当作业是关于在同一个有界上下文中传播更改时,该作业最有可能实现为*域服务*。域事件处理程序、工作者和应用程序服务位于应用程序层,并根据需要调用域服务(并且还控制事务)。

多个有界上下文

当工作是关于将更改从一个有界上下文传播到另一个时,我们正在谈论上下文映射。作业的实现和位置取决于您最终选择的集成模式。让我们看三个例子(有更多的解决方案,但我们必须在某处划清界限)。

客户供应商和墨守成规者

在这种情况下,两个上下文之间存在单向关系,其中*下游*上下文希望*上游*上下文执行某些操作(推)或返回某些内容(拉)。这通常会导致以下架构:

图片

  • 上游上下文提供了一个适配器,下游上下文可以使用该适配器来访问系统。
    • 在客户-供应商关系中,需要上游团队根据下游团队的需求来设计这个适配器。
    • 在顺从关系中,下游团队将使用上游团队选择放入适配器的任何内容,而没有任何发言权。
  • 作业本身是一个应用程序服务委托。这意味着它存在于应用层,但它不是一个纯粹的应用服务,因为它根本不执行任何安全检查。
  • 如果上下文在同一个单体应用程序中运行,则作业可以由来自两个上下文的域事件触发。
  • 如果上下文在单独的应用程序中运行,则作业只能由来自下游上下文的域事件触发,因为我们无法分发事件。再说一次,这也是我们改用预定作业的原因之一。

反腐败层

在这种情况下,上下文之间再次存在单向关系,但在这种情况下,下游团队决定将应用程序与上游上下文完全屏蔽:

图片

  • 下游上下文提供了一个适配器(反腐败层),它使上游上下文适应下游团队已声明的 API。
  • 该作业仍然是应用程序服务委托,但它通过 API 与上游上下文对话。
  • 如果反腐败层支持将事件从上游上下文传播到下游上下文,则作业可以由来自两个上下文的事件触发。否则,它仅限于来自下游上下文的事件。

中间件

如果您无法控制任何涉及的上下文,或者即使上下文本身不知道彼此的存在,关系是双向的,中间件解决方案可能会派上用场:

图片

  • 作业、工作人员、任何域事件处理程序和其他组件都从有界上下文中移出并进入单独的*中间件*组件。
  • 中间件负责在上下文之间来回移动数据,而上下文甚至不知道它。
  • 这适用于单体和分布式系统。

优点和缺点

现在是时候总结一下使用计划作业实现最终一致性的利弊了。

这种方法的主要优点如下:

  • @TransactionalEventListener到目前为止,它可以与我们在此博客文章系列中介绍的所有其他基于 Spring 的构建块和工具一起工作。
  • 您不需要引入任何新的移动部件(如消息代理)或编写任何基础架构代码来持久化和分发域事件。
  • 它很容易实现,并且可以根据用例调整其复杂性。
  • 如果做得好,它会导致系统具有弹性和健壮性。

但是,它也有几个缺点:

  • 您的系统实际上不是事件驱动的。有些事件用于提前触发作业,但作业本身是数据驱动的。
  • 额外的状态可能必须存储在聚合中以指导作业(例如,为了实现幂等性)。
  • 随着作业必须完全了解所涉及的所有聚合,耦合会增加。
  • 当需要快速传播更改时没有用。
  • 对于您添加的每项工作,您的数据库将需要做的工作越多。这增加了事务死锁和性能问题的风险。
  • 横向扩展或引入冗余需要特别注意:您不希望多个服务器同时开始对相同数据运行相同的作业(即使幂等作业应确保数据不会损坏)。