任务反应器

提供所有 Snowflake 连接器都会用到的通用元素和功能的库。

要求

在本机应用程序安装过程中,任务反应器至少需要执行以下 sql 文件:

概述

任务反应器是一个独立的模块,它为存储在队列内的工作块提供了一个协调机制,并带有有限的任务集。任务反应器的队列和调度器基于 Snowflake StreamsSnowflake Tasks,由于刷新时间限制,将每一分钟触发一次。任务反应器只有在输入队列中有数据时才会激活,以便让仓库节省一些信用额度。

任务反应器由队列、调度器和处理器三个主要部分组成:

  1. 您的连接器应用程序会将 QueueItems 添加到队列中。

  2. 调度器(Snowflake 任务)每分钟都会从队列中获取待处理的 QueueItems,并将其传递给处理器。

  3. 处理器(Snowflake 任务)每分钟都会在分配的 QueueItems 上并行工作。

连接器配置确定后,任务反应器配置仅限于 3 个步骤:

  1. 创建任务反应器的所有组件

  2. 初始化实例

  3. (可选)更改处理器编号

创建任务反应器的所有组件

要创建实例对象,用户首先需要创建 workerselector 以及可选的 expired selector 实施,然后使用 TASK_REACTOR.CREATE_INSTANCE_OBJECTS 过程将它们整合在一起。

处理器实施

处理器负责执行调度器分配的任务,如提取和引入某些数据。唯一必须的部分是要有一个特定的处理器方法来启动工作。该方法必须可从 Snowpark 过程中调用,返回字符串,并包含以下参数:

  • session – Snowpark 会话对象

  • worker_id – 编号,唯一的处理器 ID

  • task_reactor_schema – 创建任务反应器对象的架构名称。它可用作任务反应器实例的名称。

处理器负责执行调度器分配的任务,例如提取和引入特定数据。我们建议您使用(com.snowflake.connectors.sdk.taskreactor.worker.IngestionWorkercom.snowflake.connectors.sdk.taskreactor.ingestion.Ingestion)Java 类,或者使用(com.snowflake.connectors.sdk.taskreactor.worker.SimpleTaskWorkercom.snowflake.connectors.sdk.taskreactor.ingestion.SimpleTask)来完成更简单的任务,但您也可以使用任何支持编写存储过程处理程序的编程语言来创建处理器。

Java 处理器方法示例:

public static String executeWork(Session session, int workerId, String taskReactorSchema) {
  FakeIngestion fakeIngestion = new FakeIngestion(session, workerId);
  WorkerId workerIdentifier = new WorkerId(workerId);
  Identifier schemaIdentifier = Identifier.fromWithAutoQuoting(taskReactorSchema);
  try {
    IngestionWorker.from(session, fakeIngestion, workerIdentifier, schemaIdentifier).run();
  } catch (WorkerException e) {
    // handle the exception...
    throw new RuntimeException(e);
  }
  return "Worker procedure executed.";
}
Copy

对于已经创建的处理器方法,用户需要将其集成到 CONNECTOR.WORKER_PROCEDURE 中。过程应调用自己的处理器方法。它必须在应用程序架构中创建,返回 STRING,并包含以下参数:

  • worker_id – 编号

  • task_reactor_schema – 字符串

示例过程,调用处理器的 Java 实施:

CREATE OR REPLACE PROCEDURE CONNECTOR.WORKER_PROCEDURE(worker_id number, task_reactor_schema string)
    RETURNS STRING
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0', 'com.snowflake:telemetry:0.0.1')
    IMPORTS = ('@jars/myconnector-1.0.0.jar')
    HANDLER = 'com.snowflake.myconnector.WorkerImpl.executeWork';
Copy

遥测库需要收集记录到事件表中的指标。

选择器的实施

选择器的工作是决定哪些队列任务应由任务反应器处理。与处理器实施类似 – 可以用 Snowpark 支持的任何语言创建。任务选择器可以作为数据库存储过程或数据库视图来实施。选择器(存储过程或视图)必须作为参数传递给 TASK_REACTOR.CREATE_NEW_INSTANCE 存储过程。

存储过程必须可以从 Snowpark 存储过程中调用,返回字符串,并包含以下参数:

  • session – Snowpark 会话

  • queueItems – String[](单个 JSON 字符串的数组,每个字符串描述一个 QueueItem)

Java 选择器方法示例:

public static String selectWork(Session session, String[] queueItems) {
  Variant[] sorted =
      Arrays.stream(queueItems)
          .map(Variant::new)
          .filter(
              queueItem ->
                  !queueItem.asMap().get("resourceId").asString().equals("filter-out-resource"))
          .sorted(comparing(queueItem -> queueItem.asMap().get("resourceId").asString()))
          .toArray(Variant[]::new);
  return new Variant(sorted).asJsonString();
}
Copy

不使用选择器方法,仍然可以创建一个视图,对现有队列中的任务进行过滤和排序。调度器可以使用示例查询从新创建的视图中检索新任务:

CREATE VIEW CONNECTOR_SCHEMA.WORK_SELECTOR_VIEW AS SELECT * FROM TASK_REACTOR.QUEUE ORDER BY RESOURCE_ID;
Copy

对于已经创建的选择器方法,用户需要将其集成到 CONNECTOR.WORK_SELECTOR 中。过程应调用必须使用的工作选择器方法。它必须在应用程序架构中创建,返回 ARRAY,并包含以下参数:

  • work_items - array

过程示例,调用工作选择器的 Java 实施:

CREATE OR REPLACE PROCEDURE CONNECTOR.WORK_SELECTOR(work_items array)
    RETURNS ARRAY
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0')
    IMPORTS = ('@jars/myconnector-1.0.0.jar')
    HANDLER = 'com.snowflake.myconnector.WorkSelector.selectWork';
Copy

过期选择器的实施

过期选择器的工作是决定哪些队列项目应从任务反应器的队列中删除。可能需要删除某些项目,因为选择器永远无法获取某些项目,这些项目会永远留在队列中。此外,有些在队列中等待的项目可能在很久之前就已创建,再处理它们就没有意义了。过期选择器可作为数据库视图来实现。选择器视图必须作为 TASK_REACTOR.CREATE_NEW_INSTANCE 过程的实参来传递。如果不需要从队列中删除项目,则可以使用默认实施 TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR

使用下面的查询可以创建一个过期选择器视图,选择 3 天前创建的项目:

CREATE VIEW CONNECTOR_SCHEMA.EXPIRED_WORK_SELECTOR_VIEW
    AS SELECT * FROM TASK_REACTOR.QUEUE q
        WHERE DATEDIFF(day, q.timestamp, sysdate()) > 3;
Copy

整合实例对象

TASK_REACTOR.CREATE_INSTANCE_OBJECTS 可让用户在初始化创建的实例之前一起配置所有实例。每个架构只能执行一次该存储过程,因此以后调用该存储过程不会影响任何更改。我们建议将初始化调用放在 setup.sql 文件中,以防止过程被多次执行或根本不被调用。

必填参数:

  • instance_schema_name VARCHAR – 每个实例都有一个唯一的架构,用于存储实例工作的数据库对象。

  • worker_procedure_name VARCHARWorker Implementation 部分描述的处理器过程名称。

  • work_selector_type VARCHAR – 指定新任务应使用视图或过程的值。可能的值:VIEW、PROCEDURE。

  • work_selector_name VARCHARSelector Implementation 部分描述的选择器过程/视图的名称。

可选参数:

  • expired_work_selector_name VARCHARExpired Selector Implementation 部分所述过期选择器视图的名称。如果没有提供该值,TASK_REACTOR.EMPTY_EXPIRED_WORK_SELECTOR 将用作默认实施,不返回任何内容。

初始化实例

要初始化和运行任务反应器中的所有配置,用户必须调用 INITIALIZE_INSTANCE。过程的输入参数如下:

  • instance_schema_name –(必填)架构名称,用于存储实例工作所在的数据库对象。

  • ``warehouse_name``(必填)运行实例所在仓库的名称。

  • dt_should_be_started``(可选)– 默认值:``TRUE。是否应在创建新实例时启动调度器任务。

  • dt_task_schedule``(可选)– 默认值:``1 MINUTE。运行调度器任务的频率。

  • dt_allow_overlapping_execution``(可选)– 默认值:``FALSE。允许 DAG 并行运行。

  • ``dt_user_task_timeout_ms``(可选)– 指定任务超时前单次运行的时间限制(以毫秒为单位)。

备注

如果处理器过程耗时超过为处理器任务 (USER_TASK_TIMEOUT_MS) 设置的超时时间,那么过程就会因超时错误而中止。安排任务时一定要注意不要超过 Snowflake 任务的超时时间。

在提供了所需的最少参数后,Task Reactor 将根据所提供的配置进行初始化,并使用 TASK_REACTOR.DISPATCHER 过程调度处理器。

设置处理器数量

可通过调用 TASK_REACTOR.SET_WORKERS_NUMBER 过程手动更改处理器数,参数如下:

  • WORKERS_NUMBER – 新处理器数量。

  • TR_INSTANCE_SCHEMA_NAME – 实例架构名称

衡量标准

任务反应器包含一个度量机制。它基于 Snowflake Trace Events。度量指标会被记录到事件表中,因此必须启用事件表才能使度量指标发挥作用。

目前,引入了以下度量标准:

  • worker working time (TASK_REACTOR_WORKER_WORKING_TIME) – 显示处理器实际处理资源的时间。计时器在处理器任务开始时启动,在处理器任务结束时结束。

  • worker idle time (TASK_REACTOR_WORKER_IDLE_TIME) – 与 worker working time 相反。它显示了处理器的休眠时间:要么在等待新工作,要么在等待其任务的后续安排。计时器从处理器完成任务时开始,到处理器任务再次开始时结束。

要查看所有记录的度量事件,可使用以下查询:

SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';

SELECT
        event.record:name::string AS EVENT_NAME,
        span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
        span.record_attributes:worker_id AS WORKER_ID,
        event.record_attributes:value AS DURATION
    FROM IDENTIFIER($EVENT_TABLE) event
    JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
    WHERE
        event.resource_attributes:"snow.database.name" = $APP_NAME
    ORDER BY event.timestamp DESC;
Copy

为了只选择一种度量,请在查询的 where 子句中添加 event.record:name = <度量名称 >

SET EVENT_TABLE = 'TOOLS.PUBLIC.EVENTS';
SET APP_NAME = 'YOUR_APP_NAME';

SELECT
        event.record:name::string AS EVENT_NAME,
        span.record_attributes:task_reactor_instance::string AS INSTANCE_NAME,
        span.record_attributes:worker_id AS WORKER_ID,
        event.record_attributes:value AS DURATION
    FROM IDENTIFIER($EVENT_TABLE) event
    JOIN IDENTIFIER($EVENT_TABLE) span ON event.trace:span_id = span.trace:span_id AND event.record_type = 'SPAN_EVENT' AND span.record_type = 'SPAN'
    WHERE
        event.resource_attributes:"snow.database.name" = $APP_NAME
        AND event.record:name = 'TASK_REACTOR_WORKER_IDLE_TIME'
    ORDER BY event.timestamp DESC;
Copy
语言: 中文