使用 CeleryKubernetesExecutor

Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)

本页面介绍了如何在 Managed Airflow 中启用 CeleryKubernetesExecutor,以及如何在 DAG 中使用 KubernetesExecutor。

CeleryKubernetesExecutor 简介

CeleryKubernetesExecutor 是一种 执行程序,可以同时使用 CeleryExecutor 和 KubernetesExecutor。Airflow 会根据您为任务定义的队列选择执行程序。在一个 DAG 中,您可以使用 CeleryExecutor 运行一些任务,并使用 KubernetesExecutor 运行其他任务:

  • CeleryExecutor 经过优化,可快速且可伸缩地执行任务。
  • KubernetesExecutor 专为执行资源密集型任务和以隔离方式运行任务而设计。

Managed Airflow 中的 CeleryKubernetesExecutor

Managed Airflow 中的 CeleryKubernetesExecutor 能够让您为任务使用 KubernetesExecutor。在 Managed Airflow 中,无法将 KubernetesExecutor 与 CeleryKubernetesExecutor 分开使用。

Managed Airflow 会在您环境的集群中运行您使用 KubernetesExecutor 执行的任务,这些任务与 Airflow 工作器位于同一命名空间中。此类 任务与 Airflow 工作器具有相同的 绑定,并且可以访问项目中的资源。

您使用 KubernetesExecutor 执行的任务会使用 Managed Airflow 定价模式,因为包含这些 任务的 Pod 会在您环境的集群中运行。Managed Airflow 计算 SKU(适用于 CPU、内存和存储空间)适用于这些 pod。

我们建议在以下情况下使用 CeleryExecutor 运行任务:

  • 任务启动时间非常重要。
  • 任务不需要运行时隔离,并且不是资源密集型任务。

我们建议在以下情况下使用 KubernetesExecutor 运行任务:

  • 任务需要运行时隔离。例如,由于任务在自己的 pod 中运行,因此不会争用内存和 CPU。
  • 任务是资源密集型任务,并且您想要控制可用的 CPU 和内存资源。

KubernetesExecutor 与 KubernetesPodOperator 的比较

使用 KubernetesExecutor 运行任务与 使用 KubernetesPodOperator 运行任务类似。任务在 pod 中执行,因此可提供 pod 级任务隔离和更好的资源管理。

不过,两者之间存在一些关键差异:

  • KubernetesExecutor 仅在您环境的版本化 Managed Airflow 命名空间中运行任务。在 Managed Airflow 中,无法更改此命名空间。您可以指定 KubernetesPodOperator 在其中运行 pod 任务的命名空间。
  • KubernetesExecutor 可以使用任何内置的 Airflow 运算符。 KubernetesPodOperator 仅执行容器的入口点定义的提供的脚本。
  • KubernetesExecutor 使用默认的 Managed Airflow Docker 映像,该映像具有与 Managed Airflow 环境中定义的 Python、Airflow 配置选项替换、环境变量和 PyPI 软件包相同的配置。

Docker 映像简介

默认情况下,KubernetesExecutor 使用 Managed Airflow 用于 Celery 工作器的同一 Docker 映像启动任务。这是您环境的 Managed Airflow 映像,其中包含您为环境指定的所有更改,例如自定义 PyPI 软件包或环境变量。

准备工作

  • 您可以在 Managed Airflow(第 3 代)中使用 CeleryKubernetesExecutor。

  • 在 Managed Airflow(第 3 代)中,无法使用 CeleryKubernetesExecutor 以外的任何执行程序。这意味着,您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或两者来运行任务,但无法将环境配置为仅使用 KubernetesExecutor 或 CeleryExecutor。

配置 CeleryKubernetesExecutor

您可能需要替换与 KubernetesExecutor 相关的现有 Airflow 配置 选项:

  • [kubernetes]worker_pods_creation_batch_size

    此选项定义每个调度器循环的 Kubernetes 工作器 Pod 创建调用次数。默认值为 1,因此每个调度器检测信号仅启动一个 pod。如果您大量使用 KubernetesExecutor,建议您增加此值。

  • [kubernetes]worker_pods_pending_timeout

    此选项定义工作器可以处于 Pending 状态(正在创建 Pod)的时长(以秒为单位),超过此时长后,系统会将其视为失败。默认值为 5 分钟。

使用 KubernetesExecutor 或 CeleryExecutor 运行任务

您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或两者来运行任务:

  • 如需使用 KubernetesExecutor 运行任务,请在任务的 queue 参数中指定 kubernetes 值。
  • 如需使用 CeleryExecutor 运行任务,请省略 queue 参数。

以下示例使用 KubernetesExecutor 运行 task-kubernetes 任务,并使用 CeleryExecutor 运行 task-celery 任务:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

运行与 KubernetesExecutor 相关的 Airflow CLI 命令

您可以使用 gcloud 运行多个 与 KubernetesExecutor 相关的 Airflow CLI 命令

自定义工作器 pod 规范

您可以通过在任务的 executor_config 参数中传递工作器 pod 规范来自定义该规范。您可以使用此参数定义自定义 CPU 和内存要求。

您可以替换用于运行任务的整个工作器 pod 规范。如需 检索 KubernetesExecutor 使用的任务的 pod 规范,您可以 运行 kubernetes generate-dag-yaml Airflow CLI 命令。

如需详细了解如何自定义工作器 pod 规范,请参阅 Airflow 文档

Managed Airflow(第 3 代)支持以下资源要求值:

资源 最小值 最大值 步骤
CPU 0.25 32 步骤值:0.25、0.5、1、2、4、6、8、10、...、32。请求的值会向上舍入到最接近的支持的步骤值(例如,5 舍入为 6)。
内存 2G (GB) 128G (GB) 步骤值:2、3、4、5、...、128。请求的值会向上舍入到最接近的支持的步骤值(例如,3.5G 舍入为 4G)。
存储 - 100G (GB) 任意值。如果请求的存储空间超过 100 GB,则仅提供 100 GB。

如需详细了解 Kubernetes 中的资源单位,请参阅 Kubernetes 中的资源单位

以下示例演示了一个使用自定义工作器 pod 规范的任务:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '0.5',
                            'memory': '2G',
                        })
                    ),
                ],
            ),
        )
    },
)

查看任务日志

KubernetesExecutor 执行的任务的日志位于日志 标签页中,与 CeleryExecutor 运行的任务的日志一起显示:

  1. 在 Google Cloud 控制台中,前往环境 页面。

    前往“环境”

  2. 在环境列表中,点击您的环境名称。环境详情 页面会打开。

  3. 前往日志 标签页。

  4. 依次前往所有日志 > Airflow 日志 > 工作器

  5. 名为 airflow-k8s-worker 的工作器执行 KubernetesExecutor 任务。如需查找特定任务的日志,您可以在搜索中使用 DAG ID 或任务 ID 作为关键字。

后续步骤