触发型发任务¶
使用触发型任务,在 流 发生变化时运行任务。这消除了在新数据的可用性不可预测时需要频繁轮询源的问题。它还减少了延迟,因为数据是立即处理的。
触发型任务在触发事件之前不会使用计算资源。
注意事项¶
对于托管在目录表上的流,需要刷新目录表,然后触发的任务才能检测到更改。要检测更改,您可以执行以下任一操作:
将 目录表设置为自动刷新。
使用 ALTER STAGE name REFRESH 命令手动刷新目录表。
不支持外部表和混合表上的流。
为了让使用者在共享表或安全视图上创建流,数据提供商必须对打算在其账户中共享的表和视图(即
ALTER VIEW <view_name> SET CHANGE_TRACKING = TRUE;
)启用更改跟踪。如果不启用变更跟踪,使用者就无法在共享数据上创建流。有关更多信息,请参阅 共享对象上的流。
创建触发任务¶
使用 CREATE TASK 并设置以下参数:
使用
WHEN
子句定义目标流。(请勿包括SCHEDULE
参数。)处理多个数据流时,可以使用条件参数:
WHEN ... AND
和WHEN ... OR
。基于 计算资源 的其他要求:
要创建无服务器任务,必须包含
TARGET_COMPLETION_INTERVAL
参数。请勿包括WAREHOUSE
参数。Snowflake 使用目标完成间隔来估算所需的资源,并进行调整以在这段时间内完成任务。
要创建在用户管理的仓库上运行的任务,请添加
WAREHOUSE
参数并定义仓库。
将现有任务从计划任务迁移到触发任务¶
暂停任务。
取消设置
SCHEDULE
参数,然后添加WHEN
子句来定义目标流。恢复任务。
ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task SET WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
将现有的用户管理的触发任务迁移到无服务器触发任务¶
暂停任务。
移除
WAREHOUSE
参数并设置TARGET_COMPLETION_INTERVAL
参数。恢复任务。
ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
有关更多信息,请参阅 无服务器任务。
允许触发的任务运行¶
当您创建触发任务时,它会以暂停状态启动。
要开始监控流,请执行以下步骤:
使用 ALTER TASK ... RESUME 继续执行任务。
该任务在以下条件下运行:
当您首次恢复触发的任务时,该任务会检查流中自上次任务运行以来的变化。如果发生了更改,则任务将运行;否则,会在不使用计算资源的情况下跳过任务。
如果任务正在运行且流中有新数据,则该任务将等到当前任务完成。Snowflake 确保一次只执行一个任务实例。
任务完成后,Snowflake 会再次检查流中是否有更改。如果流中有更改,任务将再次运行;如果没有,则跳过任务。
每当在流中检测到新数据时,任务就会运行。
如果流数据托管在目录表中,则要检测更改,您可以执行以下任一操作:
该任务每 12 小时运行一次运行状况检查,以防止流过时。如果没有更改,Snowflake 会在不使用计算资源的情况下跳过任务。对于流,任务说明必须在数据保留期到期之前使用流中的数据;否则,流将过时。有关更多信息,请参阅 避免流过期。
默认情况下,触发型任务最多每 30 秒运行一次。您可以修改 USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS 参数,以便更频繁地运行任务,最短间隔为 10 秒。
如果通过 视图上的流 触发任务,则通过视图上的流查询引用的表的任何更改也将触发该任务,且不受该查询中任何联接、聚合或筛选操作的影响。
监控触发型任务¶
在
SHOW TASKS
和DESC TASK
输出中,触发型任务的SCHEDULE
属性显示为NULL
。在 information_schema 和 account_usage 架构的 task_history 视图的输出中,SCHEDULED_FROM 列显示为 TRIGGER。
示例¶
示例 1:创建无服务器任务,每当流中的数据发生变化时,该任务就会运行。
由于任务是无服务器的,因此需要 TARGET_COMPLETION_INTERVAL
参数以允许 Snowflake 估计所需的计算资源。
CREATE TASK my_task
TARGET_COMPLETION_INTERVAL='120 MINUTES'
WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS SELECT 1;
示例 2:创建一个用户管理任务,在两个流中任意一个流的数据发生变化时运行。
CREATE TASK triggered_task_either_of_two_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, return_total, return_date, 'return'
FROM my_return_stream
UNION ALL
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
示例 3:创建一个用户管理的任务,当在两个不同的数据流中检测到数据变化时运行。由于任务使用 AND 条件,如果两个流中只有一个有新数据,则会跳过该任务。
CREATE TASK triggered_task_both_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO completed_promotions
SELECT order_id, order_total, order_time, promotion_id
FROM orders_stream
WHERE promotion_id IS NOT NULL;
示例 4:创建一个用户管理的任务,该任务在目录表中的数据发生变化时运行。在示例中,流 (my_directory_table_stream) 托管在暂存区 (my_test_stage) 的 目录表 上。
CREATE TASK triggered_task_directory_table
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
AS
INSERT INTO tasks_runs
SELECT 'trigger_t_internal_stage', relative_path, size,
last_modified, file_url, etag, metadata$action
FROM my_directory_table_stream;
为了验证触发的任务,将数据添加到暂存区中。
COPY INTO @my_test_stage/my_test_file
FROM (SELECT 100)
OVERWRITE=TRUE
然后手动刷新目录表,从而触发任务。
ALTER STAGE my_test_stage REFRESH