存储过程和处理程序自定义

Snowflake Native SDK for Connectors 提供了连接器的总体结构,但是,它可以根据源系统和开发者的实际需求进行一些自定义操作。出于这个原因,一些功能的基本实施为空,可以用自定义逻辑将其覆盖。此外,可以根据特定需求启用和禁用组件,更多信息请参阅“选择组件”章节。

存储过程

SDK 提供的存储过程可以分为两类:

  1. 在 Java 中实现的逻辑的高级别入口点

  2. 范围较小的内部过程

因为它们有不同的职责,所以定制过程也不同。

高级别过程定制

高级别过程仅作为在 Java 中实施的实际逻辑的入口点。因此,要更改底层逻辑,需要在重新创建存储过程时指定新处理程序的路径。需要将此过程作为自定义代码添加到 setup.sql 脚本中。这需要提供新的 Java 实施,可以从头开始编写,也可以使用在 SDK``builders`` 中提供的实施,如下所述:

CREATE OR REPLACE PROCEDURE PUBLIC.CONFIGURE_CONNECTOR(input VARIANT)
    RETURNS VARIANT
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0')
    IMPORTS = ('/jar_with_custom_code.jar')
    HANDLER = 'com.custom.handler.CustomHandler.configure';
Copy

较小范围的过程定制

Snowflake Native SDK for Connectors 提供的某些过程的逻辑很少,可以仅使用 SQL 来轻松编写。对于这些过程,可以仅使用 SQL 来替换默认实施。例如,一些带有 _VALIDATE_INTERNAL 后缀的过程可以通过这种方式重新实施。所有这些过程也可以仅使用 Java 的方法,通过 Builders 进行自定义。下文解释了这种方法。也可以将一个只使用纯 SQL 的过程替换为使用处理程序的过程。在这种情况下,它将与上述高级别存储过程相同。

CREATE OR REPLACE PROCEDURE PUBLIC.CONFIGURE_CONNECTOR_INTERNAL(config VARIANT)
    RETURNS VARIANT
    LANGUAGE SQL
    EXECUTE AS OWNER
    AS
    BEGIN
        -- input some custom logic here
        RETURN OBJECT_CONSTRUCT('response_code', 'OK');
    END;
Copy

处理程序

Snowflake Native SDK for Connectors 定义了存储过程的默认处理程序。它们可以按原样使用、自定义使用或完全替换。对于后者,整个自定义实施不需要遵循 SDK 定义的标准,并且自定义实施需要在 SQL 中指定,如前文自定义高级别过程所述。

但是,如果您希望遵循由 SDK 定义的连接器流程,则可以仅自定义流程中的某些部分。每个现有处理程序都使用多个底层对象,在大多数情况下,这些对象是 validatorcallbackhelper 类。它们分别适合某种接口,可以用接口的自定义实施替换默认实施。

生成器

为了在自定义过程中在连接器中保留 SDK 定义的流程,提供了名为 builders 的辅助程序对象。每个 handler 类都捆绑了自己的 builder。它们允许用户提供底层 Java 对象的自定义实施。这样,开发者无需接触连接器的内部流程,可以专注于定制所需部分。使用 builders 时有一个小问题,这种方法还要求开发者指定将在存储过程中引用的新入口点方法。

例如,使用 builder 并采用自定义 validatorhandler 如下所示:

class CustomConnectionConfigurationInputValidator implements ConnectionConfigurationInputValidator {
    @Override
    public ConnectorResponse validate(Variant config) {
        // CUSTOM LOGIC
        return ConnectorResponse.success();
    }
}

class CustomHandler {

    // Path to this method needs to be specified in the PUBLIC.SET_CONNECTION_CONFIGURATION procedure using SQL
    public static Variant configureConnection(Session session, Variant configuration) {
            //Using builder
        var handler = ConnectionConfigurationHandler.builder(session)
            .withInputValidator(new CustomConnectionConfigurationInputValidator())
            .build();
        return handler.connectionConfiguration(configuration).toVariant();
    }
}
Copy

然后,如下所示指定 SQL 中的入口点方法:

CREATE OR REPLACE PROCEDURE PUBLIC.SET_CONNECTION_CONFIGURATION(input VARIANT)
    RETURNS VARIANT
    LANGUAGE JAVA
    RUNTIME_VERSION = '11'
    PACKAGES = ('com.snowflake:snowpark:1.11.0')
    IMPORTS = ('/jar_with_custom_code.jar')
    HANDLER = 'com.custom.handler.CustomHandler.configureConnection';
Copy
语言: 中文