触发型发任务¶
使用触发型任务,在 流 发生变化时运行任务。这消除了在新数据的可用性不可预测时需要频繁轮询源的问题。它还减少了延迟,因为数据是立即处理的。
触发型任务在触发事件之前不会使用计算资源。
注意事项¶
Triggered tasks are supported with the following items:
表
视图
动态表
Apache Iceberg™ tables (managed and unmanaged)
Data shares
对于托管在目录表上的流,需要刷新目录表,然后触发的任务才能检测到更改。要检测更改,您可以执行以下任一操作:
将 目录表设置为自动刷新。
使用 ALTER STAGE name REFRESH 命令手动刷新目录表。
Triggered tasks aren't supported with the following items:
混合表
Streams on external tables
为了让使用者在共享表或安全视图上创建流,数据提供商必须对打算在其账户中共享的表和视图(即 ALTER VIEW <view_name> SET CHANGE_TRACKING = TRUE;)启用更改跟踪。如果不启用变更跟踪,使用者就无法在共享数据上创建流。有关更多信息,请参阅 共享对象上的流。
创建触发任务¶
使用 CREATE TASK 并设置以下参数:
使用
WHEN子句定义目标流。(请勿包括SCHEDULE参数。)基于 计算资源 的其他要求:
要创建在用户管理的仓库上运行的任务,请添加
WAREHOUSE参数并定义仓库。要创建无服务器任务,必须包含
TARGET_COMPLETION_INTERVAL参数。请勿包括WAREHOUSE参数。Snowflake 使用目标完成间隔来估算所需的资源,并进行调整以在这段时间内完成任务。
示例 1:创建无服务器任务,每当流中的数据发生变化时,该任务就会运行。
CREATE TASK my_triggered_task
TARGET_COMPLETION_INTERVAL='15 MINUTES'
WHEN SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
将现有任务从计划任务迁移到触发任务¶
暂停任务。
取消设置
SCHEDULE参数,然后添加WHEN子句来定义目标流。恢复任务。
ALTER TASK task SUSPEND;
ALTER TASK task UNSET SCHEDULE;
ALTER TASK task MODIFY WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream');
ALTER TASK task RESUME;
将现有的用户管理的触发任务迁移到无服务器触发任务¶
暂停任务。
移除
WAREHOUSE参数并设置TARGET_COMPLETION_INTERVAL参数。恢复任务。
ALTER TASK task SUSPEND;
ALTER TASK task UNSET WAREHOUSE;
ALTER TASK task RESUME;
有关更多信息,请参阅 无服务器任务。
允许触发的任务运行¶
当您创建触发任务时,它会以暂停状态启动。
要开始监控流,请执行以下步骤:
使用 ALTER TASK ... RESUME 继续执行任务。
该任务在以下条件下运行:
当您首次恢复触发的任务时,该任务会检查流中自上次任务运行以来的变化。如果发生了更改,则任务将运行;否则,会在不使用计算资源的情况下跳过任务。
如果任务正在运行且流中有新数据,则该任务将等到当前任务完成。Snowflake 确保一次只执行一个任务实例。
任务完成后,Snowflake 会再次检查流中是否有更改。如果流中有更改,任务将再次运行;如果没有,则跳过任务。
每当在流中检测到新数据时,任务就会运行。
如果流数据托管在目录表中,则要检测更改,您可以执行以下任一操作:
If a task hasn't run for 12 hours, Snowflake schedules a health check to prevent streams from becoming stale. The timing of this health check isn't guaranteed. If Snowflake detects no changes, the task is skipped without using compute resources. Task instructions must consume stream data before data retention expires; otherwise, the stream becomes stale. For more information, see Avoiding stream staleness.
Triggered tasks run at most every 30 seconds by default. If a task gets triggered again while running, the next run starts 30 seconds after the previous one was scheduled. You can lower this interval to 10 seconds by setting the USER_TASK_MINIMUM_TRIGGER_INTERVAL_IN_SECONDS parameter.
如果通过 Streams on views 触发任务,则通过视图上的流查询引用的表的任何更改也将触发该任务,且不受该查询中任何联接、聚合或筛选操作的影响。
Monitor triggered tasks¶
在
SHOW TASKS和DESC TASK输出中,触发型任务的SCHEDULE属性显示为NULL。在 information_schema 和 account_usage 架构的 task_history 视图的输出中,SCHEDULED_FROM 列显示为 TRIGGER。
示例¶
示例 2:创建一个用户管理任务,在两个流中任意一个流的数据发生变化时运行。
CREATE TASK triggered_task_either_of_two_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_return_stream')
OR SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO customer_activity
SELECT customer_id, return_total, return_date, 'return'
FROM my_return_stream
UNION ALL
SELECT customer_id, order_total, order_date, 'order'
FROM my_order_stream;
示例 3:创建一个用户管理的任务,当在两个不同的数据流中检测到数据变化时运行。由于任务使用 AND 条件,如果两个流中只有一个有新数据,则会跳过该任务。
CREATE TASK triggered_task_both_streams
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AND SYSTEM$STREAM_HAS_DATA('my_order_stream')
AS
INSERT INTO completed_promotions
SELECT order_id, order_total, order_time, promotion_id
FROM orders_stream
WHERE promotion_id IS NOT NULL;
示例 4:创建一个用户管理的任务,该任务在目录表中的数据发生变化时运行。在示例中,流 (my_directory_table_stream) 托管在暂存区 (my_test_stage) 的 目录表 上。
CREATE TASK triggered_task_directory_table
WAREHOUSE = my_warehouse
WHEN SYSTEM$STREAM_HAS_DATA('my_directory_table_stream')
AS
INSERT INTO tasks_runs
SELECT 'trigger_t_internal_stage', relative_path, size,
last_modified, file_url, etag, metadata$action
FROM my_directory_table_stream;
为了验证触发的任务,将数据添加到暂存区中。
COPY INTO @my_test_stage/my_test_file
FROM (SELECT 100)
OVERWRITE=TRUE
然后手动刷新目录表,从而触发任务。
ALTER STAGE my_test_stage REFRESH