触发型发任务

使用触发型任务,在 发生变化时运行任务。这消除了在新数据的可用性不可预测时需要频繁轮询源的问题。它还减少了延迟,因为数据是立即处理的。

触发型任务在触发事件之前不会使用计算资源。

注意事项

  • 对于托管在目录表上的流,需要刷新目录表,然后触发的任务才能检测到更改。要检测更改,您可以执行以下任一操作:

  • 不支持外部表和混合表上的流。

  • 为了让使用者在共享表或安全视图上创建流,数据提供商必须对打算在其账户中共享的表和视图(即 ALTER VIEW <view_name> SET CHANGE_TRACKING = TRUE;)启用更改跟踪。如果不启用变更跟踪,使用者就无法在共享数据上创建流。有关更多信息,请参阅 共享对象上的流

创建触发任务

使用 CREATE TASK 并设置以下参数:

  • 使用 WHEN 子句定义目标流。(请勿包括 SCHEDULE 参数。)

    处理多个数据流时,可以使用条件参数:WHEN ... ANDWHEN ... OR

  • 基于 计算资源 的其他要求:

    • 要创建无服务器任务,必须包含 TARGET_COMPLETION_INTERVAL 参数。请勿包括 WAREHOUSE 参数。Snowflake 使用目标完成间隔来估算所需的资源,并进行调整以在这段时间内完成任务。

    该图显示了无服务器触发型任务在 Snowflake 中的工作原理。
    • 要创建在用户管理的仓库上运行的任务,请添加 WAREHOUSE 参数并定义仓库。

将现有任务从计划任务迁移到触发任务

  1. 暂停任务。

  2. 取消设置 SCHEDULE 参数,然后添加 WHEN 子句来定义目标流。

  3. 恢复任务。

ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
Copy

将现有的用户管理的触发任务迁移到无服务器触发任务

  1. 暂停任务。

  2. 移除 WAREHOUSE 参数并设置 TARGET_COMPLETION_INTERVAL 参数。

  3. 恢复任务。

ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
Copy

有关更多信息,请参阅 无服务器任务

允许触发的任务运行

当您创建触发任务时,它会以暂停状态启动。

要开始监控流,请执行以下步骤:

该任务在以下条件下运行:

  • 当您首次恢复触发的任务时,该任务会检查流中自上次任务运行以来的变化。如果发生了更改,则任务将运行;否则,会在不使用计算资源的情况下跳过任务。

  • 如果任务正在运行且流中有新数据,则该任务将等到当前任务完成。Snowflake 确保一次只执行一个任务实例。

  • 任务完成后,Snowflake 会再次检查流中是否有更改。如果流中有更改,任务将再次运行;如果没有,则跳过任务。

  • 每当在流中检测到新数据时,任务就会运行。

  • 如果流数据托管在目录表中,则要检测更改,您可以执行以下任一操作:

  • 该任务每 12 小时运行一次运行状况检查,以防止流过时。如果没有更改,Snowflake 会在不使用计算资源的情况下跳过任务。对于流,任务说明必须在数据保留期到期之前使用流中的数据;否则,流将过时。有关更多信息,请参阅 避免流过期

  • 默认情况下,触发型任务最多每 30 秒运行一次。您可以修改 USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS 参数,以便更频繁地运行任务,最短间隔为 10 秒。

  • 如果通过 视图上的流 触发任务,则通过视图上的流查询引用的表的任何更改也将触发该任务,且不受该查询中任何联接、聚合或筛选操作的影响。

图表显示了触发型任务如何管理新数据,并每 12 小时检查一次更改。

监控触发型任务

  • SHOW TASKSDESC TASK 输出中,触发型任务的 SCHEDULE 属性显示为 NULL

  • 在 information_schema 和 account_usage 架构的 task_history 视图的输出中,SCHEDULED_FROM 列显示为 TRIGGER。

示例

示例 1:创建无服务器任务,每当流中的数据发生变化时,该任务就会运行。

由于任务是无服务器的,因此需要 TARGET_COMPLETION_INTERVAL 参数以允许 Snowflake 估计所需的计算资源。

CREATE TASK my_task
  TARGET_COMPLETION_INTERVAL='120 MINUTES'
  WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS SELECT 1;
Copy

示例 2:创建一个用户管理任务,在两个流中任意一个流的数据发生变化时运行。

CREATE TASK triggered_task_either_of_two_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
    OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO customer_activity
    SELECT customer_id, return_total, return_date, 'return'
    FROM my_return_stream
    UNION ALL
    SELECT customer_id, order_total, order_date, 'order'
    FROM my_order_stream;
Copy

示例 3:创建一个用户管理的任务,当在两个不同的数据流中检测到数据变化时运行。由于任务使用 AND 条件,如果两个流中只有一个有新数据,则会跳过该任务。

CREATE TASK triggered_task_both_streams
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
    AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
  AS
    INSERT INTO completed_promotions
    SELECT order_id, order_total, order_time, promotion_id
    FROM orders_stream
    WHERE promotion_id IS NOT NULL;
Copy

示例 4:创建一个用户管理的任务,该任务在目录表中的数据发生变化时运行。在示例中,流 (my_directory_table_stream) 托管在暂存区 (my_test_stage) 的 目录表 上。

CREATE TASK triggered_task_directory_table
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
  AS
    INSERT INTO tasks_runs
    SELECT 'trigger_t_internal_stage', relative_path, size,
            last_modified, file_url, etag, metadata$action
    FROM my_directory_table_stream;
Copy

为了验证触发的任务,将数据添加到暂存区中。

COPY INTO @my_test_stage/my_test_file
  FROM (SELECT 100)
  OVERWRITE=TRUE
Copy

然后手动刷新目录表,从而触发任务。

ALTER STAGE my_test_stage REFRESH
Copy
语言: 中文