- 类别:
SYSTEM$TASK_RUNTIME_INFO¶
返回有关当前任务运行的信息。如果在任务运行之外调用此函数,则会失败并显示错误。
语法¶
SYSTEM$TASK_RUNTIME_INFO('<arg_name>')
实参¶
'arg_name'
指定要返回的信息类型。您可以指定以下值中的一个:
值
描述
'CURRENT_TASK_NAME'
返回当前任务的名称。
'CURRENT_ROOT_TASK_NAME'
返回当前任务图中根任务的名称。
'CURRENT_ROOT_TASK_UUID'
返回一个通用唯一标识符 (UUID),该标识符表示当前任务图形中的根任务。
'CURRENT_TASK_GRAPH_RUN_GROUP_ID'
返回一个通用唯一标识符 (UUID),该标识符表示当前图形运行组。
'CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'
返回当前图形运行组中根任务的原始计划时间戳。
对于重试的图形,返回值是当前组中初始图形运行的原始计划时间戳。
'LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID'
返回一个通用唯一标识符 (UUID),该标识符表示最近成功的图形运行组。
该值在整个图形运行组中是一致的,并在初始图形运行的根任务开始时确定。
'LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'
返回最近成功的图形运行组中根任务的原始计划时间戳。
该值在整个图形运行组中是一致的,并在初始图形运行的根任务开始时确定。
返回¶
返回包含请求信息的 STRING 或 TEXT。
示例¶
将 CURRENT_TASK_GRAPH_RUN_GROUP_ID 和 CURRENT_ROOT_TASK_NAME 结合使用,以进行调试并创建唯一的输出目录或文件:
CREATE OR REPLACE TASK my_task ... AS ... -- Inside Python UDF query_result = session.sql("""select SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_NAME') root_name, SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID') run_id""").collect() current_root_task_name, current_graph_run_id = result.ROOT_NAME, result.RUN_ID -- Logging information here logger.debug(f"start training for {current_root_task_name} at run {current_graph_run_id}") -- Create a unique output directory to store intermediate information output_dir_name = f"{current_root_task_name}/{current_graph_run_id}/preprocessing.out" with open(output_dir_name, "rw+") as f: .... ...;
将 CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP 和 LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP 结合使用,以处理来自流式输入源的数据:
CREATE OR REPLACE TASK my_task ... AS ... INSERT INTO my_output_table SELECT * FROM my_source_table WHERE TRUE ... AND TIMESTAMP BETWEEN COALESCE( SYSTEM$TASK_RUNTIME_INFO(‘LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP’), '2023-07-01' ) AND SYSTEM$TASK_RUNTIME_INFO(‘CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP’) ...;
使用 LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID 生成唯一的输出目录和日志行:
CREATE OR REPLACE TASK my_task ... AS ... -- Inside Python UDF query_result = session.sql("select SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_NAME') root_name, SYSTEM$TASK_RUNTIME_INFO('LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID') last_run_id").collect() current_root_task_name, last_graph_run_id = result.ROOT_NAME,result.LAST_RUN_ID logger.log(f"graph name: {current_root_task_name}, last successful run: {last_graph_run_id}") ...;