设置 Openflow Connector for SQL Server

备注

使用该连接器需遵守 连接器条款

本主题介绍设置 Openflow Connector for SQL Server 的步骤。

先决条件

  1. 确保您已查看 关于 Openflow Connector for SQL Server

  2. 确保您已查看 支持的 SQL 服务器版本

  3. Ensure that you have 设置 Openflow - BYOC or 设置 Openflow - Snowflake 部署 - 任务概述.

  4. 作为数据库管理员,请执行以下任务:

    1. 启用 数据库 (https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server?view=sql-server-ver16#enable-change-tracking-for-a-database) 及 表 (https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server?view=sql-server-ver16#enable-change-tracking-for-a-table) 的变更跟踪。连接器要求在复制开始之前对数据库和表启用变更跟踪。确保您计划复制的每张表都启用了变更跟踪。您还可以在连接器运行时为更多表启用变更跟踪。请参阅以下代码片段:

      ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
      
      ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
      
      Copy
    2. 为连接器创建用户。连接器需要一个对复制表拥有 VIEW CHANGE TRACKING 权限的用户。为该用户提供访问连接器配置的密码。

      CREATE LOGIN <user_name> WITH PASSWORD = <password>;
      CREATE USER <user_name> FOR LOGIN <user_name>;
      GRANT SELECT ON <schema>.<table> TO <user_name>;
      GRANT VIEW CHANGE TRACKING ON <schema>.<table> TO <user_name>;
      
      Copy
    3. 通过 SSL 连接。如果您计划使用 SSL 连接到 SQL 服务器,请为数据库服务器准备根证书。在配置期间这是必需的。

  5. 作为 Snowflake 账户管理员,执行以下任务:

    1. 创建一个类型为 SERVICE 的 Snowflake 用户。创建一个用于存储复制数据的数据库,并为该 Snowflake 用户设置在该数据库中创建对象所需的权限,即授予 USAGE 和 CREATE SCHEMA 权限

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
               WAREHOUSE_SIZE = 'MEDIUM'
               AUTO_SUSPEND = 300
               AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. 创建安全密钥对(公钥和私钥)。将用户的私钥存储在文件中,以提供给连接器的配置。将公钥分配给 Snowflake 服务用户:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      有关更多信息,请参阅 密钥对身份验证和密钥对轮换

    3. 指定一个仓库供连接器使用。从 MEDIUM 仓库规模开始,然后根据要复制的表的数量和传输的数据量尝试规模。相较于扩大仓库规模,采用 多集群仓库 通常能更有效地应对海量表数量的扩展需求。

设置连接器

作为数据工程师,执行以下任务以配置连接器:

安装连接器

  1. 导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors

  2. 在 Openflow 连接器页面上,找到连接器并选择 Add to runtime

  3. Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。

  4. 选择 Add

    备注

    在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。

  5. 使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。

  6. 使用您的 Snowflake 账户凭据进行运行时身份验证。

此时将显示 Openflow 画布,其中添加了连接器进程组。

配置连接器

您可以为以下用例配置连接器:

实时复制一组表

  1. 右键点击导入的进程组并选择 Parameters

  2. 按照 流参数 中所述填充所需的参数值。

流参数

首先设置 SQLServer 源参数上下文的参数,然后设置 SQLServer 目标参数上下文的参数。完成此操作后,您可以启用连接器。连接器应同时连接到 SQLServer 和Snowflake,并开始运行。但是,在将任何要复制的表明确添加到其配置中之前,连接器不会复制任何数据。

要为复制配置特定的表,请编辑 SQLServer 引入参数上下文。将更改应用到 SQLServer 引入参数上下文后,连接器会选择配置,并且将为每张表启动复制生命周期。

SQLServer 源参数上下文

参数

描述

SQL Server Connection URL

指向源数据库的完整 JDBC URL。

示例:

  • jdbc:sqlserver://example.com:1433;encrypt=false;databaseName=<example_database>

SQL Server JDBC Driver

Select the Reference asset checkbox to upload the SQL Server JDBC driver (https://learn.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server).

SQL Server SSL Mode

启用或禁用 SSL 连接。

SQL Server Root SSL Certificate

数据库的根证书的全部内容。如果已禁用 SSL,则为可选。

SQL Server Username

连接器的用户名。

SQL Server Password

连接器的密码。

SQLServer 目标参数上下文

参数

描述

必填

目标数据库

用于永久保存数据的数据库。它必须已经存在于 Snowflake 中

Snowflake 账户标识符

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Snowflake account name formatted as [organization-name]-[account-name] where data will be persisted.

Snowflake 身份验证策略

使用以下方式时:

  • Snowflake Openflow 部署: 使用 SNOWFLAKE_SESSION_TOKEN。此令牌由 Snowflake 自动管理。

  • BYOC:使用 KEY_PAIR 作为身份验证策略的值。

Snowflake 私钥

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR:必须是用于身份验证的 RSA 私钥。

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined.

Snowflake 私钥文件

使用以下方式时:

  • 会话令牌身份验证策略: 私钥文件必须为空。

  • KEY_PAIR: Upload the file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and including standard PEM headers and footers. The header line begins with -----BEGIN PRIVATE. To upload the private key file, select the Reference asset checkbox.

Snowflake 私钥密码

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Provide the password associated with the Snowflake Private Key File.

Snowflake 角色

使用以下方式时:

  • 会话令牌身份验证策略:使用运行时角色。您可以在 Openflow UI 中,通过导航至您的运行时的 View Details 来找到您的运行时角色。

  • KEY_PAIR 身份验证策略:使用为您的服务用户配置的有效角色。

Snowflake 用户名

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Provide the user name used to connect to the Snowflake instance.

Snowflake 仓库

Snowflake warehouse used to run queries.

SQLServer 引入参数上下文

参数

描述

包括表名

以逗号分隔的表路径列表,包括其架构。示例:public.my_table, other_schema.other_table

包括表正则表达式

用于匹配表路径的正则表达式。与表达式匹配的每个路径都将被复制,并且还将自动包括与稍后创建的模式相匹配的新表。示例:public\.auto_.*

筛选器 JSON

一个 JSON,包含一个完全限定的表名列表和一个用于列名的正则表达式模式,这些列名应该包含在复制中。示例:[ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] 将包括 public 架构的 table1 中所有以 name 结尾的列。

合并任务计划 CRON

定义触发从日志到目标表的合并任务的 CRON 表达式。如果您想持续合并或按照计划时间来限制仓库运行时间,请将其设置为 * * * * * ?

例如:

  • 字符串 * 0 * * * ? 表示您要在每小时整点计划合并,持续一分钟

  • 字符串 * 20 14 ? * MON-FRI 表示您计划在周一到周五每天 2:20 PM 触发合并

有关其他信息和示例,请参阅 Quartz 文档 (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html) 中的 cron 触发教程

从复制中移除并重新添加表

要从复制中移除表,请确保在复制参数上下文中的 包括表名包括表正则表达式 参数里移除。

如果您想稍后将该表重新添加到复制中,请先在 Snowflake 中删除相应的目标表。然后,再将该表添加回 包括表名包括表正则表达式 参数中。这样可以确保表的复制过程重新开始。

此方法还可用于从失败的表复制场景中恢复复制。

复制表中列的子集

连接器可以将每张表复制的数据筛选到已配置列的子集。

要对列应用筛选器,请在复制参数上下文中修改列筛选器属性,添加一个配置数组,每个条目对应一张需要筛选列的表。

可以按名称或模式包含或排除列。您可以对每张表应用一个条件,也可以合并多个条件,排除项的优先级始终高于包含项。

以下示例显示了可用的字段。schematable 字段是必填字段。必须至少填写以下其中一个字段:includedexcludedincludedPatternexcludedPattern

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

跟踪表中的数据变化

连接器不仅复制源表中数据的当前状态,还复制每行的中间状态。但是,由于使用的变更跟踪机制,无法保证每行的所有中间状态都被复制。

这些数据存储在与目标表相同的架构中创建的日志表中。

日志表名称的格式为:<source table name>_JOURNAL_<schema generation>

这里的 <schema generation> 是一个整数,随着源表的每一次架构更改而递增。这意味着经过架构更改的源表将有多个日志表。

重要

Snowflake 建议您不要以任何方式更改日志表或其中的数据。作为复制过程的一部分,连接器使用它们来更新目标表。

配置合并任务的调度

连接器使用仓库将变更数据捕获 (CDC) 数据合并到目标表中。此操作由 MergeSnowflakeJournalTable 处理器触发。如果没有新的更改,或者 MergeSnowflakeJournalTable 队列中没有新的待处理的 FlowFile,则不会触发合并,仓库会自动暂停。

要限制仓库成本并将合并仅限于预定时间,可以在合并任务计划 CRON 参数中使用 CRON 表达式。它限制了流向 MergeSnowflakeJournalTable 处理器的 FlowFile,并且只能在特定的时间段内触发合并。有关调度的更多信息,请参阅 调度策略 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy)。

运行流

  1. 右键点击“飞机”图标并选择 Enable all Controller Services

  2. 右键点击导入的进程组并选择 Start。连接器开始数据引入。

语言: 中文