使用任务图创建任务序列

在 Snowflake 中,您可以使用任务图管理多个任务,也称为有向无环图,或 DAG。任务图由根任务和从属子任务组成。依赖项必须沿着从开始到结束的方向运行,没有循环。可选最终任务(终结器)可以在所有其他任务完成后执行清理操作。

使用运行时值、图形级配置和父任务的返回值在任务正文中指定基于逻辑的操作,从而构建具有动态行为的任务图。

您可以使用 支持的语言和工具 <label-stored-procedures-handler-languages>`(例如 SQL、JavaScript、Python、Java、Scala 或 Snowflake Scripting),创建任务和任务图。本主题提供 SQL 示例。有关 Python 示例,请参阅 :doc:/developer-guide/snowflake-python-api/snowflake-python-managing-tasks`。

创建任务图

使用 CREATE TASK 创建根任务,然后使用 CREATE TASK ..AFTER 以选择父任务。

根任务定义 任务图运行时间。子任务按任务图定义的顺序执行。

当多个子任务具有相同的父任务时,子任务将并行运行。

当一个任务有多个父任务时,该任务将等待所有前面的任务成功完成后才启动。(该任务也可能在跳过某些父任务时运行。有关更多信息,请参阅 跳过或挂起子任务)。

以下示例创建无服务器任务图,该图从计划每分钟运行的根任务开始。根任务有两个并行运行的子任务。(该图显示一个示例,其中一个任务比另一个任务运行更长时间。)这两个任务完成后,第三个子任务会运行。终结器任务在所有其他任务完成或无法完成之后运行:

任务序列图。
CREATE TASK task_root
  SCHEDULE = '1 MINUTE'
  AS SELECT 1;

CREATE TASK task_a
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_b
  AFTER task_root
  AS SELECT 1;

CREATE TASK task_c
  AFTER task_a, task_b
  AS SELECT 1;
Copy

注意事项:

  • 一个任务图最多包含 1000 个任务。

  • 单个任务最多可以有 100 个父任务和 100 个子任务。

  • 当在同一用户管理的仓库上并行运行时,必须调整 计算资源 的大小以处理并发任务运行。

终结器任务

您可以添加一个可选的终结器任务,以便在任务图中所有其他任务完成(或无法完成)后运行。使用此任务执行以下操作:

  • 执行清理操作,例如,清理不再需要的中间数据。

  • 发送有关任务成功或失败的通知。

任务序列显示一个指向两个子任务的根任务,子任务又指向另一个任务。在底部显示一个终结器任务,在所有其他任务完成或无法完成后运行。

要创建终结器任务,请对根任务使用 CREATETASK ...FINALIZE ...。示例:

CREATE TASK task_finalizer
  FINALIZE = task_root
  AS SELECT 1;
Copy

注意事项:

  • 终结器任务始终与根任务关联。每个根任务只能有一个终结器任务,一个终结器任务只能与一个根任务相关联。

  • 当跳过任务图的根任务时(例如,由于 重叠任务图运行),终结器任务将无法启动。

  • 终结器任务不能有任何子任务。

  • 仅当当前任务图运行中没有其他任务正在运行或排队时,才会调度终结器任务。

有关更多示例,请参阅 终结器任务示例:发送电子邮件通知终结器任务示例:纠正错误

管理任务图所有权

任务图中的所有任务必须具有相同的任务所有者,并且存储在同一数据库和架构中。

您可以通过以下任一操作转移任务图中所有任务的所有权:

  • 使用 DROP ROLE 删除任务图中所有任务的所有者。Snowflake 将所有权转让给运行 DROP ROLE 命令的角色。

  • 针对架构中的所有任务,使用 GRANT OWNERSHIP 转移任务图中所有任务的所有权。

当您使用这些方法转移任务图中任务的所有权时,任务图中的任务将保留彼此之间的关系。

转移单个任务的所有权将切断该任务与任何父任务和子任务之间的依赖关系。有关更多信息,请参阅 :ref:`label-tasks-graphs-link-sever`(本主题内容)。

备注

如果任务图的所有者与执行复制的角色不同,则数据库复制不适用于任务图。

在任务图中运行或计划任务

手动运行任务图

您可以运行任务图的单个实例。这有利于在生产中启用任务图之前测试新的或修改的任务图,或者根据需要进行一次性运行。

在启动任务图之前,请对您要在运行中包含的每个子任务(包括可选的终结器任务)使用:doc:ALTER TASK ...RESUME </sql-reference/sql/alter-task>

要运行任务图的单个实例,请对根任务使用 EXECUTE TASK。运行根任务时,任务图中所有恢复的子任务将按照任务图定义的顺序执行。

按计划或作为触发的任务运行任务

在根任务中,定义任务图的运行时间。任务图可以按定期计划运行,也可以由事件触发。有关详细信息,请参阅以下主题:

要启动任务图,您可以执行以下任一操作:

  • 恢复要包含在运行中的每个单独的子任务(包括终结器),然后使用 ALTER TASK ... RESUME

  • 在根任务上使用 SYSTEM$TASK_DEPENDENTS_ENABLE (<root_task_name>) 同时恢复任务图中的所有任务。

查看任务图中的依赖任务

要查看根任务的子任务,请调用 TASK_DEPENDENTS 表函数。要在任务图中检索所有任务,请在调用该函数时输入根任务。

您还可以使用 Snowsight 管理和查看您的任务图。有关更多信息,请参阅 查看 Snowsight 中的任务和任务图

修改、挂起或重试任务

修改任务图中的任务

要修改计划任务图中的任务,请使用 ALTER TASK ... SUSPEND。 如果任务图正在运行,则完成当前运行。根任务的所有未来计划运行都将取消。

当根任务挂起时,子任务(包括终结器任务)保留其状态(挂起、正在运行或已完成)。子任务不需要单独挂起。

挂起根任务后,您可以修改任务图中的任何任务。

要恢复任务图,您可以执行以下任一操作:

  • 使用 ALTER TASK ... RESUME。以前运行的单个子任务不需要恢复。

  • 通过调用 SYSTEM$TASK_DEPENDENTS_ENABLE 并传入根任务的名称,同时恢复任务图中的所有任务。

跳过或挂起子任务

要跳过任务图中的子任务,请使用 ALTER TASK ... SUSPEND

挂起子任务时,任务图会继续运行,好像子任务已成功一样。只要 至少有一个 前置任务处于恢复状态,且所有恢复的前置任务都成功运行完成,具有多个前置任务的子任务即可运行。

图形显示了包含挂起子任务的任务图。跳过挂起的子任务,任务图完成。

重试失败的任务

使用 EXECUTE TASK ...RETRY LAST 尝试从上次失败的任务运行任务图。如果任务成功,所有子任务将继续运行,因为它们的前置任务已完成。

自动重试

默认情况下,如果子任务失败,则认为整个任务图失败。

您不必等到下一个计划任务图运行,而是可以在根任务上设置 TASK_AUTO_RETRY_ATTEMPTS 参数,指示任务图立即重试。当子任务失败时,立即重试整个任务图,最多重试指定的次数。如果任务图仍未完成,则认为任务图失败。

失败的任务图运行后,挂起任务图

默认情况下,任务图在连续失败 10 次后挂起。您可以对根任务设置 SUSPEND_TASK_AFTER_NUM_FAILURES,从而更改此值。

在下面的示例中,每当子任务失败时,任务图会立即重试两次,然后整个任务图才会被视为失败。如果任务图连续三次失败,则任务图将挂起。

CREATE OR REPLACE TASK task_root
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2   --  Failed task graph retries up to 2 times
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3   --  Task graph suspends after 3 consecutive failures
  AS SELECT 1;
Copy

重叠任务图运行

默认情况下,Snowflake 确保一次只允许运行一项特定任务图的一个实例。只有在该任务图中的所有任务都运行完毕后,才会安排根任务的下一次运行。这意味着,如果运行任务图中所有任务所需的累积时间超过了根任务定义中设置的明确计划时间,则至少会跳过任务图的一次运行。

要允许子任务重叠,请在根任务上使用 CREATE TASKALTER TASK,并将 ALLOW_OVERLAPPING_EXECUTION 设置为 TRUE。(根任务从不重叠。)

重叠任务图运行

任务图重叠运行所执行的读/写 SQL 操作不会产生错误或重复的数据时,重叠运行是可以容忍的(甚至是可取的)。但是,对于其他任务图,任务所有者(对任务图中所有任务具有 OWNERSHIP 权限的角色)应为根任务设置适当的计划,并选择适当的仓库大小(或使用无服务器计算资源),以确保在下次计划运行根任务之前完成任务图的一个实例。

为了更好地将任务图与根任务中定义的计划保持一致,请执行以下操作:

  1. 如可行,延长根任务运行之间的计划时间。

  2. 考虑修改计算密集型任务,以使用无服务器计算资源。如果任务依赖于由用户管理的计算资源,请增加运行任务图中大型或复杂 SQL 语句或存储过程的仓库的大小。

  3. 分析每项任务执行的 SQL 语句或存储过程。确定是否可以重写代码,以利用并行处理。

如果以上解决方案都没有帮助,请考虑是否需要对根任务设置 ALLOW_OVERLAPPING_EXECUTION = TRUE,以允许并行运行任务图。可在创建任务时(使用 CREATE TASK)或稍后使用 ALTER TASK 或 Snowsight)来定义此参数。

版本控制

当恢复或手动执行任务图中的根任务时,Snowflake 会设置整个任务图的版本,包括任务图中所有任务的所有属性。暂停和修改任务后,恢复或手动执行根任务时 Snowflake 会设置一个新的版本。

要修改或重新创建任务图中的任何任务,必须先暂停根任务。根任务暂停时,根任务的所有未来计划运行都将被取消;但如果当前有任何任务正在运行,则这些任务和任何子任务都将使用当前版本继续运行。

备注

如果在任务图执行过程中,任务调用的存储过程的定义发生变化,则可以在当前运行的任务调用存储过程时执行新的编程。

例如,假设任务图中的根任务已暂停,但该任务的计划运行已开始。在根任务仍在运行期间,任务图中所有任务的所有者修改了子任务调用的 SQL 代码。子任务使用根任务开始运行时现行的任务图版本,运行并执行其定义中的 SQL 代码。恢复或手动执行根任务时,将设置新版本的任务图。这个新版本包括对子任务的修改。

要检索任务版本的历史记录,请查询 TASK_VERSIONS Account Usage 视图 (在 SNOWFLAKE 共享数据库中)。

任务图持续时间

任务图持续时间包括从根任务计划开始到最后一个子任务完成的时间。要计算任务图的持续时间,请查询 COMPLETE_TASK_GRAPHS 视图 并比较 SCHEDULED_TIME 与 COMPLETED_TIME。

例如,下图显示了计划每分钟运行的任务图。根任务及其两个子任务各排队 5 秒并运行 10 秒,总共需要 45 秒才能完成。

一个任务图的图表,包括三个具有依赖关系的任务。每个任务排队 5 秒并运行 10 秒,总共运行 45 秒。

注意事项

  • 对于无服务器任务,Snowflake 会自动缩放资源,以确保任务在目标完成间隔(包括排队时间)内完成。

  • 对于用户管理任务,当任务计划在共享或繁忙仓库上运行时,较长的排队时间很常见。

  • 对于任务图,总时间可能包括子任务等待其前置任务完成的额外排队时间。

创建具有逻辑(运行时信息、配置和返回值)的任务图

任务图中的任务使用父任务的返回值,在其函数体中执行基于逻辑的操作。

注意事项:

  • 一些基于逻辑的命令(如 SYSTEM$GET_PREDECESSOR_RETURN_VALUE)区分大小写。但是,使用不带引号的 CREATE TASK 创建的任务 以大写存储和解析。要对此进行管理,您可以执行以下任一操作:

    • 仅使用大写字母创建任务名称。

    • 命名和调用任务时使用引号。

    • 对于使用小写字符定义的任务名称,请使用大写字符调用任务。例如:由“CREATE TASK task_c...”定义的任务可以 SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_C') 形式调用。

将配置信息传递给任务图

您可以使用任务图中其他任务可以读取的 JSON 对象传递配置信息。使用语法 CREATE/ALTER TASK … CONFIG 设置、取消设置或修改根任务中的配置信息。使用函数 SYSTEM$GET_TASK_GRAPH_CONFIG 检索。示例:

CREATE OR REPLACE TASK "task_root"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS
    SELECT 1;

CREATE OR REPLACE TASK "task_a"
  USER_TASK_TIMEOUT_MS = 600000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
Copy

在任务之间传递返回值

您可以在任务图中的任务之间传递返回值。使用函数 SYSTEM$SET_RETURN_VALUE 从任务中添加返回值,并使用函数 SYSTEM$GET_PREDECESSOR_RETURN_VALUE 检索返回值。

当一个任务有多个前置任务时,您必须指定哪个任务具有您想要的返回值。在下面的示例中,我们在添加配置信息的任务图中创建根任务。

CREATE OR REPLACE TASK "task_c"
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_c successful');
    END;

CREATE OR REPLACE TASK "task_d"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_c"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('task_c'));
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
      INSERT INTO demo_table VALUES('Value from predecessor task_c', :value);
    END;
Copy

获取和使用运行时信息

使用函数 SYSTEM$TASK_RUNTIME_INFO 报告有关当前任务运行的信息。该函数有几个特定于任务图的选项。例如,使用 CURRENT_ROOT_TASK_NAME 获取当前任务图中根任务的名称。以下示例展示了如何根据任务图的根任务开始的时间向表中添加日期戳。

-- Updates the date/time table after the root task completes.
CREATE OR REPLACE TASK "task_date_time_table"
  USER_TASK_TIMEOUT_MS = 60000
  AFTER "task_root"
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO date_time_table VALUES('order_date',:value);
    END;
Copy

示例

示例:启动多个任务并报告状态

在以下示例中,根任务启动更新三个不同表的任务。在这三个表更新后,一个任务会将其他三个表中的信息合并到一个汇总销售表中。

流程图显示了启动三个子任务的根任务,每个子任务更新一个表。这三个任务都在另一个子任务之前,子任务将以前的更改合并到另一个表中。
-- Create a notebook in the public schema
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- task_a: Root task. Starts the task graph and sets basic configurations.
CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  USER_TASK_TIMEOUT_MS = 60000
  CONFIG='{"environment": "production", "path": "/prod_directory/"}'
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task_a successful');
    END;
;

-- task_customer_table: Updates the customer table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_customer_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT customer_id FROM ref_cust_table
        WHERE cust_name = "Jane Doe";);
      INSERT INTO customer_table VALUES('customer_id',:value);
    END;
;

-- task_product_table: Updates the product table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_product_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT product_id FROM ref_item_table
        WHERE PRODUCT_NAME = "widget";);
      INSERT INTO product_table VALUES('product_id',:value);
    END;
;

-- task_date_time_table: Updates the date/time table.
--   Runs after the root task completes.
CREATE OR REPLACE TASK task_date_time_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
      INSERT INTO "date_time_table" VALUES('order_date',:value);
    END;
;

-- task_sales_table: Aggregates changes from other tables.
--   Runs only after updates are complete to all three other tables.
CREATE OR REPLACE TASK task_sales_table
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_customer_table, task_product_table, task_date_time_table
  AS
    BEGIN
      LET VALUE := (SELECT sales_order_id FROM ORDERS);
      JOIN CUSTOMER_TABLE ON orders.customer_id=customer_table.customer_id;
      INSERT INTO sales_table VALUES('sales_order_id',:value);
    END;
;
Copy

终结器任务示例:发送电子邮件通知

本示例展示了终结器任务如何发送电子邮件,该电子邮件总结了任务图的执行方式。该任务调用两个外部函数:一个汇总有关任务完成状态的信息,另一个使用该信息编写可通过远程消息传递服务发送的电子邮件。

任务序列显示一个指向两个子任务的根任务,子任务又指向另一个任务。终结器任务在底部显示,在所有其他任务完成或无法完成后运行。
CREATE OR REPLACE TASK notify_finalizer
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_root
AS
  DECLARE
    my_root_task_id STRING;
    my_start_time TIMESTAMP_LTZ;
    summary_json STRING;
    summary_html STRING;
  BEGIN
    --- Get root task ID
    my_root_task_id := (CALL SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_UUID'));
    --- Get root task scheduled time
    my_start_time := (CALL SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'));
    --- Combine all task run info into one JSON string
    summary_json := (SELECT get_task_graph_run_summary(:my_root_task_id, :my_start_time));
    --- Convert JSON into HTML table
    summary_html := (SELECT HTML_FROM_JSON_TASK_RUNS(:summary_json));

    --- Send HTML to email
    CALL SYSTEM$SEND_EMAIL(
        'email_notification',
        'admin@snowflake.com',
        'notification task run summary',
        :summary_html,
        'text/html');
    --- Set return value for finalizer
    CALL SYSTEM$SET_RETURN_VALUE('✅ Graph run summary sent.');
  END

CREATE OR REPLACE FUNCTION get_task_graph_run_summary(my_root_task_id STRING, my_start_time TIMESTAMP_LTZ)
  RETURNS STRING
AS
$$
  (SELECT
    ARRAY_AGG(OBJECT_CONSTRUCT(
      'task_name', name,
      'run_status', state,
      'return_value', return_value,
      'started', query_start_time,
      'duration', duration,
      'error_message', error_message
      )
    ) AS GRAPH_RUN_SUMMARY
  FROM
    (SELECT
      NAME,
      CASE
        WHEN STATE = 'SUCCEED' then '🟢 Succeeded'
        WHEN STATE = 'FAILED' then '🔴 Failed'
        WHEN STATE = 'SKIPPED' then '🔵 Skipped'
        WHEN STATE = 'CANCELLED' then '🔘 Cancelled'
      END AS STATE,
      RETURN_VALUE,
      TO_VARCHAR(QUERY_START_TIME, 'YYYY-MM-DD HH24:MI:SS') AS QUERY_START_TIME,
      CONCAT(TIMESTAMPDIFF('seconds', query_start_time, completed_time),
        ' s') AS DURATION,
      ERROR_MESSAGE
    FROM
      TABLE(my-database.information_schema.task_history(
        ROOT_TASK_ID => my_root_task_id ::STRING,
        SCHEDULED_TIME_RANGE_START => my_start_time,
        SCHEDULED_TIME_RANGE_END => current_timestamp()
      ))
    ORDER BY
      SCHEDULED_TIME)
  )::STRING
$$
;

CREATE OR REPLACE FUNCTION HTML_FROM_JSON_TASK_RUNS(JSON_DATA STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'GENERATE_HTML_TABLE'
AS
$$
  IMPORT JSON

  def GENERATE_HTML_TABLE(JSON_DATA):
    column_widths = ["320px", "120px", "400px", "160px", "80px", "480px"]

  DATA = json.loads(JSON_DATA)
  HTML = f"""
    <img src="https://example.com/logo.jpg"
      alt="Company logo" height="72">
    <p><strong>Task Graph Run Summary</strong>
      <br>Sign in to Snowsight to see more details.</p>
    <table border="1" style="border-color:#DEE3EA"
      cellpadding="5" cellspacing="0">
      <thead>
        <tr>
        """
        headers = ["Task name", "Run status", "Return value", "Started", "Duration", "Error message"]
        for i, header in enumerate(headers):
            HTML += f'<th scope="col" style="text-align:left;
            width: {column_widths[i]}">{header.capitalize()}</th>'

        HTML +="""
        </tr>
      </thead>
      <tbody>
        """
        for ROW_DATA in DATA:
          HTML += "<tr>"
          for header in headers:
            key = header.replace(" ", "_").upper()
            CELL_DATA = ROW_DATA.get(key, "")
            HTML += f'<td style="text-align:left;
            width: {column_widths[headers.index(header)]}">{CELL_DATA}</td>'
          HTML += "</tr>"
        HTML +="""
      </tbody>
    </table>
    """
  return HTML
$$
;
Copy

终结器任务示例:纠正错误

此示例演示了终结器任务如何更正错误。

出于演示目的,这些任务设计为在首次运行时失败。终结器任务更正问题并重新启动任务,这将在以下运行中成功:

显示任务系列的图表。任务 A 显示在左上角。一个箭头从任务 A 向右指向任务 B,任务 B 指向任务 C,任务 C 指向任务 D。在任务 A 下方,箭头指向终结器任务,即任务 F。
-- Configuration
-- By default, the notebook creates the objects in the public schema.
-- USE DATABASE <database name>;
-- USE SCHEMA <schema name>;

-- 1. Set the default configurations.
--    Creates a root task ("task_a"), and sets the default configurations
--    used throughout the task graph.
--    Configurations include:
--    * Each task runs after one minute, with a 60-second timeout.
--    * If a task fails, retry it twice. if it fails twice,
--      the entire task graph is considered as failed.
--    * If the task graph fails consecutively three times, suspend the task.
--    * Other environment values are set.

CREATE OR REPLACE TASK task_a
  SCHEDULE = '1 MINUTE'
  USER_TASK_TIMEOUT_MS = 60000
  TASK_AUTO_RETRY_ATTEMPTS = 2
  SUSPEND_TASK_AFTER_NUM_FAILURES = 3
  AS
    BEGIN
      CALL SYSTEM$SET_RETURN_VALUE('task a successful');
    END;
;

-- 2. Use a runtime reflection variable.
--    Creates a child task ("task_b").
--    By design, this example fails the first time it runs, because
--    it writes to a table ("demo_table") that doesn’t exist.
CREATE OR REPLACE TASK task_b
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_a
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$TASK_RUNTIME_INFO('current_task_name'));
      INSERT INTO demo_table VALUES('task b name',:VALUE);
    END;
;

-- 3. Get a task graph configuration value.
--    Creates the child task ("task_c").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_b") fails.
CREATE OR REPLACE TASK task_c
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_b
  AS
    BEGIN
      CALL SYSTEM$GET_TASK_GRAPH_CONFIG('path');
      LET VALUE := (SELECT SYSTEM$GET_TASK_GRAPH_CONFIG('path'));
      INSERT INTO demo_table VALUES('task c path',:value);
    END;
;

-- 4. Get a value from a predecessor.
--    Creates the child task ("task_d").
--    By design, this example fails the first time it runs, because
--    the predecessor task ("task_c") fails.
CREATE OR REPLACE TASK task_d
  USER_TASK_TIMEOUT_MS = 60000
  AFTER task_c
  AS
    BEGIN
      LET VALUE := (SELECT SYSTEM$GET_PREDECESSOR_RETURN_VALUE('TASK_A'));
      INSERT INTO demo_table VALUES('task d: predecessor return value', :value);
    END;
;

-- 5. Create the finalizer task ("task_f"), which creates the missing demo table.
--    After the finalizer completes, the task should automatically retry
--    (see task_a: task_auto_retry_attempts).
--    On retry, task_b, task_c, and task_d should complete successfully.
CREATE OR REPLACE TASK task_f
  USER_TASK_TIMEOUT_MS = 60000
  FINALIZE = task_a
  AS
    BEGIN
      CREATE TABLE IF NOT EXISTS demo_table(NAME VARCHAR, VALUE VARCHAR);
    END;
;

-- 6. Resume the finalizer. Upon creation, tasks start in a suspended state.
--    Use this command to resume the finalizer.
ALTER TASK task_f RESUME;
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('task_a');

-- 7. Query the task history
SELECT
    name, state, attempt_number, scheduled_from
  FROM
    TABLE(information_schema.task_history(task_name=> 'task_b'))
  LIMIT 5;
;

-- 8. Suspend the task graph to stop incurring costs
--    Note: To stop the task graph, you only need to suspend the root task
--    (task_a). Child tasks don’t run unless the root task is run.
--    If any child tasks are running, they have a limited duration
--    and will end soon.
ALTER TASK task_a SUSPEND;
DROP TABLE demo_table;

-- 9. Check tasks during execution (optional)
--    Run this command to query the demo table during execution
--    to check which tasks have run.
SELECT * FROM demo_table;

-- 10. Demo reset (optional)
--     Run this command to remove the demo table.
--     This causes task_b to fail during its first run.
--     After the task graph retries, task_b will succeed.
DROP TABLE demo_table;
Copy
语言: 中文