Managed Airflow(第 3 代) | Managed Airflow(第 2 代) | Managed Airflow(旧版第 1 代)
本页面介绍了如何在 Managed Airflow 中启用 CeleryKubernetesExecutor,以及如何在 DAG 中使用 KubernetesExecutor。
CeleryKubernetesExecutor 简介
CeleryKubernetesExecutor 是一种 执行程序,可以同时使用 CeleryExecutor 和 KubernetesExecutor。Airflow 会根据您为任务定义的队列选择执行程序。在一个 DAG 中,您可以使用 CeleryExecutor 运行一些任务,并使用 KubernetesExecutor 运行其他任务:
- CeleryExecutor 经过优化,可快速且可伸缩地执行任务。
- KubernetesExecutor 专为执行资源密集型任务和以隔离方式运行任务而设计。
Managed Airflow 中的 CeleryKubernetesExecutor
Managed Airflow 中的 CeleryKubernetesExecutor 能够让您为任务使用 KubernetesExecutor。在 Managed Airflow 中,无法将 KubernetesExecutor 与 CeleryKubernetesExecutor 分开使用。
Managed Airflow 会在您环境的集群中运行您使用 KubernetesExecutor 执行的任务,这些任务与 Airflow 工作器位于同一命名空间中。此类 任务与 Airflow 工作器具有相同的 绑定,并且可以访问项目中的资源。
您使用 KubernetesExecutor 执行的任务会使用 Managed Airflow 定价模式,因为包含这些 任务的 Pod 会在您环境的集群中运行。Managed Airflow 计算 SKU(适用于 CPU、内存和存储空间)适用于这些 pod。
我们建议在以下情况下使用 CeleryExecutor 运行任务:
- 任务启动时间非常重要。
- 任务不需要运行时隔离,并且不是资源密集型任务。
我们建议在以下情况下使用 KubernetesExecutor 运行任务:
- 任务需要运行时隔离。例如,由于任务在自己的 pod 中运行,因此不会争用内存和 CPU。
- 任务是资源密集型任务,并且您想要控制可用的 CPU 和内存资源。
KubernetesExecutor 与 KubernetesPodOperator 的比较
使用 KubernetesExecutor 运行任务与 使用 KubernetesPodOperator 运行任务类似。任务在 pod 中执行,因此可提供 pod 级任务隔离和更好的资源管理。
不过,两者之间存在一些关键差异:
- KubernetesExecutor 仅在您环境的版本化 Managed Airflow 命名空间中运行任务。在 Managed Airflow 中,无法更改此命名空间。您可以指定 KubernetesPodOperator 在其中运行 pod 任务的命名空间。
- KubernetesExecutor 可以使用任何内置的 Airflow 运算符。 KubernetesPodOperator 仅执行容器的入口点定义的提供的脚本。
- KubernetesExecutor 使用默认的 Managed Airflow Docker 映像,该映像具有与 Managed Airflow 环境中定义的 Python、Airflow 配置选项替换、环境变量和 PyPI 软件包相同的配置。
Docker 映像简介
默认情况下,KubernetesExecutor 使用 Managed Airflow 用于 Celery 工作器的同一 Docker 映像启动任务。这是您环境的 Managed Airflow 映像,其中包含您为环境指定的所有更改,例如自定义 PyPI 软件包或环境变量。
准备工作
您可以在 Managed Airflow(第 3 代)中使用 CeleryKubernetesExecutor。
在 Managed Airflow(第 3 代)中,无法使用 CeleryKubernetesExecutor 以外的任何执行程序。这意味着,您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或两者来运行任务,但无法将环境配置为仅使用 KubernetesExecutor 或 CeleryExecutor。
配置 CeleryKubernetesExecutor
您可能需要替换与 KubernetesExecutor 相关的现有 Airflow 配置 选项:
[kubernetes]worker_pods_creation_batch_size此选项定义每个调度器循环的 Kubernetes 工作器 Pod 创建调用次数。默认值为
1,因此每个调度器检测信号仅启动一个 pod。如果您大量使用 KubernetesExecutor,建议您增加此值。[kubernetes]worker_pods_pending_timeout此选项定义工作器可以处于
Pending状态(正在创建 Pod)的时长(以秒为单位),超过此时长后,系统会将其视为失败。默认值为 5 分钟。
使用 KubernetesExecutor 或 CeleryExecutor 运行任务
您可以在一个 DAG 中使用 CeleryExecutor、KubernetesExecutor 或两者来运行任务:
- 如需使用 KubernetesExecutor 运行任务,请在任务的
queue参数中指定kubernetes值。 - 如需使用 CeleryExecutor 运行任务,请省略
queue参数。
以下示例使用 KubernetesExecutor 运行 task-kubernetes 任务,并使用 CeleryExecutor 运行
task-celery 任务:
import datetime
import airflow
from airflow.operators.python_operator import PythonOperator
with airflow.DAG(
"composer_sample_celery_kubernetes",
start_date=datetime.datetime(2022, 1, 1),
schedule="@daily") as dag:
def kubernetes_example():
print("This task runs using KubernetesExecutor")
def celery_example():
print("This task runs using CeleryExecutor")
# To run with KubernetesExecutor, set queue to kubernetes
task_kubernetes = PythonOperator(
task_id='task-kubernetes',
python_callable=kubernetes_example,
dag=dag,
queue='kubernetes')
# To run with CeleryExecutor, omit the queue argument
task_celery = PythonOperator(
task_id='task-celery',
python_callable=celery_example,
dag=dag)
task_kubernetes >> task_celery
运行与 KubernetesExecutor 相关的 Airflow CLI 命令
您可以使用 gcloud 运行多个
与 KubernetesExecutor 相关的 Airflow CLI 命令
。
自定义工作器 pod 规范
您可以通过在任务的 executor_config 参数中传递工作器 pod 规范来自定义该规范。您可以使用此参数定义自定义 CPU
和内存要求。
您可以替换用于运行任务的整个工作器 pod 规范。如需
检索 KubernetesExecutor 使用的任务的 pod 规范,您可以
运行 kubernetes generate-dag-yaml Airflow CLI
命令。
如需详细了解如何自定义工作器 pod 规范,请参阅 Airflow 文档。
Managed Airflow(第 3 代)支持以下资源要求值:
| 资源 | 最小值 | 最大值 | 步骤 |
|---|---|---|---|
| CPU | 0.25 | 32 | 步骤值:0.25、0.5、1、2、4、6、8、10、...、32。请求的值会向上舍入到最接近的支持的步骤值(例如,5 舍入为 6)。 |
| 内存 | 2G (GB) | 128G (GB) | 步骤值:2、3、4、5、...、128。请求的值会向上舍入到最接近的支持的步骤值(例如,3.5G 舍入为 4G)。 |
| 存储 | - | 100G (GB) | 任意值。如果请求的存储空间超过 100 GB,则仅提供 100 GB。 |
如需详细了解 Kubernetes 中的资源单位,请参阅 Kubernetes 中的资源单位。
以下示例演示了一个使用自定义工作器 pod 规范的任务:
PythonOperator(
task_id='custom-spec-example',
python_callable=f,
dag=dag,
queue='kubernetes',
executor_config={
'pod_override': k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name='base',
resources=k8s.V1ResourceRequirements(requests={
'cpu': '0.5',
'memory': '2G',
})
),
],
),
)
},
)
查看任务日志
KubernetesExecutor 执行的任务的日志位于日志 标签页中,与 CeleryExecutor 运行的任务的日志一起显示:
在 Google Cloud 控制台中,前往环境 页面。
在环境列表中,点击您的环境名称。环境详情 页面会打开。
前往日志 标签页。
依次前往所有日志 > Airflow 日志 > 工作器。
名为
airflow-k8s-worker的工作器执行 KubernetesExecutor 任务。如需查找特定任务的日志,您可以在搜索中使用 DAG ID 或任务 ID 作为关键字。