Set up the Openflow Connector for SQL Server

Note

This connector is subject to the Snowflake Connector Terms.

This topic describes how to set up the Openflow Connector for SQL Server.

For information on the incremental load process, see Incremental replication.

Prerequisites

Before setting up the connector, ensure that you have completed the following prerequisites:

  1. Ensure that you have reviewed About Openflow Connector for SQL Server.

  2. Ensure that you have reviewed Supported SQL Server versions.

  3. Ensure that you have set up your runtime deployment. For more information, see the following topics:

  4. If you use Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the SQL Server connector.

Set up your SQL Server instance

Before setting up the connector, perform the following tasks in your SQL Server environment:

Note

You must perform these tasks as a database administrator.

  1. Enable change tracking on the databases (https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server?view=sql-server-ver16#enable-change-tracking-for-a-database) and tables (https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-tracking-sql-server?view=sql-server-ver16#enable-change-tracking-for-a-table) that you plan to replicate, as shown in the following SQL Server example:

    ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
    
    ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING;
    

    Note

    Run these commands for every database and table that you plan to replicate.

    The connector requires that change tracking is enabled on the databases and tables before replication starts. Ensure that every table that you plan to replicate has change tracking enabled. You can also enable change tracking on additional tables while the connector is running.

  2. Create a login for the SQL Server instance:

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    

    This login is used to create users for the databases you plan to replicate.

  3. Create a user for each database you are replicating by running the following SQL Server command in each database:

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
  4. Grant the SELECT and VIEW CHANGE TRACKING permissions to the user for each database that you are replicating:

    GRANT SELECT ON <database>.<schema>.<table> TO <user_name>;
    GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
    

    Run these commands in each database for every table that you plan to replicate. These permissions must be granted to the user of each database that you created in a previous step.

  5. (Optional) Grant the VIEW DEFINITION privilege on the User Defined Data Types (UDDT).

    If your tables contain columns that use User Defined Data Types (UDDT), and the UDDT is owned by a different user than the connector user, you must grant the VIEW DEFINITION permission to the connector user as shown in the following SQL Server example:

    GRANT VIEW DEFINITION TO <user_name>;
    

    Without this permission, columns using UDDT are silently excluded from replication.

  6. (Optional) Configure SSL connection.

    If you use an SSL connection to connect to SQL Server, create the root certificate for your database server. This is required when configuring the connector.
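
The per-table statements in the steps above can be scripted. The following Python sketch generates the change-tracking and grant statements for a list of tables; the database, table, and user names are illustrative placeholders, and the helper itself is not part of the connector.

```python
# Sketch: generate the change-tracking and grant statements from the steps
# above for one database. All names below are example placeholders.

def setup_statements(database, tables, user):
    """Yield the setup SQL statements for one database.

    `tables` is a list of (schema, table) tuples.
    """
    yield (f"ALTER DATABASE {database} SET CHANGE_TRACKING = ON "
           "(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);")
    for schema, table in tables:
        yield f"ALTER TABLE {schema}.{table} ENABLE CHANGE_TRACKING;"
        yield f"GRANT SELECT ON {database}.{schema}.{table} TO {user};"
        yield (f"GRANT VIEW CHANGE TRACKING ON {database}.{schema}.{table} "
               f"TO {user};")

for stmt in setup_statements("sales_db", [("dbo", "orders")], "openflow_user"):
    print(stmt)
```

Run the generated statements in each source database as a database administrator, as described in the steps above.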

Set up your Snowflake environment

As a Snowflake administrator, perform the following tasks:

  1. Create a destination database in Snowflake to store the replicated data:

    CREATE DATABASE <destination_database>;
    
  2. Create a Snowflake service user:

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
  3. Create a Snowflake role for the connector and grant the required privileges:

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    

    Use this role to manage the connector's access to the Snowflake database.

    To create objects in the destination database, you must grant the USAGE and CREATE SCHEMA privileges on the database to the role used to manage access.

  4. Create a Snowflake warehouse for the connector and grant the required privileges:

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'XSMALL'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    

    Snowflake recommends starting with an XSMALL warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.

  5. Set up the public and private keys for key pair authentication:

    1. Create a pair of secure keys (public and private).

    2. Store the private key for the user in a file to supply to the connector's configuration.

    3. Assign the public key to the Snowflake service user:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      

      For more information, see Key-pair authentication and key-pair rotation.
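
When assigning the public key, the RSA_PUBLIC_KEY value is the base64 body of the PEM public key without the BEGIN/END delimiter lines. A minimal Python sketch of that transformation; the sample key body below is a placeholder, not a real key.

```python
# Sketch: extract the RSA_PUBLIC_KEY value from a PEM public key by
# stripping the delimiter lines and line breaks.

def rsa_public_key_value(pem: str) -> str:
    """Return the base64 body of a PEM key as a single line."""
    lines = [line.strip() for line in pem.strip().splitlines()]
    body = [line for line in lines if line and not line.startswith("-----")]
    return "".join(body)

# Placeholder key body for illustration only.
pem = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEAexampleonly
-----END PUBLIC KEY-----"""

print(f"ALTER USER openflow_user SET RSA_PUBLIC_KEY = '{rsa_public_key_value(pem)}';")
```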

Install the connector

To install the connector, do the following as a data engineer:

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. On the Openflow connectors page, find the connector and select Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Note

    Before installing the connector, ensure that you have created a database and schema in Snowflake for the connector to store the ingested data.

  4. Authenticate to the deployment with your Snowflake account credentials, and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.

  5. Authenticate to the runtime with your Snowflake account credentials.

The Openflow canvas appears with the connector process group added to it.

Configure the connector

To configure the connector, do the following as a data engineer:

  1. Right-click the imported process group and select Parameters.

  2. Populate the required parameter values as described in Flow parameters.

Flow parameters

Start by setting the parameters of the SQLServer Source Parameters context, then the SQLServer Destination Parameters context. After you complete this, enable the connector. The connector connects to both SQLServer and Snowflake and starts running. However, the connector does not replicate any data until the tables to be replicated are explicitly added to its configuration.

To configure specific tables for replication, edit the SQLServer Ingestion Parameters context. After you apply changes to the SQLServer Ingestion Parameters context, the connector picks up the configuration and starts the replication lifecycle for every table.

SQLServer Source Parameters context

Parameter

Description

SQLServer Connection URL

The full JDBC URL pointing to the source database.

Example:

  • jdbc:sqlserver://example.com:1433;encrypt=false;

SQLServer JDBC Driver

Select the Reference asset checkbox to upload the SQL Server JDBC driver (https://learn.microsoft.com/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server).

SQLServer Username

The user name for the connector.

SQLServer Password

The password for the connector.
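
For reference, the SQLServer Connection URL is a standard JDBC URL with semicolon-separated properties. The following Python sketch assembles one from its parts; the host and properties are the example values from above, and the helper is an illustration, not connector code.

```python
# Sketch: assemble a SQLServer Connection URL like the example above.

def sqlserver_jdbc_url(host: str, port: int = 1433, **props: str) -> str:
    """Build a jdbc:sqlserver:// URL with semicolon-separated properties."""
    parts = [f"jdbc:sqlserver://{host}:{port}"]
    parts += [f"{key}={value}" for key, value in props.items()]
    return ";".join(parts) + ";"

url = sqlserver_jdbc_url("example.com", encrypt="false")
print(url)  # jdbc:sqlserver://example.com:1433;encrypt=false;
```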

SQLServer Destination Parameters context

Parameter

Description

Required

Destination Database

The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

Snowflake Authentication Strategy

When using:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_MANAGED_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_MANAGED_TOKEN.

  • BYOC: Alternatively, BYOC deployments can use KEY_PAIR as the authentication strategy value.

Snowflake Account Identifier

When using:

  • Session token authentication strategy: Must be blank.

  • KEY_PAIR: Snowflake account name formatted as [organization-name]-[account-name] where data is persisted.

Snowflake Connection Strategy

When using KEY_PAIR, specify the strategy for connecting to Snowflake:

  • STANDARD (default): Connect using standard public routing to Snowflake services.

  • PRIVATE_CONNECTIVITY: Connect using private addresses associated with the supporting cloud platform such as AWS PrivateLink.

Required for BYOC with KEY_PAIR only, otherwise ignored.

Snowflake Object Identifier Resolution

Specifies how source object identifiers such as schema, table, and column names are stored and queried in Snowflake. This setting dictates whether you must use double quotes in SQL queries.

Option 1: Default, case-insensitive (recommended).

  • Transformation: All identifiers are converted to uppercase. For example, My_Table becomes MY_TABLE.

  • Queries: SQL queries are case-insensitive and don't require SQL double quotes.

    For example SELECT * FROM my_table; returns the same results as SELECT * FROM MY_TABLE;.

Note

Snowflake recommends using this option if database objects are not expected to have mixed case names.

Important

Do not change this setting after connector ingestion has begun. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance.

Option 2: Case-sensitive.

  • Transformation: Case is preserved. For example, My_Table remains My_Table.

  • Queries: SQL queries must use double quotes to match the exact case for database objects. For example, SELECT * FROM "My_Table";.

Note

Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons. For example, if the source database includes table names that differ in case only, such as MY_TABLE and my_table, that result in a name collision when using case-insensitive comparisons.

Snowflake Private Key

When using:

  • Session token authentication strategy: Must be blank.

  • KEY_PAIR: Must be the RSA private key used for authentication.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Snowflake Private Key File

When using:

  • Session token authentication strategy: The private key file must be blank.

  • KEY_PAIR: Upload the file containing the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and with standard PEM headers and footers. The header line begins with -----BEGIN PRIVATE. To upload the private key file, select the Reference asset checkbox.

Snowflake Private Key Password

When using:

  • Session token authentication strategy: Must be blank.

  • KEY_PAIR: Provide the password associated with the Snowflake Private Key File.

Snowflake Role

When using:

  • Session Token Authentication Strategy: Use the Snowflake Role assigned to the runtime, or a child role granted to this Snowflake Role. You can find your runtime Snowflake Role in the Openflow UI by expanding the More Options [⋮] button for your runtime and selecting Set Snowflake role.

  • KEY_PAIR authentication strategy: Use a valid role configured for your service user.

Snowflake Username

When using:

  • Session token authentication strategy: Must be blank.

  • KEY_PAIR: Provide the user name used to connect to the Snowflake instance.

Snowflake Warehouse

The Snowflake warehouse used to run queries.
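
The two identifier-resolution options described above can be illustrated with a short Python sketch. The functions are illustrative, not connector APIs: the default option uppercases identifiers, while the case-sensitive option preserves them and requires double quotes in queries.

```python
# Sketch of the two Snowflake Object Identifier Resolution options.

def resolve_identifier(name: str, case_sensitive: bool = False) -> str:
    """Return the identifier as it would be stored in Snowflake."""
    return name if case_sensitive else name.upper()

def quote_for_query(name: str, case_sensitive: bool = False) -> str:
    """Return the identifier as it should appear in a SQL query."""
    stored = resolve_identifier(name, case_sensitive)
    return f'"{stored}"' if case_sensitive else stored

print(quote_for_query("My_Table"))                       # MY_TABLE
print(quote_for_query("My_Table", case_sensitive=True))  # "My_Table"
```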

SQLServer Ingestion Parameters context

Parameter

Description

Included Table Names

A comma-separated list of source table paths, including their databases and schemas, for example:

database_1.public.table_1, database_2.schema_2.table_2

Included Table Regex

A regular expression to match against table paths, including database and schema names. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically, for example:

database_name\.public\.auto_.*

Filter JSON

A JSON document containing a list of fully qualified table names and a regex pattern for the column names that should be included in replication.

The following example includes all columns that end with name in table1 from the public schema in the my_db database:

[ {"database":"my_db", "schema":"public", "table":"table1", "includedPattern":".*name"} ]

Merge Task Schedule CRON

A CRON expression that defines when merge tasks from the journal tables to the destination tables are triggered. Set it to * * * * * ? if you want to merge continuously, or use a schedule to limit warehouse running time.

For example:

  • The string * 0 * * * ? means that you schedule merges at the start of every hour, for one minute.

  • The string * 20 14 ? * MON-FRI means that you schedule merges at 2:20 PM every day, Monday through Friday.

For additional information and examples, see the cron trigger tutorial in the Quartz documentation (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html).
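
The Included Table Regex behavior can be illustrated in Python. This sketch assumes the whole database.schema.table path must match the pattern, and uses the example pattern from above; the matching logic is an illustration, not the connector's implementation.

```python
# Sketch: match fully qualified table paths against the Included Table
# Regex example from above. The table paths are illustrative.
import re

pattern = re.compile(r"database_name\.public\.auto_.*")

paths = [
    "database_name.public.auto_orders",  # matches: replicated
    "database_name.public.customers",    # no match: skipped
    "database_name.sales.auto_orders",   # different schema: skipped
]
included = [path for path in paths if pattern.fullmatch(path)]
print(included)  # ['database_name.public.auto_orders']
```

New tables whose paths match the pattern are also picked up automatically after they are created.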

Replicate tables from a SQL Server replica server

The connector can ingest data from a primary server or from a subscriber server using transactional replication (https://learn.microsoft.com/en-us/sql/relational-databases/replication/transactional/transactional-replication). Before configuring the connector to connect to a SQL Server replica, ensure that replication between the primary and replica nodes works correctly. When investigating issues with missing data in the connector, first ensure that missing rows and change tracking events are present in the replica server used by the connector.

To ensure continuity, make sure that the same connection user is available on both the primary and replica servers and has access to the data and change tracking tables.

To configure the connector to read from a subscriber server instead of the publisher, specify the subscriber server URL in the SQLServer Connection URL parameter.

Warning

Do not change the database server after replication has started. Each database maintains its own change tracking state independently, so switching to a different server would cause the connector to lose track of which changes have already been processed, and may result in data loss.

Restart table replication

A table in a FAILED state, for example due to a missing primary key or an unsupported schema change, does not restart automatically. If a table enters a FAILED state or you need to restart replication from scratch, use the following procedure to remove and re-add the table to replication.

Note

If the failure was caused by an issue in the source table such as a missing primary key, resolve that issue in the source database before continuing.

  1. Remove the table from flow parameters: In the Ingestion Parameters context, either remove the table from the Included Table Names or modify the Included Table Regex so the table is no longer matched.

  2. Verify the table has been removed:

    1. In the Openflow runtime canvas, right-click a process group and choose Controller Services.

    2. In the table listing controller services, locate the Table State Store row, click the three vertical dots on the right side of the row, then choose View State.

    Important

    You must wait until the table's state is fully removed from this list before proceeding. Do not continue until this configuration change has completed.

  3. Clean up the destination: Once the table's state shows as fully removed, manually DROP the destination table in Snowflake. Note that the connector will not overwrite an existing destination table during the snapshot phase; if the table still exists, replication will fail again. Optionally, the journal table and stream can also be removed if they are no longer needed.

  4. Re-add the table: Update the Included Table Names or Included Table Regex parameters to include the table again.

  5. Verify the restart: Check the Table State Store using the instructions given previously. The state of the table should appear with the status NEW, then transition to SNAPSHOT_REPLICATION, and finally INCREMENTAL_REPLICATION.

Replicate a subset of columns in a table

The connector can filter the data replicated for each table down to a subset of the configured columns.

To apply filters to columns, modify the column filter property in the replication parameters context, adding a configuration array with one entry for each table whose columns need filtering.

Include or exclude columns by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.

The following example shows the available fields. The schema and table fields are required. At least one of the following fields must be populated: included, excluded, includedPattern, excludedPattern.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>"
    }
]
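
The following Python sketch illustrates how one filter entry might be evaluated, assuming exclusions take precedence over inclusions as described above. The field names match the JSON example; the evaluation logic itself is an illustration, not the connector's implementation.

```python
# Sketch: evaluate a single column-filter entry, with exclusions taking
# precedence over inclusions.
import re

def column_included(column: str, flt: dict) -> bool:
    """Return True if the column passes the filter entry."""
    if column in flt.get("excluded", []):
        return False
    excluded_pattern = flt.get("excludedPattern")
    if excluded_pattern and re.fullmatch(excluded_pattern, column):
        return False
    included = flt.get("included", [])
    included_pattern = flt.get("includedPattern")
    if not included and not included_pattern:
        return True  # no inclusion rule: keep everything not excluded
    if column in included:
        return True
    return bool(included_pattern and re.fullmatch(included_pattern, column))

flt = {"schema": "public", "table": "table1", "includedPattern": ".*name"}
print(column_included("first_name", flt))  # True
print(column_included("id", flt))          # False
```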

Replicate a partitioned table

The connector supports replication of partitioned tables. A SQL Server partitioned table is replicated into Snowflake as a single destination table, containing data from all partitions.

To replicate a partitioned table, ensure that change tracking is enabled on the partitioned table, as described in Set up your SQL Server instance.

Track data changes in tables

The connector replicates the current state of data from the source tables, as well as detected changes from each polling interval. This data is stored in journal tables created in the same schema as the destination table.

Note

Because the connector uses SQL Server Change Tracking, multiple updates to the same row between polling intervals are rolled up into a single change. Journal tables reflect the net result of changes, not every intermediate state. For more information, see About Openflow Connector for SQL Server.

The journal table names are formatted as: <source_table_name>_JOURNAL_<timestamp>_<schema_generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema_generation> is an integer increasing with every schema change on the source table. As a result, source tables that undergo schema changes will have multiple journal tables.

When you remove a table from replication, then add it back, the <timestamp> value changes, and <schema_generation> starts again from 1.

Important

Snowflake recommends not altering the structure of journal tables in any way. The connector uses them to update the destination table as part of the replication process.

The connector never drops journal tables. It uses only the latest journal for each replicated source table, reading append-only streams on top of the journals. To reclaim storage, you can:

  • Truncate all journal tables at any time.

  • Drop the journal tables related to source tables that were removed from replication.

  • Drop all but the latest generation journal tables for actively replicated tables.

For example, if your connector is set to actively replicate the source table orders, and you earlier removed the table customers from replication, you may have the following journal tables. In this case you can drop all of them except orders_JOURNAL_5678_2.

customers_JOURNAL_1234_1
customers_JOURNAL_1234_2
orders_JOURNAL_5678_1
orders_JOURNAL_5678_2
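
Selecting which journal tables are safe to drop can be sketched in Python. This helper follows the `<table>_JOURNAL_<timestamp>_<generation>` naming format and the cleanup rules described above; the parsing logic is illustrative, not connector code.

```python
# Sketch: choose journal tables that can be dropped, keeping only the
# latest generation for actively replicated source tables.
import re

JOURNAL_RE = re.compile(r"(?P<table>.+)_JOURNAL_(?P<ts>\d+)_(?P<gen>\d+)$")

def droppable_journals(journals, active_tables):
    """Return the journal tables that are safe to drop."""
    latest = {}  # (table, timestamp) -> highest generation seen
    parsed = []
    for name in journals:
        match = JOURNAL_RE.match(name)
        if not match:
            continue  # not a journal table name; leave it alone
        table, ts, gen = match["table"], match["ts"], int(match["gen"])
        parsed.append((name, table, ts, gen))
        key = (table, ts)
        latest[key] = max(latest.get(key, 0), gen)
    drop = []
    for name, table, ts, gen in parsed:
        if table not in active_tables or gen < latest[(table, ts)]:
            drop.append(name)
    return drop

journals = [
    "customers_JOURNAL_1234_1", "customers_JOURNAL_1234_2",
    "orders_JOURNAL_5678_1", "orders_JOURNAL_5678_2",
]
print(droppable_journals(journals, active_tables={"orders"}))
```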

Configure scheduling of merge tasks

The connector uses a warehouse to merge change data capture (CDC) data into the destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes, or no new FlowFiles pending in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse suspends automatically.

Use the CRON expression in the Merge Task Schedule CRON parameter to limit warehouse cost and restrict merges to scheduled times only. It throttles the FlowFiles coming to the MergeSnowflakeJournalTable processor, so merges are triggered only during a dedicated period of time. For more information about scheduling, see Scheduling strategy (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy).

Run the flow

  1. Right-click the plane icon and select Enable all Controller Services.

  2. Right-click the imported process group and select Start. The connector starts the data ingestion.