Triggered tasks

Use triggered tasks to run a task when data in a stream changes. This eliminates the need to frequently poll a source when the availability of new data is unpredictable. It also reduces latency, because the data is processed immediately.

A triggered task doesn't use compute resources until the trigger event occurs.

Considerations

Triggered tasks are supported with the following items:

  • Tables

  • Views

  • Dynamic tables

  • Apache Iceberg™ tables (managed and unmanaged)

  • Data shares

  • Directory tables. A directory table must be refreshed before a triggered task can detect the changes (for example, by running ALTER STAGE <stage_name> REFRESH).
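As a minimal sketch of detecting changes on a directory table, a stream can be created on the stage and then the directory table refreshed (the stream and stage names here are illustrative):

```sql
-- Create a stream that tracks changes to the directory table on the stage.
CREATE STREAM my_directory_table_stream ON STAGE my_test_stage;

-- The stream reports no changes until the directory table is refreshed.
ALTER STAGE my_test_stage REFRESH;
```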

Triggered tasks aren't supported with the following items:

  • Hybrid tables

  • Streams on external tables

For consumers to create streams on shared tables or secure views, the data provider must enable change tracking on the tables and views intended for sharing in their account; for example, ALTER VIEW <view_name> SET CHANGE_TRACKING = TRUE;. Without change tracking enabled, consumers can't create streams on the shared data. For more information, see Streams on shared objects.
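A provider can enable change tracking on both object types with ALTER statements; a minimal sketch (the table and view names are illustrative):

```sql
-- Enable change tracking on a table intended for sharing.
ALTER TABLE my_shared_table SET CHANGE_TRACKING = TRUE;

-- Enable change tracking on a secure view intended for sharing.
ALTER VIEW my_secure_view SET CHANGE_TRACKING = TRUE;
```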

Create a triggered task

Use CREATE TASK and set the following parameters:

  • Use the WHEN clause to define the target stream. (Do not include the SCHEDULE parameter.)

  • Additional requirements, based on the compute resources:

    • To create a task that runs on a user-managed warehouse, add the WAREHOUSE parameter and define the warehouse.

    • To create a serverless task, you must include the TARGET_COMPLETION_INTERVAL parameter. Do not include the WAREHOUSE parameter. Snowflake uses the target completion interval to estimate the required resources and adjusts them to complete the task within that interval.

The following example creates a serverless triggered task that runs whenever data changes in a stream.

Diagram showing a serverless triggered task
CREATE TASK my_triggered_task
  TARGET_COMPLETION_INTERVAL='15 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;

Migrate an existing scheduled task to a triggered task

  1. Suspend the task.

  2. Use ALTER TASK to update the task. Unset the SCHEDULE parameter, and then add the WHEN clause to define the target stream.

  3. Resume the task.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;

Migrate an existing user-managed triggered task to a serverless triggered task

  1. Suspend the task.

  2. Use ALTER TASK to update the task. Remove the WAREHOUSE parameter, and then set the TARGET_COMPLETION_INTERVAL parameter.

  3. Resume the task.

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task SET TARGET_COMPLETION_INTERVAL='15 MINUTES';
ALTER TASK task RESUME;

For more information, see Serverless tasks.

Allow a triggered task to run

When you create a triggered task, it starts in a suspended state.

To begin monitoring the stream, resume the task.
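For example, resuming the my_triggered_task task from the earlier example:

```sql
-- The task starts monitoring its stream only after it is resumed.
ALTER TASK my_triggered_task RESUME;
```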

The task runs under the following conditions:

  • When you first resume a triggered task, the task checks whether the stream has changed since the last task run. If it has, the task runs; otherwise, Snowflake skips the run without using compute resources.

  • If the stream receives new data while the task is already running, the next run waits until the current run completes. Snowflake ensures that only one instance of a task runs at a time.

  • After a task finishes, Snowflake checks the stream again for changes. If the stream has changed, the task runs again; if not, the run is skipped.

  • The task runs whenever new data is detected in the stream.

  • If the stream data is hosted on a directory table, the directory table must be refreshed before the task can detect the changes (for example, by running ALTER STAGE <stage_name> REFRESH).

  • The task runs a health check every 12 hours to keep the stream from becoming stale. If there are no changes, Snowflake skips the task without using compute resources. The task must consume the stream's data before the data retention period expires; otherwise, the stream becomes stale. For more information, see Avoid stream staleness.

  • By default, a triggered task runs at most once every 30 seconds. You can modify the USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS parameter to run the task more frequently, down to a minimum interval of 10 seconds.

  • If a task is triggered by a stream on a view, any change to the tables referenced by the view's query also triggers the task, regardless of any joins, aggregations, or filters in that query.
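As a sketch of lowering the minimum trigger interval mentioned above, assuming the my_triggered_task task from the earlier example (a task must be suspended before its parameters are changed):

```sql
-- Allow the task to trigger as often as every 10 seconds (default: 30).
ALTER TASK my_triggered_task SUSPEND;
ALTER TASK my_triggered_task SET USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS = 10;
ALTER TASK my_triggered_task RESUME;
```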

Diagram showing how a triggered task handles new data and checks for changes every 12 hours.

Monitor triggered tasks

  • In the output of SHOW TASKS and DESC TASK, the SCHEDULE property of a triggered task displays NULL.

  • In the output of the task_history views in the information_schema and account_usage schemas, the SCHEDULED_FROM column displays TRIGGER.
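For example, the information_schema task history can be filtered to show only triggered runs; a minimal sketch:

```sql
-- List recent task runs that were started by a trigger rather than a schedule.
SELECT name, state, scheduled_time, scheduled_from
  FROM TABLE(information_schema.task_history())
  WHERE scheduled_from = 'TRIGGER'
  ORDER BY scheduled_time DESC;
```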

Examples

Example 1: Create a user-managed task that runs whenever data changes in either of two streams.

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;

Example 2: Create a user-managed task to run whenever data changes are detected in two different data streams. Because the task uses the AND conditional, the task is skipped if only one of the two streams has new data.

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;

Example 3: Create a user-managed task that runs whenever data changes in a directory table. In this example, a stream named my_directory_table_stream is created on the directory table of a stage named my_test_stage.

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;

To verify the triggered task, add data to the stage.

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE;

Then manually refresh the directory table, which triggers the task.

ALTER STAGE my_test_stage REFRESH;