使用 Python 管理 Snowflake 任务和任务图¶
您可以使用 Python 来管理 Snowflake 任务,通过它可在 Snowflake Scripting 中执行 SQL 语句、过程调用和逻辑。有关任务的概述,请参阅 任务简介。
Snowflake Python API 表示具有两种不同类型的任务:
Task
:显示任务的属性,例如其计划、参数和前置任务。TaskResource
:显示如何提取相应Task
对象,执行任务,以及更改任务的方法。
先决条件¶
在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root
对象以使用 Snowflake Python API 的代码。
例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
代码可通过生成的 Session
对象创建 Root
对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python API 连接到 Snowflake。
创建任务¶
要创建任务,请先创建一个 Task
对象。然后,指定要在其中创建任务的数据库和架构,以创建 TaskCollection
对象。使用 TaskCollection.create
将新任务添加到 Snowflake。
以下示例中的代码创建了一个 Task
对象,表示一个名为 my_task
的任务,而该任务运行 definition
参数中指定的 SQL 查询:
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
此代码从 my_db
数据库和 my_schema
架构中创建了一个 TaskCollection
变量 tasks
。该变量可使用 TaskCollection.create
在 Snowflake 中创建新任务。
此代码示例还为任务的计划指定了一个小时的 timedelta
值。您可以使用 timedelta
值或 Cron
表达式定义任务的计划。
您还可以创建运行 Python 函数或存储过程的任务。以下示例中的代码创建一个名为 my_task2
的任务,该任务运行一个由 StoredProcedureCall
对象表示的函数:
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task
my_task2 = Task(
StoredProcedureCall(
dosomething, stage_location="@mystage"
),
warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
此对象指定一个位于 @mystage
暂存区位置且名为 dosomething
的函数。在使用 StoredProcedureCall
对象创建任务时,您还必须指定 warehouse
。
创建或更新任务¶
您可以通过设置表示该任务的 Task
对象的属性,然后使用 TaskResource.create_or_update
方法将该对象传递给 Snowflake,从而更新现有表的特征。
当要创建任务时,还可以传递描述新任务的 Task
对象。
以下示例中的代码设置任务的名称、定义和计划,然后在 Snowflake 上更新任务,或者在任务还不存在时创建任务:
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
列出任务¶
您可以使用 TaskCollection.iter
方法列出任务。该方法会返回 Task
对象的 PagedIter
迭代器。
以下示例中的代码列出了名称以 my 开头的任务:
from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection
root = Root(connection)
tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%") # returns a PagedIter[Task]
for task_obj in task_iter:
print(task_obj.name)
执行任务操作¶
您可以执行常见的任务操作 – 例如使用 TaskResource
对象执行、暂停和恢复任务。
以下示例中的代码执行、暂停、恢复和删除 my_task
任务:
from snowflake.core import Root
from snowflake.core.task import Task
tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']
task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
管理任务图中的任务¶
您可以管理任务图中已收集的任务。任务图是一系列任务,其中有单个根任务和按任务依赖关系组织的其他任务。
有关任务图中任务的更多信息,请参阅 任务图。
创建任务图¶
要创建任务图,请先创建一个 DAG
对象,该对象指定任务图的名称和其他可选属性(例如任务图的计划)。您可以使用 timedelta
值或 Cron
表达式定义任务图的计划。
以下示例中的代码定义了一个 Python 函数 dosomething
,然后在任务图中将该函数指定为名为 dag_task2
的 DAGTask
对象:
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_
def dosomething(session: Session) -> None:
df = session.table("target")
df.group_by("a").agg(sum_("b")).save_as_table("agg_table")
with DAG("my_dag", schedule=timedelta(days=1)) as dag:
# Create a task that runs some SQL.
dag_task1 = DAGTask(
"dagtask1",
"MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
)
# Create a task that runs a Python function.
dag_task2 = DAGTask(
StoredProcedureCall(
dosomething, stage_location="@mystage",
packages=["snowflake-snowpark-python"]
),
warehouse="test_warehouse"
)
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2 # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
该代码还将 SQL 语句定义为另一个名为 dag_task1
的 DAGTask
对象,然后将 dag_task1
指定为 dag_task2
的前置任务。最后,它将任务图部署到 Snowflake 的 my_db
数据库和 my_schema
架构中。
创建一个具有 cron 计划、任务分支和函数返回值的任务图¶
您还可以使用指定的 cron 计划、任务分支和函数返回值创建任务图,这些返回值用作任务返回值。
以下示例中的代码创建了一个 DAG
,并使用 Cron
对象为其指定计划。它定义了一个 名为 task1_branch
的 DAGTaskBranch
对象以及其他 DAGTask
对象,并指定了它们之间的依赖关系:
from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
pass # do something
def task_branch_handler(session: Session) -> str:
# do something
return "task3"
try:
with DAG(
"my_dag",
schedule=Cron("10 * * * *", "America/Los_Angeles"),
stage_location="@mystage",
packages=["snowflake-snowpark-python"],
use_func_return_value=True,
) as dag:
task1 = DAGTask(
"task1",
task_handler,
warehouse=test_warehouse,
)
task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
task1 >> task1_branch
task1_branch >> [task2, task3]
schema = root.databases["my_db"].schemas["my_schema"]
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)
finally:
session.close()
此代码示例还定义了任务处理程序函数,并使用分配给任务的指定任务处理程序创建每个 DAGTask
和 DAGTaskBranch
对象。该代码将 DAG的 use_func_return_value
参数设置为 True
,它指定使用 Python 函数的返回值作为相应任务的返回值。否则,use_func_return_value
的默认值为 False
。
设置和获取任务图中任务的返回值¶
当任务的定义是 StoredProcedureCall
对象时,存储过程(或函数)的处理程序可通过使用 TaskContext
对象显式地设置任务的返回值。
有关更多信息,请参阅 SYSTEM$SET_RETURN_VALUE。
以下示例中的代码定义了一个任务处理程序函数,该函数从当前会话创建了一个名为 context
的 TaskContext
对象。然后,它使用 TaskContext.set_return_value
方法显式地将返回值设置为指定的字符串:
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
# this return value can be retrieved by successor Tasks.
context.set_return_value("predecessor_return_value")
在任务图中,直接后续任务可将前一任务标识为其前置任务,然后可以检索其前置任务显式设置的返回值。
有关更多信息,请参阅 SYSTEM$GET_PREDECESSOR_RETURN_VALUE。
以下示例中的代码定义了一个任务处理程序函数,该函数使用 TaskContext.get_predecessor_return_value
方法获取名为 pred_task_name
的前置任务的返回值:
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
pred_return_value = context.get_predecessor_return_value("pred_task_name")