教程 2:创建和管理任务及任务图 (DAGs)¶
简介¶
在本教程中,您将创建并使用 Snowflake 任务,以管理一些基本的存储过程。您还可以创建一个任务图,这也称为有向无环图 (DAG),用于通过更高级别的任务图 API 编排任务。
先决条件¶
备注
如果您已完成 Snowflake Python APIs 教程的常用设置 和 教程 1:创建数据库、架构、表和仓库,则可以跳过这些先决条件,继续进入本教程的第一步。
在开始本教程之前,必须完成以下步骤:
请按照 常见设置 说明操作,其中包括以下步骤:
设置开发环境。
安装 Snowflake Python APIs 包。
配置 Snowflake 连接。
导入 Python API 教程所需的所有模块。
创建一个 API
Root
对象。
运行以下代码,创建名为
PYTHON_API_DB
的数据库,并在该数据库中创建名为PYTHON_API_SCHEMA
的架构。database = root.databases.create( Database( name="PYTHON_API_DB"), mode=CreateMode.or_replace ) schema = database.schemas.create( Schema( name="PYTHON_API_SCHEMA"), mode=CreateMode.or_replace, )
这些与您在 教程 1 中创建的数据库和架构对象相同。
满足这些先决条件后,即可开始使用API 进行任务管理。
设置 Snowflake 对象¶
设置任务将调用的存储过程,以及保存存储过程的暂存区。您可以使用 Snowflake Python APIs root
对象,在之前创建的 PYTHON_API_DB
数据库和 PYTHON_API_SCHEMA
架构中创建一个暂存区。
要创建名为
TASKS_STAGE
的暂存区,在笔记本的下一个单元格中,运行以下代码:stages = root.databases[database.name].schemas[schema.name].stages stages.create(Stage(name="TASKS_STAGE"))
该暂存区将保存存储过程以及这些存储过程所需的任何依赖项。
若要创建任务将作为存储过程运行的两个基本 Python 函数,请在下一个单元格中,运行以下代码:
def trunc(session: Session, from_table: str, to_table: str, count: int) -> str: ( session .table(from_table) .limit(count) .write.save_as_table(to_table) ) return "Truncated table successfully created!" def filter_by_shipmode(session: Session, mode: str) -> str: ( session .table("snowflake_sample_data.tpch_sf100.lineitem") .filter(col("L_SHIPMODE") == mode) .limit(10) .write.save_as_table("filter_table") ) return "Filter table successfully created!"
这些函数执行的操作如下:
trunc()
:创建输入表的截断版本。filter_by_shipmode()
:按运输模式筛选SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM
表,将结果限制为 10 行,并将结果写入新表。备注
该函数用于查询 SNOWFLAKE_SAMPLE_DATA 数据库中的 TPC-H 示例数据。默认情况下,Snowflake 会在新账户中创建示例数据库。如果您的账户中尚未创建数据库,请参阅 使用示例数据库。
这些函数均刻意保持基本功能,仅供演示之用。
创建和管理任务¶
定义、创建和管理两个任务,这些任务会将先前创建的 Python 函数作为存储过程运行。
若要定义
task1
和task2
这两个任务,在笔记本的下一个单元格中,运行以下代码:tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE" task1 = Task( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH", schedule=timedelta(minutes=1) ) task2 = Task( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"], ), warehouse="COMPUTE_WH" )
在这段代码中,您指定了以下任务参数:
对于每个任务,都有一个由 StoredProcedureCall 对象表示的定义,其中包括以下属性:
要运行的可调用函数
将 Python 函数内容及其依赖项上传至的暂存区位置
存储过程的包依赖项
用于运行存储过程的仓库(使用
StoredProcedureCall
对象创建任务时需要)。本教程使用包含在您的试用账户中的COMPUTE_WH
仓库。根任务
task1
的运行计划。计划指定定期运行任务的时间间隔。
有关存储过程的更多信息,请参阅 使用 Python 编写存储过程。
要创建这两个任务,请从数据库架构中检索
TaskCollection
对象 (tasks
),并在任务集合上调用.create()
:# create the task in the Snowflake database tasks = schema.tasks trunc_task = tasks.create(task1, mode=CreateMode.or_replace) task2.predecessors = [trunc_task.name] filter_task = tasks.create(task2, mode=CreateMode.or_replace)
在此代码示例中,您还将
task1
设置为task2
的前置任务,从而将两个任务链接起来,创建一个最小任务图。要确认这两个任务目前是否存在,请在下一个单元格中,运行以下代码:
taskiter = tasks.iter() for t in taskiter: print(t.name)
创建任务时,这些任务默认暂停。
若要启动任务,在任务资源对象上调用
.resume()
:trunc_task.resume()
要确认
trunc_task
任务已启动,请在下一个单元格中,运行以下代码:taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
输出应类似于以下内容:
Name: TASK_PYTHON_API_FILTER | State: suspended Name: TASK_PYTHON_API_TRUNC | State: started
您可以随时重复执行此步骤来确认任务的状态。
若要清理任务资源,首先需要暂停任务,然后再将其删除。
请在下一个单元格中运行以下代码:
trunc_task.suspend()
若要确认任务已暂停,请重复步骤 5。
可选:要删除这两项任务,请在下一个单元格中,运行以下代码:
trunc_task.drop() filter_task.drop()
创建和管理任务图¶
在需要协调大量任务的执行时,分别管理各项任务可能相当艰难。Snowflake Python APIs 提供了利用更高级别的任务图 API 编排任务的功能。
任务图又称为有向无环图 (DAG),它是一系列任务,由单项根任务和多个子任务组成,并根据它们之间的依赖关系进行组织。有关更多信息,请参阅 使用任务图管理任务依赖关系。
若要创建和部署任务图,请运行以下代码:
dag_name = "python_api_dag" dag = DAG(name=dag_name, schedule=timedelta(days=1)) with dag: dag_task1 = DAGTask( name="task_python_api_trunc", definition=StoredProcedureCall( func=trunc, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task2 = DAGTask( name="task_python_api_filter", definition=StoredProcedureCall( func=filter_by_shipmode, stage_location=f"@{tasks_stage}", packages=["snowflake-snowpark-python"]), warehouse="COMPUTE_WH", ) dag_task1 >> dag_task2 dag_op = DAGOperation(schema) dag_op.deploy(dag, mode=CreateMode.or_replace)
在此代码中,您执行了以下操作:
通过调用
DAG
构造函数并指定名称和计划来创建任务图对象。使用
DAGTask
构造函数,定义特定于任务图的任务。请注意,该构造函数接受的实参与您在上一步中为StoredProcedureCall
类指定的实参相同。使用更便捷的语法,将
dag_task1
指定为根任务及dag_task2
的前置任务。将任务图部署到
PYTHON_API_DB
数据库的PYTHON_API_SCHEMA
架构。
若要确认任务图的创建,请在下一个单元格中,运行以下代码:
taskiter = tasks.iter() for t in taskiter: print("Name: ", t.name, "| State: ", t.state)
您可以随时重复执行此步骤来确认任务的状态。
若要通过启动根任务来启动任务图,请在下一个单元格中,运行以下代码:
dag_op.run(dag)
若要确认
PYTHON_API_DAG$TASK_PYTHON_API_TRUNC
任务已启动,请重复步骤 2。备注
任务图执行的函数调用不会成功,因为您在调用该函数时没有使用其所需的任何实参。此步骤的目的只是演示如何以编程方式启动任务图。
若要删除任务图,请在下一个单元格中,运行以下代码:
dag_op.drop(dag)
清理在这些教程中创建的数据库对象:
database.drop()
下一步是什么?¶
恭喜!在本教程中,您学习了如何使用 Snowflake Python APIs 创建和管理任务及任务图。
摘要¶
在此过程中,您完成了以下步骤:
创建一个可容纳存储过程及其依赖项的暂存区。
创建和管理任务。
创建和管理任务图。
通过删除 Snowflake 资源对象来清理这些对象。
下一个教程¶
现在您可以继续学习 教程 3:创建和管理 Snowpark 容器服务,了解如何在 Snowpark Container Services 中创建和管理组件。
其他资源¶
要获取更多使用 API 管理 Snowflake 中其他类型对象的示例,请参阅以下开发者指南:
指南 |
描述 |
---|---|
使用 API 创建和管理数据库、架构和表。 |
|
使用 API 创建和管理用户、角色和授权。 |
|
使用 API 创建和管理数据加载和卸载资源,包括外部卷、管道和暂存区。 |
|
使用 API 管理 Snowpark Container Services 的组件,包括计算池、镜像仓库、服务和服务函数。 |