教程 2:创建和管理任务及任务图 (DAGs)

简介

在本教程中,您将创建并使用 Snowflake 任务,以管理一些基本的存储过程。您还可以创建一个任务图,这也称为有向无环图 (DAG),用于通过更高级别的任务图 API 编排任务。

先决条件

备注

如果您已完成 Snowflake Python APIs 教程的常用设置教程 1:创建数据库、架构、表和仓库,则可以跳过这些先决条件,继续进入本教程的第一步。

在开始本教程之前,必须完成以下步骤:

  1. 请按照 常见设置 说明操作,其中包括以下步骤:

    • 设置开发环境。

    • 安装 Snowflake Python APIs 包。

    • 配置 Snowflake 连接。

    • 导入 Python API 教程所需的所有模块。

    • 创建一个 API Root 对象。

  2. 运行以下代码,创建名为 PYTHON_API_DB 的数据库,并在该数据库中创建名为 PYTHON_API_SCHEMA 的架构。

    database = root.databases.create(
      Database(
        name="PYTHON_API_DB"),
        mode=CreateMode.or_replace
      )
    
    schema = database.schemas.create(
      Schema(
        name="PYTHON_API_SCHEMA"),
        mode=CreateMode.or_replace,
      )
    
    Copy

    这些与您在 教程 1 中创建的数据库和架构对象相同。

满足这些先决条件后,即可开始使用API 进行任务管理。

设置 Snowflake 对象

设置任务将调用的存储过程,以及保存存储过程的暂存区。您可以使用 Snowflake Python APIs root 对象,在之前创建的 PYTHON_API_DB 数据库和 PYTHON_API_SCHEMA 架构中创建一个暂存区。

  1. 要创建名为 TASKS_STAGE 的暂存区,在笔记本的下一个单元格中,运行以下代码:

    stages = root.databases[database.name].schemas[schema.name].stages
    stages.create(Stage(name="TASKS_STAGE"))
    
    Copy

    该暂存区将保存存储过程以及这些存储过程所需的任何依赖项。

  2. 若要创建任务将作为存储过程运行的两个基本 Python 函数,请在下一个单元格中,运行以下代码:

    def trunc(session: Session, from_table: str, to_table: str, count: int) -> str:
      (
        session
        .table(from_table)
        .limit(count)
        .write.save_as_table(to_table)
      )
      return "Truncated table successfully created!"
    
    def filter_by_shipmode(session: Session, mode: str) -> str:
      (
        session
        .table("snowflake_sample_data.tpch_sf100.lineitem")
        .filter(col("L_SHIPMODE") == mode)
        .limit(10)
        .write.save_as_table("filter_table")
      )
      return "Filter table successfully created!"
    
    Copy

    这些函数执行的操作如下:

    • trunc():创建输入表的截断版本。

    • filter_by_shipmode():按运输模式筛选 SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM 表,将结果限制为 10 行,并将结果写入新表。

      备注

      该函数用于查询 SNOWFLAKE_SAMPLE_DATA 数据库中的 TPC-H 示例数据。默认情况下,Snowflake 会在新账户中创建示例数据库。如果您的账户中尚未创建数据库,请参阅 使用示例数据库

    这些函数均刻意保持基本功能,仅供演示之用。

创建和管理任务

定义、创建和管理两个任务,这些任务会将先前创建的 Python 函数作为存储过程运行。

  1. 若要定义 task1task2 这两个任务,在笔记本的下一个单元格中,运行以下代码:

    tasks_stage = f"{database.name}.{schema.name}.TASKS_STAGE"
    
    task1 = Task(
        name="task_python_api_trunc",
        definition=StoredProcedureCall(
          func=trunc,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH",
        schedule=timedelta(minutes=1)
    )
    
    task2 = Task(
        name="task_python_api_filter",
        definition=StoredProcedureCall(
          func=filter_by_shipmode,
          stage_location=f"@{tasks_stage}",
          packages=["snowflake-snowpark-python"],
        ),
        warehouse="COMPUTE_WH"
    )
    
    Copy

    在这段代码中,您指定了以下任务参数:

    • 对于每个任务,都有一个由 StoredProcedureCall 对象表示的定义,其中包括以下属性:

      • 要运行的可调用函数

      • 将 Python 函数内容及其依赖项上传至的暂存区位置

      • 存储过程的包依赖项

    • 用于运行存储过程的仓库(使用 StoredProcedureCall 对象创建任务时需要)。本教程使用包含在您的试用账户中的 COMPUTE_WH 仓库。

    • 根任务 task1 的运行计划。计划指定定期运行任务的时间间隔。

    有关存储过程的更多信息,请参阅 使用 Python 编写存储过程

  2. 要创建这两个任务,请从数据库架构中检索 TaskCollection 对象 (tasks),并在任务集合上调用 .create()

    # create the task in the Snowflake database
    tasks = schema.tasks
    trunc_task = tasks.create(task1, mode=CreateMode.or_replace)
    
    task2.predecessors = [trunc_task.name]
    filter_task = tasks.create(task2, mode=CreateMode.or_replace)
    
    Copy

    在此代码示例中,您还将 task1 设置为 task2 的前置任务,从而将两个任务链接起来,创建一个最小任务图。

  3. 要确认这两个任务目前是否存在,请在下一个单元格中,运行以下代码:

    taskiter = tasks.iter()
    for t in taskiter:
        print(t.name)
    
    Copy
  4. 创建任务时,这些任务默认暂停。

    若要启动任务,在任务资源对象上调用 .resume()

    trunc_task.resume()
    
    Copy
  5. 要确认 trunc_task 任务已启动,请在下一个单元格中,运行以下代码:

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    输出应类似于以下内容:

    Name:  TASK_PYTHON_API_FILTER | State:  suspended
    Name:  TASK_PYTHON_API_TRUNC | State:  started
    

    您可以随时重复执行此步骤来确认任务的状态。

  6. 若要清理任务资源,首先需要暂停任务,然后再将其删除。

    请在下一个单元格中运行以下代码:

    trunc_task.suspend()
    
    Copy
  7. 若要确认任务已暂停,请重复步骤 5。

  8. 可选:要删除这两项任务,请在下一个单元格中,运行以下代码:

    trunc_task.drop()
    filter_task.drop()
    
    Copy

创建和管理任务图

在需要协调大量任务的执行时,分别管理各项任务可能相当艰难。Snowflake Python APIs 提供了利用更高级别的任务图 API 编排任务的功能。

任务图又称为有向无环图 (DAG),它是一系列任务,由单项根任务和多个子任务组成,并根据它们之间的依赖关系进行组织。有关更多信息,请参阅 使用任务图管理任务依赖关系

  1. 若要创建和部署任务图,请运行以下代码:

    dag_name = "python_api_dag"
    dag = DAG(name=dag_name, schedule=timedelta(days=1))
    with dag:
        dag_task1 = DAGTask(
            name="task_python_api_trunc",
            definition=StoredProcedureCall(
                func=trunc,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task2 = DAGTask(
            name="task_python_api_filter",
            definition=StoredProcedureCall(
                func=filter_by_shipmode,
                stage_location=f"@{tasks_stage}",
                packages=["snowflake-snowpark-python"]),
            warehouse="COMPUTE_WH",
        )
        dag_task1 >> dag_task2
    dag_op = DAGOperation(schema)
    dag_op.deploy(dag, mode=CreateMode.or_replace)
    
    Copy

    在此代码中,您执行了以下操作:

    • 通过调用 DAG 构造函数并指定名称和计划来创建任务图对象。

    • 使用 DAGTask 构造函数,定义特定于任务图的任务。请注意,该构造函数接受的实参与您在上一步中为 StoredProcedureCall 类指定的实参相同。

    • 使用更便捷的语法,将 dag_task1 指定为根任务及 dag_task2 的前置任务。

    • 将任务图部署到 PYTHON_API_DB 数据库的 PYTHON_API_SCHEMA 架构。

  2. 若要确认任务图的创建,请在下一个单元格中,运行以下代码:

    taskiter = tasks.iter()
    for t in taskiter:
        print("Name: ", t.name, "| State: ", t.state)
    
    Copy

    您可以随时重复执行此步骤来确认任务的状态。

  3. 若要通过启动根任务来启动任务图,请在下一个单元格中,运行以下代码:

    dag_op.run(dag)
    
    Copy
  4. 若要确认 PYTHON_API_DAG$TASK_PYTHON_API_TRUNC 任务已启动,请重复步骤 2。

    备注

    任务图执行的函数调用不会成功,因为您在调用该函数时没有使用其所需的任何实参。此步骤的目的只是演示如何以编程方式启动任务图。

  5. 若要删除任务图,请在下一个单元格中,运行以下代码:

    dag_op.drop(dag)
    
    Copy
  6. 清理在这些教程中创建的数据库对象:

    database.drop()
    
    Copy

下一步是什么?

恭喜!在本教程中,您学习了如何使用 Snowflake Python APIs 创建和管理任务及任务图。

摘要

在此过程中,您完成了以下步骤:

  • 创建一个可容纳存储过程及其依赖项的暂存区。

  • 创建和管理任务。

  • 创建和管理任务图。

  • 通过删除 Snowflake 资源对象来清理这些对象。

下一个教程

现在您可以继续学习 教程 3:创建和管理 Snowpark 容器服务,了解如何在 Snowpark Container Services 中创建和管理组件。

其他资源

要获取更多使用 API 管理 Snowflake 中其他类型对象的示例,请参阅以下开发者指南:

指南

描述

使用 Python 管理 Snowflake 数据库、架构、表和视图

使用 API 创建和管理数据库、架构和表。

使用 Python 管理 Snowflake 用户、角色和授权

使用 API 创建和管理用户、角色和授权。

用 Python 管理数据加载和卸载资源

使用 API 创建和管理数据加载和卸载资源,包括外部卷、管道和暂存区。

使用 Python 管理 Snowpark Container Services(包括服务函数)

使用 API 管理 Snowpark Container Services 的组件,包括计算池、镜像仓库、服务和服务函数。

语言: 中文