使用 Python 管理 Snowflake 任务和任务图

您可以使用 Python 来管理 Snowflake 任务,通过它可在 Snowflake Scripting 中执行 SQL 语句、过程调用和逻辑。有关任务的概述,请参阅 任务简介

Snowflake Python API 表示具有两种不同类型的任务:

  • Task:显示任务的属性,例如其计划、参数和前置任务。

  • TaskResource:显示如何提取相应 Task 对象,执行任务,以及更改任务的方法。

先决条件

在本主题中的示例中,假设您已添加了用来连接 Snowflake 和创建 Root 对象以使用 Snowflake Python API 的代码。

例如,以下代码使用配置文件中定义的连接参数来创建与 Snowflake 的连接:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

代码可通过生成的 Session 对象创建 Root 对象,从而使用 API 的类型和方法。有关更多信息,请参阅 使用 Snowflake Python API 连接到 Snowflake

创建任务

要创建任务,请先创建一个 Task 对象。然后,指定要在其中创建任务的数据库和架构,以创建 TaskCollection 对象。使用 TaskCollection.create 将新任务添加到 Snowflake。

以下示例中的代码创建了一个 Task 对象,表示一个名为 my_task 的任务,而该任务运行 definition 参数中指定的 SQL 查询:

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

my_task = Task(name='my_task', definition='<sql query>', schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Copy

此代码从 my_db 数据库和 my_schema 架构中创建了一个 TaskCollection 变量 tasks。该变量可使用 TaskCollection.create 在 Snowflake 中创建新任务。

此代码示例还为任务的计划指定了一个小时的 timedelta 值。您可以使用 timedelta 值或 Cron 表达式定义任务的计划。

您还可以创建运行 Python 函数或存储过程的任务。以下示例中的代码创建一个名为 my_task2 的任务,该任务运行一个由 StoredProcedureCall 对象表示的函数:

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task

my_task2 = Task(
  StoredProcedureCall(
      dosomething, stage_location="@mystage"
  ),
  warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Copy

此对象指定一个位于 @mystage 暂存区位置且名为 dosomething 的函数。在使用 StoredProcedureCall 对象创建任务时,您还必须指定 warehouse

创建或更新任务

您可以通过设置表示该任务的 Task 对象的属性,然后使用 TaskResource.create_or_update 方法将该对象传递给 Snowflake,从而更新现有表的特征。

当要创建任务时,还可以传递描述新任务的 Task 对象。

以下示例中的代码设置任务的名称、定义和计划,然后在 Snowflake 上更新任务,或者在任务还不存在时创建任务:

from datetime import timedelta
from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
tasks['my_task'].create_or_update(
  Task(name='my_task', definition='<sql query 2>', schedule=timedelta(hours=1))
)
Copy

列出任务

您可以使用 TaskCollection.iter 方法列出任务。该方法会返回 Task 对象的 PagedIter 迭代器。

以下示例中的代码列出了名称以 my 开头的任务:

from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core.task import TaskCollection

root = Root(connection)

tasks: TaskCollection = root.databases["my_db"].schemas["my_schema"].tasks
task_iter = tasks.iter(like="my%")  # returns a PagedIter[Task]
for task_obj in task_iter:
  print(task_obj.name)
Copy

执行任务操作

您可以执行常见的任务操作 – 例如使用 TaskResource 对象执行、暂停和恢复任务。

以下示例中的代码执行、暂停、恢复和删除 my_task 任务:

from snowflake.core import Root
from snowflake.core.task import Task

tasks = root.databases["my_db"].schemas["my_schema"].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.delete()
Copy

管理任务图中的任务

您可以管理任务图中已收集的任务。任务图是一系列任务,其中有单个根任务和按任务依赖关系组织的其他任务。

有关任务图中任务的更多信息,请参阅 任务图

创建任务图

要创建任务图,请先创建一个 DAG 对象,该对象指定任务图的名称和其他可选属性(例如任务图的计划)。您可以使用 timedelta 值或 Cron 表达式定义任务图的计划。

以下示例中的代码定义了一个 Python 函数 dosomething,然后在任务图中将该函数指定为名为 dag_task2DAGTask 对象:

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

def dosomething(session: Session) -> None:
  df = session.table("target")
  df.group_by("a").agg(sum_("b")).save_as_table("agg_table")

with DAG("my_dag", schedule=timedelta(days=1)) as dag:
  # Create a task that runs some SQL.
  dag_task1 = DAGTask(
    "dagtask1",
    "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
  )
  # Create a task that runs a Python function.
  dag_task2 = DAGTask(
    StoredProcedureCall(
      dosomething, stage_location="@mystage",
      packages=["snowflake-snowpark-python"]
    ),
    warehouse="test_warehouse"
  )
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Copy

该代码还将 SQL 语句定义为另一个名为 dag_task1DAGTask 对象,然后将 dag_task1 指定为 dag_task2 的前置任务。最后,它将任务图部署到 Snowflake 的 my_db 数据库和 my_schema 架构中。

创建一个具有 cron 计划、任务分支和函数返回值的任务图

您还可以使用指定的 cron 计划、任务分支和函数返回值创建任务图,这些返回值用作任务返回值。

以下示例中的代码创建了一个 DAG,并使用 Cron 对象为其指定计划。它定义了一个 名为 task1_branchDAGTaskBranch 对象以及其他 DAGTask 对象,并指定了它们之间的依赖关系:

from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  pass  # do something

def task_branch_handler(session: Session) -> str:
  # do something
  return "task3"

try:
  with DAG(
    "my_dag",
    schedule=Cron("10 * * * *", "America/Los_Angeles"),
    stage_location="@mystage",
    packages=["snowflake-snowpark-python"],
    use_func_return_value=True,
  ) as dag:
    task1 = DAGTask(
      "task1",
      task_handler,
      warehouse=test_warehouse,
    )
    task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
    task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
    task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
    task1 >> task1_branch
    task1_branch >> [task2, task3]
  schema = root.databases["my_db"].schemas["my_schema"]
  op = DAGOperation(schema)
  op.deploy(dag, mode=CreateMode.or_replace)
finally:
  session.close()
Copy

此代码示例还定义了任务处理程序函数,并使用分配给任务的指定任务处理程序创建每个 DAGTaskDAGTaskBranch 对象。该代码将 DAG的 use_func_return_value 参数设置为 True,它指定使用 Python 函数的返回值作为相应任务的返回值。否则,use_func_return_value 的默认值为 False

设置和获取任务图中任务的返回值

当任务的定义是 StoredProcedureCall 对象时,存储过程(或函数)的处理程序可通过使用 TaskContext 对象显式地设置任务的返回值。

有关更多信息,请参阅 SYSTEM$SET_RETURN_VALUE

以下示例中的代码定义了一个任务处理程序函数,该函数从当前会话创建了一个名为 contextTaskContext 对象。然后,它使用 TaskContext.set_return_value 方法显式地将返回值设置为指定的字符串:

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  # this return value can be retrieved by successor Tasks.
  context.set_return_value("predecessor_return_value")
Copy

在任务图中,直接后续任务可将前一任务标识为其前置任务,然后可以检索其前置任务显式设置的返回值。

有关更多信息,请参阅 SYSTEM$GET_PREDECESSOR_RETURN_VALUE

以下示例中的代码定义了一个任务处理程序函数,该函数使用 TaskContext.get_predecessor_return_value 方法获取名为 pred_task_name 的前置任务的返回值:

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  pred_return_value = context.get_predecessor_return_value("pred_task_name")
Copy
语言: 中文