教程:Snowflake Native SDK for Connectors Java 连接器模板¶
简介¶
欢迎来到我们的教程,了解如何借助 Snowflake Native SDK for Connectors 使用连接器模板。本指南将帮助您设置一个简单的连接器本机应用程序。
在本教程中,您将学习如何执行以下操作:
部署连接器本机应用程序
配置模板连接器以引入数据
根据自己的需要自定义模板连接器
模板包含各种有用的代码内注释,以便您更轻松地找到需要修改的特定文件。查找带有以下关键字的评论,它们将指导并帮助您实施自己的连接器:
TODO
TODO: HINT
TODO: IMPLEMENT ME
在开始本教程之前,您应该通过查看以下推荐内容做好准备:
先决条件¶
在开始之前,请确保您满足以下要求:
访问具有
ACCOUNTADMIN
角色的 Snowflake 账户查看 Snowflake Native SDK for Connectors,并在学习本教程时保持其打开状态
查看 教程:Snowflake Native SDK for Connectors 示例 Java 连接器
该教程使用了基于此模板的示例连接器,可参考该示例连接器查看各种组件的实现示例。
准备本地环境¶
在继续之前,您需要先确保计算机上安装了所有必要的软件,并克隆连接器模板。
Java 安装¶
Snowflake Native SDK for Connectors 需要 Java LTS(长期支持)版本 11 或更高版本。如果计算机上未安装所需的最低 Java 版本,则必须安装 Oracle Java 或 OpenJDK。
Oracle Java¶
JDK 的最新 LTS 版本可在 Oracle NFTC 下免费下载和使用。有关下载和安装说明,请访问 Oracle 页面 (https://www.oracle.com/java/technologies/downloads/)。
OpenJDK¶
OpenJDK 是 Java 的一个开源实施。有关下载和安装说明,请访问 openjdk.org (https://openjdk.org/install/) 和 jdk.java.net (https://jdk.java.net/)。
您也可以使用第三方 OpenJDK 版本,如 Eclipse Temurin (https://adoptium.net/temurin/releases/) 或 Amazon Corretto (https://aws.amazon.com/corretto/)。
Snowflake CLI 配置¶
构建、部署和安装连接器需要 Snowflake CLI 工具。如果计算机上没有 Snowflake CLI – 根据`此处 <https://docs.snowflake.cn/en/developer-guide/snowflake-cli/installation/installation>`_ 的说明进行安装。
安装该工具后 – 您需要在 配置文件 中配置与 Snowflake 的连接。
如果您没有配置任何连接 – 请创建一个名为 native_sdk_connection
的新连接。您可以在 deployment/snowflake.toml
文件中找到示例连接。
如果您已经配置了连接,并希望将其与连接器一起使用 – 在本教程中使用此连接时,请使用其名称而不是 native_sdk_connection
。
模板克隆¶
要克隆连接器模板,请使用以下命令:
snow init <project_dir> \
--template-source https://github.com/snowflakedb/connectors-native-sdk \
--template templates/connectors-native-sdk-template
在 <project_dir>
的位置输入创建连接器 Java 项目的目录名称(该名称必须不存在)。
执行命令后,系统会要求您提供有关应用程序实例和暂存区名称配置的更多信息。您可以提供任何名称,只要它们是有效的未加引号的 Snowflake 标识符,也可以点击 enter 以使用方括号中显示的默认值。
示例命令执行,提供自定义应用程序和暂存区名称:
$ snow init my_connector \
--template-source https://github.com/snowflakedb/connectors-native-sdk \
--template templates/connectors-native-sdk-template
Name of the application instance which will be created in Snowflake [connectors-native-sdk-template]: MY_CONNECTOR
Name of the schema in which the connector files stage will be created [TEST_SCHEMA]:
Name of the stage used to store connector files in the application package [TEST_STAGE]: CUSTOM_STAGE_NAME
Initialized the new project in my_connector
连接器的构建、部署和清理¶
即使未作任何修改,也可立即部署模板。以下各节将向您展示如何构建、部署和安装连接器。
构建连接器¶
构建使用 Snowflake Native SDK for Connectors 创建的连接器与构建典型的 Java 应用程序略有不同。除了使用源构建 .jar 存档之外,还必须进行一些其他操作。构建应用程序包括以下步骤:
将自定义内部组件复制到构建目录
将 SDK 组件复制到构建目录
复制内部组件¶
此步骤构建连接器 .jar 文件,然后将该文件(连同 UI、清单和设置文件一起)复制到 sf_build
目录。
要运行此步骤,请执行命令:./gradlew copyInternalComponents
。
复制 SDK 组件¶
此步骤将 SDK .jar 文件(作为依赖项添加到连接器 Gradle 模块)复制到 sf_build
目录,并从 .jar 存档中提取捆绑的 .sql 文件。
这些 .sql 文件允许自定义在应用程序安装期间将创建哪些提供的对象。初次使用时,不建议用户自定义,因为如果操作不正确,省略对象可能会导致某些功能失败。模板连接器应用程序使用 all.sql
文件,该文件创建所有推荐的 SDK 对象。
要运行此步骤,请执行命令:./gradlew copySdkComponents
。
部署连接器¶
要部署 Native App,需要在 Snowflake 内创建应用程序包。之后,需要将 sf_build
目录中的所有文件上传到 Snowflake。
请注意 – 出于开发目的,可以选择版本创建,可以直接从暂存文件创建应用程序实例。此方法允许您查看大多数连接器文件中的变化,无需重新创建版本和应用程序实例。
将执行以下操作:
创建新的应用程序包(如果不存在)
在包中创建架构和文件暂存区
将文件从
sf_build
目录上传到暂存区(此步骤可能需要一些时间)
要部署连接器,请执行命令 snow app deploy --connection=native_sdk_connection
。
有关 snow app deploy
命令的更多信息,请参阅 snow app deploy。
创建的应用程序包现在将在 App packages
选项卡、Data products
类别、账户的 Snowflake UI 中可见。
安装连接器¶
安装应用程序是该过程的最后一步。该步骤会使用之前创建的应用程序包创建应用程序。
要安装连接器,请执行命令:snow app run --connection=native_sdk_connection
。
有关 snow app run
命令的更多信息,请参阅 Snow 应用程序运行。
安装的应用程序现在将在 Installed apps
选项卡、Data products
类别、账户的 Snowflake UI 中可见。
更新连接器文件¶
如果在任何时候希望修改任何连接器文件,您可以轻松地将修改的文件上传到应用程包暂存区。上传命令取决于更新了哪些文件。
在运行任何更新命令之前,您必须通过运行以下命令将连接器的新文件复制到 sf_build
目录中:./gradlew copyInternalComponents
UI .py files or connector .java files¶
使用 snow app deploy --connection=native_sdk_connection
命令,当前应用程序实例将使用新文件,无需重新安装。
setup.sql 或 manifest.yml 文件¶
使用 snow app run --connection=native_sdk_connection
命令,将在新文件上传到暂存区后重新安装当前应用程序实例。
清理¶
完成教程后,或者出于某种原因想要删除应用程序及其包,可以使用以下命令将其从账户中彻底删除:
snow app teardown --connection=native_sdk_connection --cascade --force
需要使用 --cascade
选项才能移除目标数据库,而不将所有权转移给账户管理员。在真正的连接器中,请勿移除数据库,以保留引入的数据,所有权应归属于账户管理员,或者在卸载前转移所有权。
请注意 – 即使没有配置引入,连接器也会使用 credit,直到暂停或移除连接器!
先决条件步骤¶
安装后,连接器立即进入向导阶段。此阶段包括几个步骤,指导最终用户完成所有必要的配置。
第一步是先决条件步骤。它是可选的,并且可能不是每个连接器都需要。先决条件通常是应用程序之外的用户所需操作,例如在 SQL 工作表中运行查询、在源系统端进行配置等。
阅读有关先决条件的更多信息:先决条件
每个先决条件的内容直接从位于连接器内部的 STATE.PREREQUISITES
中检索。它们可以通过 setup.sql
脚本自定义。然而,请记住每次安装、升级和降级应用程序时都会执行 setup.sql
脚本。插入必须是幂等的,因此建议使用合并查询,如下例所示:
MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
('1',
'Sample prerequisite',
'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
NULL,
NULL,
1
)
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
连接器配置步骤¶
向导阶段的下一步是连接器配置步骤。在此步骤中,您可以配置连接器所需的数据库对象和权限。此步骤允许指定以下配置属性:
warehouse
operational_warehouse
cortex_warehouse
destination_database
destination_schema
global_schedule
data_owner_role
cortex_user_role
agent_username
agent_role
如果您需要任何其他自定义属性,可以在向导阶段的后续步骤之一中进行配置。有关每个属性的更多信息,请参阅:连接器配置
此外,Streamlit 组件 (streamlit/wizard/connector_config.py
) 模板中展示了如何触发 Native Apps Permission SDK 并向最终用户请求授权。只要可用的属性满足连接器的需求,就不需要覆盖任何后端类,尽管这仍然可以像配置的后续步骤中的组件一样。
有关内部过程和 Java 对象的更多信息,请参阅:连接器配置参考
所提供的 Streamlit 示例允许请求 manifest.yml
文件中配置的账户级权限 – CREATE DATABASE
和 EXECUTE TASKS
。它还允许用户通过 Permission SDK 弹出窗口指定仓库参考。
在模板中,只要求用户提供 destination_database
和 destination_schema
。然而,streamlit/wizard/connector_configuration.py
中的 TODO
注释包含注释代码,可以重复使用以在 Streamlit UI 中显示更多输入字段。
# TODO: Here you can add additional fields in connector configuration.
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
连接配置步骤¶
向导阶段的下一步是连接配置步骤。此步骤允许最终用户配置连接器的外部连接参数。该配置可能包括密钥、集成等对象的标识符。
由于此信息取决于连接器引入的数据的源系统,所以这是在源代码中进行更多自定义的第一个位置。
有关连接配置的更多信息,请参阅:
从 Streamlit UI 侧 (streamlit/wizard/connection_config.py
) 开始,您需要为所有需要的参数添加文本输入。已为您实施了一个示例文本输入,如果您在此文件中搜索代码,您可以找到带有新字段注释代码的 TODO
。
# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
将属性添加到表单后,需要将它们传递到连接器的后端层。为此,必须修改 Streamlit 文件中的另外两个位置。第一个是 streamlit/wizard/connection_config.py
文件中的 finish_config
功能。必须在这里读取新添加的文本输入的状态。此外,如果需要,还可以进行验证,然后传递给 set_connection_configuration
功能。
例如,如果添加了 additional_connection_property
,则编辑后看起来如下:
def finish_config():
try:
# TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
# The properties can also be validated, for example, check whether they are not blank strings etc.
response = set_connection_configuration(
custom_connection_property=st.session_state["custom_connection_property"],
additional_connection_property=st.session_state["additional_connection_property"],
)
# rest of the method without changes
然后必须编辑 set_connection_configuration
函数,以便在 streamlit/native_sdk_api/connection_config.py
文件中将其找到。此函数是 Streamlit UI 以及底层 SQL 过程之间的代理,它是连接器后端的入口点。
def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
# TODO: this part of the code sends the config to the backend so all custom properties need to be added here
config = {
"custom_connection_property": escape_identifier(custom_connection_property),
"additional_connection_property": escape_identifier(additional_connection_property),
}
return call_procedure(
"PUBLIC.SET_CONNECTION_CONFIGURATION",
[variant_argument(config)]
)
完成此操作后,新属性将保存在包含配置的内部连接器表中。然而,这并不意味着可能的自定义结束。一些后端组件也可以自定义,可以在代码中查找以下注释来找到它们:
TODO: IMPLEMENT ME connection configuration validate
TODO: IMPLEMENT ME connection callback
TODO: IMPLEMENT ME test connection
验证部分允许进行对从 UI 接收到的任何额外验证。它还可以转换数据,例如更改字符大小写、修剪所提供的数据,或检查具有所提供名称的对象是否确实存在于 Snowflake 中。
连接回调是允许您根据配置执行任何附加操作的部分,例如,使用 外部集成设置参考 中描述的解决方案,调整需要使用外部访问集成的过程。
测试连接是连接配置的最后一个组件,用于检查连接器和源系统之间是否可以建立连接。
有关这些内部组件的更多信息,请参阅:
示例实施可能如下所示:
public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {
private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";
@Override
public ConnectorResponse validate(Variant config) {
// TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
// requires some additional validation this is the place to implement this logic.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
if (!integrationCheck.isOk()) {
return integrationCheck;
}
var secretCheck = checkParameter(config, SECRET_PARAM, true);
if (!secretCheck.isOk()) {
return ConnectorResponse.error(ERROR_CODE);
}
return ConnectorResponse.success();
}
}
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {
private static final String[] EXTERNAL_SOURCE_PROCEDURE_SIGNATURES = {
asVarchar(format("%s.%s()", PUBLIC_SCHEMA, TEST_CONNECTION_PROCEDURE)),
asVarchar(format("%s.%s(VARIANT)", PUBLIC_SCHEMA, FINALIZE_CONNECTOR_CONFIGURATION_PROCEDURE)),
asVarchar(format("%s.%s(NUMBER, STRING)", PUBLIC_SCHEMA, WORKER_PROCEDURE))
};
private final Session session;
public TemplateConnectionConfigurationCallback(Session session) {
this.session = session;
}
@Override
public ConnectorResponse execute(Variant config) {
// TODO: If you need to alter some procedures with external access you can use
// configureProcedure method or implement a similar method on your own.
// TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
// to be done after connection configuration, like altering procedures with external access.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
var response = configureProceduresWithReferences();
if (response.isNotOk()) {
return response;
}
return ConnectorResponse.success();
}
private ConnectorResponse configureProceduresWithReferences() {
return callProcedure(
session,
PUBLIC_SCHEMA,
SETUP_EXTERNAL_INTEGRATION_WITH_NAMES_PROCEDURE,
EXTERNAL_SOURCE_PROCEDURE_SIGNATURES);
}
}
public class TemplateConnectionValidator {
private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";
public static Variant testConnection(Session session) {
// TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
// the source system here. This usually requires connection to some webservice or other external
// system. It is suggested to perform only the basic connectivity validation here.
// If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
return test().toVariant();
}
private static ConnectorResponse test() {
try {
var response = SourceSystemHttpHelper.testEndpoint();
if (isSuccessful(response.statusCode())) {
return ConnectorResponse.success();
} else {
return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
}
} catch (Exception exception) {
return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
}
}
}
完成配置步骤¶
完成连接器配置步骤是向导阶段的最后一步。此步骤有多项职责:
允许用户指定连接器所需的任何其他配置
根据需要为引入的数据创建接收数据库、架构以及附加表和视图
初始化内部组件,例如调度程序和任务反应器
有关配置完成的更多信息,请参阅:
有关任务反应器和调度的更多信息,请参阅:
与连接配置步骤类似,可以使用 Streamlit UI 开始自定义。streamlit/wizard/finalize_config.py
文件包含具有示例属性的表单。根据连接器的需要,可以添加更多属性。要添加另一个属性,请寻找 TODO
注释,包含在所提及的文件中添加新属性的示例代码。
# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
为文本输入添加新属性后,需要将其传递到后端。为此,需要修改同一文件中的 finalize_configuration
功能:
def finalize_configuration():
try:
st.session_state["show_main_error"] = False
# TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
response = finalize_connector_configuration(
st.session_state.get("custom_property"),
st.session_state.get("some_additional_property")
)
接下来,打开 streamlit/native_sdk_api/finalize_config.py
文件,在以下函数中添加新属性:
def finalize_connector_configuration(custom_property: str, some_additional_property: str):
# TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
config = {
"custom_property": custom_property,
"some_additional_property": some_additional_property,
}
return call_procedure(
"PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
[variant_argument(config)]
)
同样,与连接配置步骤类似,此步骤也允许自定义各种后端组件,可以在源代码中使用以下注释找到这些组件:
TODO: IMPLEMENT ME validate source
TODO: IMPLEMENT ME finalize internal
验证源部分负责对源系统执行更复杂的验证。如果前面的测试连接仅检查是否可以建立连接,那么验证源可以检查对系统中特定数据的访问,例如,提取单条数据记录。
Finalize Internal 是一个内部过程,负责初始化任务反应器和调度器、创建接收数据库和任何必要的嵌套对象。它还可用于保存在完成步骤中提供的配置(默认情况下不保存此配置)。
有关内部组件的更多信息,请参阅:
此外,还可以使用 FinalizeConnectorInputValidator
接口并将其提供给最终处理程序来验证输入 – 检查 TemplateFinalizeConnectorConfigurationCustomHandler
文件。有关使用构建器的更多信息,请参阅:存储过程和处理程序自定义。
验证源的示例实施可能如下所示:
public class SourceSystemAccessValidator implements SourceValidator {
@Override
public ConnectorResponse validate(Variant variant) {
// TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
// system. In some cases this can be the same validation that happened in
// TemplateConnectionValidator.
// However, it is suggested to perform more complex validations, like specific access rights to
// some specific resources here.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
var finalizeProperties = Configuration.fromCustomConfig(variant);
var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
return prepareConnectorResponse(httpResponse.statusCode());
}
private ConnectorResponse prepareConnectorResponse(int statusCode) {
switch (statusCode) {
case 200:
return ConnectorResponse.success();
case 401:
return ConnectorResponse.error("Unauthorized error");
case 404:
return ConnectorResponse.error("Not found error");
default:
return ConnectorResponse.error("Unknown error");
}
}
}
创建资源¶
向导阶段完成后,连接器即可开始引入数据。但首先,必须实施和配置资源。资源是描述源系统中特定数据集的抽象概念,例如表、端点、文件等。
不同的源系统可能需要有关资源的不同信息,因此需要根据具体需求自定义资源定义。要进行此操作,请访问 streamlit/daily_use/data_sync_page.py
文件。您可以在其中找到有关为资源参数添加文本输入的 TODO
注释。资源参数应该允许从源系统识别和检索数据。然后可以在引入过程中提取这些参数。
# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
"Resource name",
key="resource_name",
)
st.text_input(
"Some resource parameter",
key="some_resource_parameter"
)
将所有必要的属性添加到表单后,就可以将这些属性传递给后端。首先,必须提取文本字段的状态并将其传递给 streamlit/daily_use/data_sync_page.py
文件中的 API 级别的 queue_resource
方法:
def queue_resource():
# TODO: add additional properties here and pass them to create_resource function
resource_name = st.session_state.get("resource_name")
some_resource_parameter = st.session_state.get("some_resource_parameter")
if not resource_name:
st.error("Resource name cannot be empty")
return
result = create_resource(resource_name, some_resource_parameter)
if result.is_ok():
st.success("Resource created")
else:
st.error(result.get_message())
然后,需要更新 streamlit/native_sdk_api/resource_management.py
文件中的 create_resource
函数:
def create_resource(resource_name, some_resource_parameter):
ingestion_config = [{
"id": "ingestionConfig",
"ingestionStrategy": "INCREMENTAL",
# TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
"scheduleType": "INTERVAL",
"scheduleDefinition": "60m"
}]
# TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
resource_id = {
"resource_name": resource_name,
}
id = f"{resource_name}_{random_suffix()}"
# TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
resource_metadata = {
"some_resource_parameter": some_resource_parameter
}
return call_procedure("PUBLIC.CREATE_RESOURCE",
[
varchar_argument(id),
variant_argument(resource_id),
variant_list_argument(ingestion_config),
varchar_argument(id),
"true",
variant_argument(resource_metadata)
])
定制 CREATE_RESOURCE() 过程逻辑¶
PUBLIC.CREATE_RESOURCE()
过程允许开发人员通过实现插入到主执行流程多个位置的自定义逻辑来自定义其执行。SDK 允许开发人员执行以下操作:
在创建资源之前对其进行验证。逻辑应该在
PUBLIC.CREATE_RESOURCE_VALIDATE()
过程中实施。创建资源前执行自定义操作。逻辑应该在
PUBLIC.PRE_CREATE_RESOURCE()
过程中实施。创建资源后执行自定义操作。逻辑应该在
PUBLIC.POST_CREATE_RESOURCE()
过程中实施。
更多有关 PUBLIC.CREATE_RESOURCE()
过程自定义的信息可参阅此处:
TemplateCreateResourceHandler.java¶
该类是 PUBLIC.CREATE_RESOURCE()
过程的处理程序。在这里,您可以注入前面提到的回调过程的处理程序的 Java 实施。默认情况下,模板提供回调处理程序的模拟 Java 实现,以摆脱调用 SQL 存储过程的麻烦,因为这将延长存储过程的执行时间 – Java 实现使执行速度更快。这些模拟的实施除了返回成功响应之外不执行任何操作。您可以为模板准备的回调类提供自定义实施,也可以从头开始创建这些回调并将它们注入到处理程序构建器中的主要过程执行流中。
为了实现默认调用的回调方法的自定义逻辑,请在代码中查找以下注释:
TODO: IMPLEMENT ME create resource validate
TODO: IMPLEMENT ME pre create resource callback
TODO: IMPLEMENT ME post create resource callback
引入¶
要执行数据引入,您需要实现一个类来处理与源系统的连接,并根据资源配置检索数据。调度器和任务反应器模块将负责引入任务的触发和排队。
从 TemplateIngestion
类调用引入逻辑。查找代码中的 TODO: IMPLEMENT ME ingestion
注释,将随机数据生成替换为从源系统中检索数据。如果在资源定义中添加了自定义属性,则可以使用 ResourceIngestionDefinitionRepository
和 TemplateWorkItem
中提供的属性从内部连接器表中获取这些属性:
resourceIngestionDefinitionId
ingestionConfigurationId
从网络服务检索数据的示例 可能 如下所示:
public final class SourceSystemHttpHelper {
private static final String DATA_URL = "https://source_system.com/data/%s";
private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
private static final ObjectMapper objectMapper = new ObjectMapper();
private static List<Variant> fetchData(String resourceId) {
var response = sourceSystemClient.get(String.format(url, resourceId));
var body = response.body();
try {
return Arrays.stream(objectMapper.readValue(body, Map[].class))
.map(Variant::new)
.collect(Collectors.toList());
} catch (JsonProcessingException e) {
throw new RuntimeException("Cannot parse json", e);
}
}
}
public class SourceSystemHttpClient {
private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);
private final HttpClient client;
private final String secret;
public SourceSystemHttpClient() {
this.client = HttpClient.newHttpClient();
this.secret =
SnowflakeSecrets.newInstance()
.getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
}
public HttpResponse<String> get(String url) {
var request =
HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.header("Authorization", format("Bearer %s", secret))
.header("Content-Type", "application/json")
.timeout(REQUEST_TIMEOUT)
.build();
try {
return client.send(request, HttpResponse.BodyHandlers.ofString());
} catch (IOException | InterruptedException ex) {
throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
}
}
}
管理资源生命周期¶
一旦创建资源和引入资源的逻辑得以实现,就可以通过调用以下过程来管理其生命周期:
PUBLIC.ENABLE_RESOURCE()
启用特定资源,这意味着它将被安排用于引入PUBLIC.DISABLE_RESOURCE()
禁用特定资源,这意味着其引入调度将停止PUBLIC.UPDATE_RESOURCE()
允许您更新特定资源的引入配置。默认情况下,Streamlit UI 中未进行实施,因为有时开发人员可能不希望允许连接器用户自定义引入配置(撤销授予应用程序角色ADMIN
对此过程的权限以完全禁止其使用)。
所有这些过程都有 Java 处理程序,并通过回调进行扩展,使您可以自定义其执行。您可以使用这些处理程序的构建器注入回调的自定义实现。默认情况下,模板提供回调处理程序的模拟 Java 实现。这些模拟的实施除了返回成功响应之外不执行任何操作。您可以为模板准备的回调类提供自定义实施,也可以从头开始创建这些回调并将它们注入到处理程序构建器中的主要过程执行流中。
TemplateEnableResourceHandler.java¶
此类为 PUBLIC.ENABLE_RESOURCE()
过程的处理程序,可以使用专用于以下操作的回调进行扩展:
在启用资源之前对其进行验证。在代码中查找
TODO: IMPLEMENT ME enable resource validate
注释,以提供自定义实现。在启用资源前执行自定义操作。在代码中查找
TODO: IMPLEMENT ME pre enable resource
注释,以提供自定义实现。在启用资源后执行自定义操作。在代码中查找
TODO: IMPLEMENT ME post enable resource
注释,以提供自定义实现。
通过 PUBLIC.ENABLE_RESOURCE()
过程详细文档了解更多信息:
TemplateDisableResourceHandler.java¶
此类为 PUBLIC.DISABLE_RESOURCE()
过程的处理程序,可以使用专用于以下操作的回调进行扩展:
在禁用资源之前对其进行验证。在代码中查找
TODO: IMPLEMENT ME disable resource validate
注释,以提供自定义实现。在禁用资源前执行自定义操作。在代码中查找
TODO: IMPLEMENT ME pre disable resource
注释,以提供自定义实现。
通过 PUBLIC.DISABLE_RESOURCE()
过程详细文档了解更多信息:
TemplateUpdateResourceHandler.java¶
此类为 PUBLIC.UPDATE_RESOURCE()
过程的处理程序,可以使用专用于以下操作的回调进行扩展:
在更新资源之前对其进行验证。在代码中查找
TODO: IMPLEMENT ME update resource validate
注释,以提供自定义实现。在更新资源前执行自定义操作。在代码中查找
TODO: IMPLEMENT ME pre update resource
注释,以提供自定义实现。在更新资源后执行自定义操作。在代码中查找
TODO: IMPLEMENT ME post update resource
注释,以提供自定义实现。
通过 PUBLIC.UPDATE_RESOURCE()
过程详细文档了解更多信息:
设置¶
该模板包含一个设置选项卡,可让您查看之前进行的所有配置。但是,如果配置属性是自定义的,那么此视图也需要进行一些自定义。设置选项卡代码可以在 streamlit/daily_use/settings_page.py
文件中找到。
要进行自定义,只需从配置中提取在相应配置中添加的键的值即可。例如,如果之前在连接配置步骤中添加了 additional_connection_property
,那么可以在设置视图中添加它,如下所示:
def connection_config_page():
current_config = get_connection_configuration()
# TODO: implement the display for all the custom properties defined in the connection configuration step
custom_property = current_config.get("custom_connection_property", "")
additional_connection_property = current_config.get("additional_connection_property", "")
st.header("Connector configuration")
st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
"of the Wizard. If some new property was introduced it has to be added here to display.")
st.divider()
st.text_input(
"Custom connection property:",
value=custom_property,
disabled=True
)
st.text_input(
"Additional connection property:",
value=additional_connection_property,
disabled=True
)
st.divider()