设置 Openflow Connector for SQL Server¶
备注
使用该连接器需遵守 连接器条款。
本主题介绍设置 Openflow Connector for SQL Server 的步骤。
先决条件¶
确保您已查看 支持的 SQL 服务器版本。
Ensure that you have 设置 Openflow - BYOC or 设置 Openflow - Snowflake 部署 - 任务概述.
作为数据库管理员,请执行以下任务:
启用 数据库 (https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server?view=sql-server-ver16#enable-change-tracking-for-a-database) 及 表 (https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server?view=sql-server-ver16#enable-change-tracking-for-a-table) 的变更跟踪。连接器要求在复制开始之前对数据库和表启用变更跟踪。确保您计划复制的每张表都启用了变更跟踪。您还可以在连接器运行时为更多表启用变更跟踪。请参阅以下代码片段:
ALTER DATABASE <database> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON); ALTER TABLE <schema>.<table> ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON);
为连接器创建用户。连接器需要一个对复制表拥有 VIEW CHANGE TRACKING 权限的用户。为该用户提供访问连接器配置的密码。
CREATE LOGIN <user_name> WITH PASSWORD = <password>; CREATE USER <user_name> FOR LOGIN <user_name>; GRANT SELECT ON <schema>.<table> TO <user_name>; GRANT VIEW CHANGE TRACKING ON <schema>.<table> TO <user_name>;
通过 SSL 连接。如果您计划使用 SSL 连接到 SQL 服务器,请为数据库服务器准备根证书。在配置期间这是必需的。
作为 Snowflake 账户管理员,执行以下任务:
创建一个类型为 SERVICE 的 Snowflake 用户。创建一个用于存储复制数据的数据库,并为该 Snowflake 用户设置在该数据库中创建对象所需的权限,即授予 USAGE 和 CREATE SCHEMA 权限。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
创建安全密钥对(公钥和私钥)。将用户的私钥存储在文件中,以提供给连接器的配置。将公钥分配给 Snowflake 服务用户:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
有关更多信息,请参阅 密钥对身份验证和密钥对轮换。
指定一个仓库供连接器使用。从
MEDIUM
仓库规模开始,然后根据要复制的表的数量和传输的数据量尝试规模。相较于扩大仓库规模,采用 多集群仓库 通常能更有效地应对海量表数量的扩展需求。
设置连接器¶
作为数据工程师,执行以下任务以配置连接器:
安装连接器¶
导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors。
在 Openflow 连接器页面上,找到连接器并选择 Add to runtime。
在 Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。
选择 Add。
备注
在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。
使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。
使用您的 Snowflake 账户凭据进行运行时身份验证。
此时将显示 Openflow 画布,其中添加了连接器进程组。
配置连接器¶
您可以为以下用例配置连接器:
实时复制一组表¶
右键点击导入的进程组并选择 Parameters。
按照 流参数 中所述填充所需的参数值。
流参数¶
首先设置 SQLServer 源参数上下文的参数,然后设置 SQLServer 目标参数上下文的参数。完成此操作后,您可以启用连接器。连接器应同时连接到 SQLServer 和Snowflake,并开始运行。但是,在将任何要复制的表明确添加到其配置中之前,连接器不会复制任何数据。
要为复制配置特定的表,请编辑 SQLServer 引入参数上下文。将更改应用到 SQLServer 引入参数上下文后,连接器会选择配置,并且将为每张表启动复制生命周期。
SQLServer 源参数上下文¶
参数 |
描述 |
---|---|
SQL Server Connection URL |
指向源数据库的完整 JDBC URL。 示例:
|
SQL Server JDBC Driver |
Select the Reference asset checkbox to upload the SQL Server JDBC driver (https://learn.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server). |
SQL Server SSL Mode |
启用或禁用 SSL 连接。 |
SQL Server Root SSL Certificate |
数据库的根证书的全部内容。如果已禁用 SSL,则为可选。 |
SQL Server Username |
连接器的用户名。 |
SQL Server Password |
连接器的密码。 |
SQLServer 目标参数上下文¶
参数 |
描述 |
必填 |
---|---|---|
目标数据库 |
用于永久保存数据的数据库。它必须已经存在于 Snowflake 中 |
是 |
Snowflake 账户标识符 |
使用以下方式时:
|
是 |
Snowflake 身份验证策略 |
使用以下方式时:
|
是 |
Snowflake 私钥 |
使用以下方式时:
|
否 |
Snowflake 私钥文件 |
使用以下方式时:
|
否 |
Snowflake 私钥密码 |
使用以下方式时:
|
否 |
Snowflake 角色 |
使用以下方式时:
|
是 |
Snowflake 用户名 |
使用以下方式时:
|
是 |
Snowflake 仓库 |
Snowflake warehouse used to run queries. |
是 |
SQLServer 引入参数上下文¶
参数 |
描述 |
---|---|
包括表名 |
以逗号分隔的表路径列表,包括其架构。示例: |
包括表正则表达式 |
用于匹配表路径的正则表达式。与表达式匹配的每个路径都将被复制,并且还将自动包括与稍后创建的模式相匹配的新表。示例: |
筛选器 JSON |
一个 JSON,包含一个完全限定的表名列表和一个用于列名的正则表达式模式,这些列名应该包含在复制中。示例: |
合并任务计划 CRON |
定义触发从日志到目标表的合并任务的 CRON 表达式。如果您想持续合并或按照计划时间来限制仓库运行时间,请将其设置为 例如:
有关其他信息和示例,请参阅 Quartz 文档 (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html) 中的 cron 触发教程 |
从复制中移除并重新添加表¶
要从复制中移除表,请确保在复制参数上下文中的 包括表名
或 包括表正则表达式
参数里移除。
如果您想稍后将该表重新添加到复制中,请先在 Snowflake 中删除相应的目标表。然后,再将该表添加回 包括表名
或 包括表正则表达式
参数中。这样可以确保表的复制过程重新开始。
此方法还可用于从失败的表复制场景中恢复复制。
复制表中列的子集¶
连接器可以将每张表复制的数据筛选到已配置列的子集。
要对列应用筛选器,请在复制参数上下文中修改列筛选器属性,添加一个配置数组,每个条目对应一张需要筛选列的表。
可以按名称或模式包含或排除列。您可以对每张表应用一个条件,也可以合并多个条件,排除项的优先级始终高于包含项。
以下示例显示了可用的字段。schema
和 table
字段是必填字段。必须至少填写以下其中一个字段:included
、excluded
、includedPattern
、excludedPattern
。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
跟踪表中的数据变化¶
连接器不仅复制源表中数据的当前状态,还复制每行的中间状态。但是,由于使用的变更跟踪机制,无法保证每行的所有中间状态都被复制。
这些数据存储在与目标表相同的架构中创建的日志表中。
日志表名称的格式为:<source table name>_JOURNAL_<schema generation>
这里的 <schema generation> 是一个整数,随着源表的每一次架构更改而递增。这意味着经过架构更改的源表将有多个日志表。
重要
Snowflake 建议您不要以任何方式更改日志表或其中的数据。作为复制过程的一部分,连接器使用它们来更新目标表。
配置合并任务的调度¶
连接器使用仓库将变更数据捕获 (CDC) 数据合并到目标表中。此操作由 MergeSnowflakeJournalTable 处理器触发。如果没有新的更改,或者 MergeSnowflakeJournalTable 队列中没有新的待处理的 FlowFile,则不会触发合并,仓库会自动暂停。
要限制仓库成本并将合并仅限于预定时间,可以在合并任务计划 CRON 参数中使用 CRON 表达式。它限制了流向 MergeSnowflakeJournalTable 处理器的 FlowFile,并且只能在特定的时间段内触发合并。有关调度的更多信息,请参阅 调度策略 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy)。
运行流¶
右键点击“飞机”图标并选择 Enable all Controller Services。
右键点击导入的进程组并选择 Start。连接器开始数据引入。