任务反应器 SQL 参考

数据库对象和过程

以下数据库对象是通过文件 task_reactor.sql 创建的。

TASK_REACTOR SCHEMA

包含连接器中任务反应器的某些数据库对象的版本化架构。

TASK_REACTOR_INSTANCES SCHEMA

包含连接器中任务反应器的某些实例数据库对象的非版本化架构。

TASK_REACTOR_INSTANCES.INSTANCE_REGISTRY

创建此表是为了存储有关任务反应器实例的数据,以便提供在应用程序运行时跟踪和管理现有实例的能力。该表是在 TASK_REACTOR_INSTANCES 架构中创建的。

  • instance_name VARCHAR

  • is_initialized BOOLEAN

  • is_active BOOLEAN

TASK_REACTOR.DISPATCHER(INSTANCE_SCHEMA_NAME VARCHAR)

此过程会调用 Java DispatcherHandler.dispatchWorkItems 并允许调度工作项。

TASK_REACTOR.SET_WORKERS_NUMBER(WORKERS_NUMBER NUMBER, INSTANCE_SCHEMA_NAME VARCHAR)

此过程会调用 Java SetWorkersNumberHandler.setWorkersNumber 并允许设置处理器数。

TASK_REACTOR.CREATE_INSTANCE_OBJECTS

输入参数:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • WORKER_PROCEDURE_NAME VARCHAR

  • WORK_SELECTOR_TYPE VARCHAR

  • WORK_SELECTOR_NAME VARCHAR

  • EXPIRED_WORK_SELECTOR_NAME VARCHAR

过程会创建准确 Task reactor 流所需的所有实例对象,并验证那些不应进行初始化的对象。过程结束时,它会将新的实例注册表记录插入到表中。

可能的错误包括:

  • INSTANCE_NOT_FOUND – 具有此名称的实例不存在。

  • INSTANCE_ALREADY_INITIALIZED – 具有此名称的实例已进行初始化。

  • DEFAULT_PROCEDURE_VALIDATION_EXCEPTION – 找不到过程。

  • SCHEMA_WITH_THE_SAME_NAME_ALREADY_EXISTS – 具有相同名称的架构已存在。

  • CREATING_TR_INSTANCE_EXCEPTION – 创建新任务反应器实例时发生意外错误。尚未创建实例。

TASK_REACTOR.INITIALIZE_INSTANCE

输入参数:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • WAREHOUSE_NAME VARCHAR

  • DT_SHOULD_BE_STARTED BOOLEAN

  • DT_TASK_SCHEDULE VARCHAR

  • DT_ALLOW_OVERLAPPING_EXECUTION BOOLEAN

  • DT_USER_TASK_TIMEOUT_MS VARCHAR

过程会启动同一数据库实例内的所有未初始化实例。它包括检查实例是否存在,或者实例是否尚未初始化,然后创建调度程序任务,并在需要时启动此任务。

以下状态表示过程成功结束:

{
    "response_code": "OK",
    "message": "Instance has been initialized successfully."
}
Copy

可能的错误包括:

  • INSTANCE_NOT_FOUND – 实例不存在。

  • INSTANCE_ALREADY_INITIALIZED – 具有此名称的实例已进行初始化。

TASK_REACTOR.PAUSE_INSTANCE

输入参数:

  • INSTANCE_SCHEMA VARCHAR

过程会启动暂停任务反应器给定实例的过程并返回 OK 响应。它会启动一个作业,该作业会异步停止所有处理器任务和调度程序任务。如果某个处理器任务已经在执行引入,则该任务不会立即停止,而是在引入完成后停止。

备注

暂停连接器 中已经使用了此过程的逻辑,因此不需要将此过程用作停止整个连接器的一部分。

以下状态表示过程成功结束:

{
    "response_code": "OK"
}
Copy

TASK_REACTOR.RESUME_INSTANCE

输入参数:

  • INSTANCE_SCHEMA VARCHAR

过程会启动恢复任务反应器给定实例的过程,并返回 OK 响应。它将恢复调度程序任务并启动一个作业,该作业会异步恢复所有已分配工作的处理器任务。

备注

恢复连接器 中已经使用了此过程的逻辑,因此不需要将此过程用作恢复整个连接器的一部分。

以下状态表示过程成功结束:

{
    "response_code": "OK"
}
Copy

TASK_REACTOR.UPDATE_WAREHOUSE_INSTANCE

输入参数:

  • WAREHOUSE_NAME VARCHAR

  • INSTANCE_SCHEMA VARCHAR

过程会启动任务反应器给定实例的仓库更改过程。它会更改调度程序任务的仓库,然后启动一个作业,该作业会异步更改所有处理器任务的仓库。

备注

更新仓库 中已经使用了此过程的逻辑,因此不需要将此过程用作整个更新连接器的仓库的一部分。

以下状态表示过程成功结束:

{
    "response_code": "OK"
}
Copy

可能的错误包括:

  • INSTANCE_NOT_FOUND – 给定实例不存在。

  • TASK_REACTOR_INSTANCE_IS_ACTIVE – 给定任务反应器实例在使用此过程之前尚未暂停。

内部过程

以下所有过程仅用于 task_reactor 设置脚本的内部使用,不应在外部使用。

TASK_REACTOR.CREATE_INSTANCE_SCHEMA(INSTANCE_SCHEMA_NAME VARCHAR)

此过程使用名为 instance_schema_name 的标识符创建新架构,然后,如果无法创建架构,则会引发新异常。

可能的错误包括:

  • SCHEMA_WITH_THE_SAME_NAME_ALREADY_EXISTS – 具有相同名称的架构已存在。

TASK_REACTOR.VALIDATE_PROCEDURE_EXISTENCE

输入参数:

  • PROCEDURE_NAME VARCHAR

  • PROCEDURE_TYPE VARCHAR

此过程用于验证定义的过程是否不存在,然后引发新异常。

可能的错误包括:

  • WORKER_PROCEDURE_NOT_FOUND_EXCEPTION – 找不到处理器过程。

  • WORK_SELECTOR_PROCEDURE_NOT_FOUND_EXCEPTION – 找不到工作选择过程。

  • DEFAULT_PROCEDURE_VALIDATION_EXCEPTION – 找不到过程。

TASK_REACTOR.CREATE_QUEUE

输入参数:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

  • STREAM_NAME VARCHAR

Task Reactor 的辅助方法,它会提供创建具有名称 instance_schema_name.table_name 和以下列的队列表:

  • ID STRING

  • RESOURCE_ID STRING

  • DISPATCHER_OPTIONS VARIANT

  • WORKER_PAYLOAD VARIANT

  • TIMESTAMP DATETIME

然后,如果名称为 instance_schema_name.stream_name 的流尚不存在,该辅助方法会创建该流。

TASK_REACTOR.CREATE_WORKER_REGISTRY_SEQUENCE

输入参数:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • SEQUENCE_NAME VARCHAR

Task Reactor 的辅助方法,它提供为处理器注册表创建序列的功能,其名称为 instance_schema_name.sequence_name 序列。

TASK_REACTOR.CREATE_WORKER_REGISTRY

输入参数:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

  • SEQUENCE_NAME VARCHAR

Task Reactor 的辅助方法,它提供创建处理器注册表的功能,该注册表由表(名称为 instance_schema_name.table_name)和列组成:

  • 具有默认 instance_schema_name.sequence_name 序列的 WORKER_ID NUMBER

  • CREATED_AT DATETIME

  • UPDATED_AT DATETIME

  • STATUS STRING

TASK_REACTOR.CREATE_WORKER_STATUS_TABLE

输入参数:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

Task Reactor 的辅助方法,它提供为处理器创建状态表的功能,该表的名称为 instance_schema_name.table_name 并具有以下列:

  • WORKER_ID NUMBER

  • TIMESTAMP DATETIME

  • STATUS STRING

TASK_REACTOR.CREATE_CONFIG_TABLE

输入参数:

  • INSTANCE_SCHEMA_NAME VARCHAR

  • TABLE_NAME VARCHAR

  • WORKER_PROCEDURE_NAME VARCHAR

  • WORK_SELECTOR_TYPE VARCHAR

  • WORK_SELECTOR_NAME VARCHAR

  • EXPIRED_WORK_SELECTOR_NAME VARCHAR

  • IS_INSTANCE_REGISTERED BOOLEAN

Task Reactor 的辅助方法,它提供创建名为 instance_schema_name.table_name 的配置表的功能,其中包含键和值列。然后,如果尚未使用以下值注册配置数据,这些数据将插入表中:

  • WORKER_PROCEDURE

  • WORK_SELECTOR_TYPE

  • WORK_SELECTOR

  • SCHEMA

语言: 中文