将 Apache Airflow™ 与 Cortex Code CLI 结合使用

Cortex Code 为 Apache Airflow™ 提供内置支持,提供自然语言界面来管理 DAGs、调试故障、创作管道、分析数据以及跟踪跨 Airflow 部署的沿袭关系。

功能

功能

描述

示例提示

管道监控

运行状况检查、DAG 检查、连接和变量可见性、调度控制

“我的 Airflow 实例运行状况良好吗?”

运行管理

按需触发 DAGs、等待结果、传递自定义配置

“测试 daily_etl DAG,完成后通知我”

故障调试

跨运行状态、任务实例和日志进行根本原因分析,并提供影响评估和修复建议

“为什么 my_pipeline 昨晚失败了?”

DAG 创作

使用您现有的模式、连接和提供程序,通过“发现-计划-实现-验证-测试”工作流进行引导式 DAG 创建

“创建一个每天从 Snowflake 提取数据并加载到 S3 的 DAG”

数据分析

数据仓库查询、表分析和新鲜度检查,包含模式缓存和“概念到表”学习

“本季度我们有多少活跃客户?”

数据沿袭

通过 DAG 源代码进行上游来源追踪和下游影响分析,并附带关键性评级

“如果我更改客户表架构,会破坏什么?”

Airflow 3 迁移

使用 Ruff 规则、导入修复、上下文键替换和元数据访问模式更新进行自动化代码迁移

“将我的 DAGs 从 Airflow 2 迁移到 Airflow 3”

dbt 集成

通过 Astronomer Cosmos 将 dbt Core 或 Fusion 项目作为 Airflow DAGs 运行,包含解析、执行和配置文件配置

“使用 Cosmos 设置我的 dbt 项目以在 Airflow 中运行”

人机协同

DAGs 中的审批门控、表单输入和人工驱动分支 (Airflow 3.1+)

“在部署任务之前添加审批步骤”

本地环境

使用 Astro CLI 启动、停止、重新启动本地 Airflow 环境并对其进行故障排除

“启动我的本地 Airflow 环境”

先决条件

Cortex Code 的 Airflow 集成需要 uv (https://docs.astral.sh/uv/getting-started/installation/)。如果未安装 uvcortex airflow 会提供一条包含安装链接的帮助信息。

设置 Airflow 集成

在您使用 Cortex Code 管理 Airflow 实例之前,必须先配置连接。您可以通过环境变量进行配置,也可以在 Cortex Code CLI 中使用交互式设置命令进行配置。

环境变量设置

在启动 Cortex Code 之前,请在 shell 中导出所需的变量,如下所示。您可以使用基于令牌的身份验证或用户名/密码身份验证。如果您始终使用同一个 Airflow 实例,请在您的 shell 配置文件(~/.bashrc~/.zshrc)中包含如下代码,以避免每次都重新输入。

# Token auth
export AIRFLOW_API_URL=https://airflow.example.com
export AIRFLOW_AUTH_TOKEN=your-api-token

# Username/password auth
export AIRFLOW_API_URL=https://airflow.example.com
export AIRFLOW_USERNAME=your-username
export AIRFLOW_PASSWORD=your-password
交互式设置

在 Cortex Code 中输入 /airflow,通过全屏 UI 管理实例。系统同时支持令牌身份验证和用户名/密码身份验证。

命令

描述

/airflow

管理 Airflow 实例(打开实例管理器)

/airflow show

显示当前配置(系统会隐藏敏感信息)

/airflow clear

移除所有配置

/airflow 支持多个命名实例。可以使用实例管理器来添加、切换或删除它们。

Airflow CLI 命令

使用 cortex airflow 从终端与 Airflow 实例进行交互,示例如下。

检查实例运行状况:

cortex airflow health

列出所有 DAGs:

cortex airflow dags list

获取特定 DAG 的详细信息:

cortex airflow dags get my_pipeline

查看 DAG 源代码:

cortex airflow dags source my_pipeline

触发 DAG 运行:

cortex airflow runs trigger my_pipeline

列出 DAG 的最近运行记录:

cortex airflow runs list my_pipeline

检查特定运行的任务状态:

cortex airflow tasks list my_pipeline <run_id>

暂停或取消暂停 DAG:

cortex airflow dags pause my_pipeline
cortex airflow dags unpause my_pipeline

输入 cortex airflow --help 可获取完整命令列表。

故障排除

连接被拒绝

症状: Airflow 操作失败并出现连接错误。

解决方案: 验证您的实例 URL 是否正确,以及 Airflow API 是否可访问。检查您当前的实例配置,并通过健康检查测试连接。

身份验证失败

症状: 操作返回 401 或 403 错误。

解决方案: 尝试以下步骤:

  • 确保您的令牌或凭据正确无误。

  • 检查令牌是否已过期;如有必要,请重新生成。

  • 确保用户和角色在 Airflow 中具有 API 访问权限。

未找到 DAG

症状: 操作报告 DAG 不存在。

解决方案: 检查是否存在可能阻止 DAG 加载的导入或解析错误。确保 DAG ID 完全匹配。

uv 未安装

症状: cortex airflow 显示“cortex airflow requires uv”。

解决方案: 通过 uv 网站 (https://docs.astral.sh/uv/getting-started/installation/) 安装 uv