如果你在大数据工作,你很可能听说过Apache Airflow。它于 2014 年在Airbnb启动开源项目,以帮助公司处理其批量数据管道。自那时起,它已成为数据工程中最受欢迎的开源工作流管理平台之一。

Apache Airflow是用 Python 书写的,它具有灵活性和稳健性。其强大且设备齐全的用户界面简化了工作流程管理任务,如跟踪工作和配置平台。由于它依赖于代码来定义其工作流程,用户可以在过程的每一步编写他们想要执行的代码。

今天,我们将探索这个流行的工具的基本知识,以及基本面。

我们将涵盖:

什么是Apache Airflow?

Apache Airflow是用于编程编写、调度和监控工作流的强大调度器。它旨在处理和协调复杂的数据管道。它最初是为了解决与长期任务和大量脚本相对应的问题而开发的,但它已发展成为市场上最强大的数据管道平台之一。

我们可以将Airflow描述为定义、执行和监控工作流的平台。我们可以将*工作流程*定义为您为实现特定目标而采取的任何步骤序列。成长中的大数据团队中常见的问题是,在端到端工作流中将相关作业拼接在一起的能力有限。在Airflow之前,有 Oozie,但它有很多限制,但Airflow已经超过了它复杂的工作流。

Airflow也是一个代码第一的平台,其设计理念是数据管道最好作为代码表达。它被构建为可扩展的,可用插件,允许与许多常见的外部系统交互,以及平台,使自己的平台,如果你想。它有能力每天执行数千项不同的任务,简化工作流程管理。

Airflow用于许多行业:

  • 大数据
  • 机器学习
  • 计算机软件
  • 金融服务
  • IT 服务
  • 银行业
  • 等。

image-20211025151606085

Apache Airflow有何不同?

下面列出了Airflow和其他工作流管理平台之间的一些差异。

  • 定向循环图 (DAGs) 以 Python 书写,Python 具有平滑的学习曲线,比 Oozie 使用的 Java 应用更为广泛。
  • 有一个大社区为 Airflow 做出贡献,这使得为主要服务和云提供商找到集成解决方案变得容易。
  • Airflow是多才多艺的,富有表现力的,并建立创造复杂的工作流。它提供了有关工作流的高级指标。
  • 与其他工作流管理平台相比,Airflow具有丰富的 API 和直观的用户界面。
  • 其使用 Jinja 模板允许使用案例,例如引用与 DAG 运行日期对应的文件名。
  • 有管理Airflow云服务,如谷歌作曲家和Astronomer.io。

为什么要使用Apache Airflow?

在本节中,我们将查看Airflow的一些利弊,以及一些值得注意的使用案例。

优点:

  • 开源:您可以下载Airflow并立即开始使用它,您可以与社区中的同行合作。
  • 与云集成:Airflow在云环境中运行良好,为您提供了许多选择。
  • 可扩展:Airflow上下可高度扩展。它可以部署在单个服务器上,也可以扩展到具有多个节点的大型部署。
  • 灵活和可定制:Airflow的构建是为了配合大多数软件开发环境的标准架构,但其灵活性允许大量定制机会。
  • 监控能力:Airflow允许多种监控方式。例如,您可以从用户界面查看任务的状态。
  • 代码第一平台:这种对代码的依赖使您可以自由地编写要在管道的每一步执行的代码。
  • 社区:Airflow的大型和活跃社区有助于扩展信息,并提供与同行联系的机会。

缺点:

  • 依赖 Python:虽然许多人认为Airflow如此严重地依赖 Python 代码是件好事,但那些没有太多经验使用 Python 的人可能有更陡峭的学习曲线。
  • 格利奇:虽然Airflow通常是可靠的,但可能会有像任何产品一样的小故障。

使用案例

Airflow可用于几乎所有批次数据管道,并且有许多不同的记录使用案例,最常见的是大数据相关项目。以下是Airflow Github 存储库中列出的一些用例示例:

  • 使用与谷歌 BigQuery 的Airflow为数据工作室仪表板供电
  • 使用Airflow帮助设计和管理 AWS 上的数据湖
  • 使用Airflow处理生产升级,同时最大限度地减少停机时间

Apache Airflow的基本面

现在,我们已经讨论了Airflow的基本知识以及福利和使用案例,让我们深入探讨这个强大的平台的基本原理。

定向循环图 (DAG)

工作流使用定向循环图 (DAG) 进行定义,该图形由要执行的任务及其连接的依赖性组成。每个 DAG代表要运行的一组任务,它们在 Apache Airflow的用户界面中显示任务之间的关系。让我们细分首字母缩略词:

  • 指示:如果您有多个具有依赖性的任务,每个任务至少需要一个指定的上游或下游任务。
  • 循环:任务不允许生成自引用的数据。这避免了产生无限循环的可能性。
  • 图表:任务处于逻辑结构中,具有明确定义的过程和与其他任务的关系。例如,我们可以使用 DAG 来表达三个任务之间的关系:X、Y 和 Z。我们可以说,”只有在 X 被执行后才能执行 Y,但 Z 可以在任何时候独立执行。我们可以定义其他限制,例如为失败任务执行的重述数量以及何时开始执行任务。

注意:DAG 定义了如何执行任务,但没有定义特定任务执行的内容。

DAG 可以通过对对象进行刻例来指定,如下示例所示。DAG 将在 Web 服务器的 UI 中显示为”示例 1”,并将运行一次。airflow.models.dag.DAG

dag = DAG('Example1',
          schedule_interval='@once',
          start_date=days_ago(1),)

达格运行

执行 DAG 时,它称为*DAG 运行*。假设您计划每小时运行一个 DAG。该 DAG 的每一个实例都建立了 DAG 运行。可以同时连接到 DAG 运行的多个 DAG 运行。

任务

任务是操作员的即时化,其复杂性各不相同。您可以将其想象为 DAG 中节点所表示的工作单元。它们将工作流程的每一步所完成的工作与操作员定义的实际工作进行描绘。

image-20211025151544488

运营商

当 DAG 定义工作流程时,操作员定义工作。操作员就像执行特定任务的模板或类。所有运营商都来自。有许多一般任务的操作员,例如:BaseOperator

  • PythonOperator
  • MySqlOperator
  • EmailOperator
  • BashOperator

这些操作员用于指定在 Python、MySQL、电子邮件或 bash 中执行的操作。

操作员主要有三种类型:

  1. 执行操作或请求其他系统执行操作的操作员
  2. 将数据从一个系统移动到另一个系统的操作员
  3. 运行到满足某些条件的操作员

挂钩允许Airflow与第三方系统接口。通过挂钩,您可以连接到外部数据库和 API,如 MySQL、蜂巢、GCS 等。它们就像操作员的积木。挂钩中不包含任何安全信息。它存储在Airflow的加密元数据数据库中。

注意:Apache Airflow具有社区维护的包,包括核心和服务,如谷歌和亚马逊。这些可以直接安装在Airflow环境中。operatorshooks

关系

Airflow超过定义任务之间的复杂关系。比方说,我们希望指定任务执行之前的任务。我们可以用四种不同的语句来定义这种确切的关系:t1t2

t2.set_upstream(t1)
t1.set_downstream(t2)
t1 >> t2
t2 << t1

Apache Airflow是如何工作的?

image-20211025151451661

大图Apache Airflow架构

组成这个强大且可扩展的工作流程调度平台主要有四个组件

  1. 调度器:调度器监控所有 DAG 及其相关任务。当满足任务的依赖性时,调度员将启动任务。它定期检查要启动的活动任务。

  2. 网络服务器:网络服务器是Airflow的用户界面。它显示了工作状态,并允许用户与数据库进行交互,并阅读来自远程文件存储的日志文件,如S3、谷歌云存储、微软 Azure blobs 等。

  3. 数据库:数据库中保存 DAG 的状态及其相关任务,以确保计划记住元数据信息。Airflow使用 SQL 化学和对象关系映射 (ORM) 连接到元数据数据库。调度器检查所有 DAG 和存储相关信息,如计划间隔、每次运行的统计数据和任务实例。

  4. 执行者:执行者决定如何完成工作。不同类型的执行器可用于不同的使用案例。

执行例:

  • SequentialExecutor:此执行器可以在任何给定时间运行单个任务。它不能并行运行任务。它有助于测试或调试情况。
  • LocalExecutor: 此执行器实现并行和超读。非常适合在本地机器或单个节点上运行Airflow。
  • CeleryExecutor:此执行器是运行分布式Airflow聚类的优先方式。
  • KubernetesExecutor: 此执行者呼叫库伯内茨 API 为运行的每个任务实例制作临时吊舱。

那么,Airflow是如何工作的呢?

Airflow在某个时期检查背景中的所有 DAG。此句段使用配置设置,等于一秒。检查 DAG 文件后,根据调度参数进行 DAG 运行。对于需要执行的任务,任务实例进行刻录,并将它们的状态设置为元数据数据库。processor_poll_intervalSCHEDULED

计划查询数据库,检索状态下的任务,并将它们分发给执行者。然后,任务状态更改为 。这些队列任务由执行这些任务的工人从队列中提取。当这种情况发生时,任务状态将更改为 。SCHEDULEDQUEUEDRUNNING

任务完成后,工作人员会将其标记为*失败*或*完成*,然后调度器更新元数据数据库中的最终状态。

与Apache Airflow合作的第一步

现在,你知道Apache Airflow的基本知识,你准备开始!学习这个工具的一个很好的方法是用它来构建一些东西。下载Airflow后,您可以设计自己的项目或在线为开源项目做出贡献

一些有趣的开源项目:

  • 允许您在浏览器中编辑 DAG 的插件
  • Docker Apache Airflow
  • 动态生成来自 YAML 配置文件的 DAG
  • 以及更多

关于Airflow还有很多东西要学。接下来要涵盖的一些推荐主题包括:

  • 子发展目标
  • 斯拉
  • Airflow传感器