类别:

系统函数

SYSTEM$TASK_RUNTIME_INFO

返回有关当前任务运行的信息。如果在任务运行之外调用此函数,则会失败并显示错误。

语法

SYSTEM$TASK_RUNTIME_INFO('<arg_name>')
Copy

实参

'arg_name'

指定要返回的信息类型。您可以指定以下值中的一个:

描述

'CURRENT_TASK_NAME'

返回当前任务的名称。

'CURRENT_ROOT_TASK_NAME'

返回当前任务图中根任务的名称。

'CURRENT_ROOT_TASK_UUID'

返回一个通用唯一标识符 (UUID),该标识符表示当前任务图形中的根任务。

'CURRENT_TASK_GRAPH_RUN_GROUP_ID'

返回一个通用唯一标识符 (UUID),该标识符表示当前图形运行组。

'CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'

返回当前图形运行组中根任务的原始计划时间戳。

对于重试的图形,返回值是当前组中初始图形运行的原始计划时间戳。

'LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID'

返回一个通用唯一标识符 (UUID),该标识符表示最近成功的图形运行组。

该值在整个图形运行组中是一致的,并在初始图形运行的根任务开始时确定。

'LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP'

返回最近成功的图形运行组中根任务的原始计划时间戳。

该值在整个图形运行组中是一致的,并在初始图形运行的根任务开始时确定。

返回

返回包含请求信息的 STRING 或 TEXT。

示例

将 CURRENT_TASK_GRAPH_RUN_GROUP_ID 和 CURRENT_ROOT_TASK_NAME 结合使用,以进行调试并创建唯一的输出目录或文件:

CREATE OR REPLACE TASK my_task ...
  AS
  ...

  -- Inside Python UDF

  query_result = session.sql("""select
        SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_NAME')
        root_name,
        SYSTEM$TASK_RUNTIME_INFO('CURRENT_TASK_GRAPH_RUN_GROUP_ID')
        run_id""").collect()
  current_root_task_name, current_graph_run_id = result.ROOT_NAME, result.RUN_ID

  -- Logging information here

  logger.debug(f"start training for {current_root_task_name} at run {current_graph_run_id}")

  -- Create a unique output directory to store intermediate information

  output_dir_name = f"{current_root_task_name}/{current_graph_run_id}/preprocessing.out"
  with open(output_dir_name, "rw+") as f:
    ....
...;
Copy

将 CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP 和 LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP 结合使用,以处理来自流式输入源的数据:

CREATE OR REPLACE TASK my_task ...
  AS
  ...
  INSERT INTO my_output_table
    SELECT * FROM my_source_table
      WHERE TRUE
        ...
        AND TIMESTAMP BETWEEN
          COALESCE(
            SYSTEM$TASK_RUNTIME_INFO(‘LAST_SUCCESSFUL_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP’),
            '2023-07-01'
          ) AND SYSTEM$TASK_RUNTIME_INFO(‘CURRENT_TASK_GRAPH_ORIGINAL_SCHEDULED_TIMESTAMP’)
   ...;
Copy

使用 LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID 生成唯一的输出目录和日志行:

CREATE OR REPLACE TASK my_task ...
  AS
  ...

  -- Inside Python UDF

  query_result = session.sql("select
      SYSTEM$TASK_RUNTIME_INFO('CURRENT_ROOT_TASK_NAME') root_name, SYSTEM$TASK_RUNTIME_INFO('LAST_SUCCESSFUL_TASK_GRAPH_RUN_GROUP_ID') last_run_id").collect()
  current_root_task_name, last_graph_run_id = result.ROOT_NAME,result.LAST_RUN_ID
  logger.log(f"graph name: {current_root_task_name}, last successful run: {last_graph_run_id}")
  ...;
Copy
语言: 中文