Set up the Openflow Connector for PostgreSQL

Note

This connector is subject to the Snowflake Connector Terms.

This topic describes the steps to set up the Openflow Connector for PostgreSQL.

Note

This connector can be configured to immediately start replicating incremental changes for newly added tables, bypassing the snapshot load phase. This option is often useful when reinstalling the connector in an account where previously replicated data exists and you want to continue replication without having to re-snapshot tables.

For details on the incremental load process, see Incremental replication.

Prerequisites

  1. Ensure that you have reviewed About Openflow Connector for PostgreSQL.

  2. Ensure that you have reviewed Supported PostgreSQL versions.

  3. Recommended: Ensure that you add only one connector instance per runtime.

  4. Ensure that you have completed Set up Openflow - BYOC or Set up Openflow - Snowflake Deployments.

  5. If using Openflow - Snowflake Deployments, ensure that you've reviewed configuring required domains and have granted access to the required domains for the PostgreSQL connector.

  6. As a database administrator, perform the following tasks:

    1. Configure wal_level

    2. Create a publication

    3. Ensure that the PostgreSQL server has sufficient disk space for the WAL. Once a replication slot is created, PostgreSQL retains WAL data starting from the position recorded by that slot, until the connector acknowledges and advances the position.
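To keep an eye on how much WAL a slot is retaining, a query along these lines can help (available on PostgreSQL 10 and later):

```sql
-- Retained WAL per replication slot
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
```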

    4. Ensure that every table enabled for replication has a primary key. The key can be a single column or a composite key.

    5. Set the tables' REPLICA IDENTITY (https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY) to DEFAULT. This ensures that primary keys are represented in the WAL and can be read by the connector.
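For example, assuming a table public.my_table (a placeholder name), the replica identity can be set and verified as follows:

```sql
-- DEFAULT records the primary key of changed rows in the WAL
ALTER TABLE public.my_table REPLICA IDENTITY DEFAULT;

-- Verify: relreplident = 'd' means DEFAULT
SELECT relname, relreplident FROM pg_class WHERE relname = 'my_table';
```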

    6. Create a user for the connector. The connector requires a user with the REPLICATION attribute and SELECT privileges on all tables to be replicated. Create the user with a password to enter into the connector's configuration. For more information about replication security, see Security (https://www.postgresql.org/docs/current/logical-replication-security.html).
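A minimal sketch of creating such a user, assuming the replicated tables live in the public schema (the user name and password are placeholders):

```sql
-- User with the REPLICATION attribute; the password goes into the connector config
CREATE USER openflow_pg_user WITH REPLICATION PASSWORD '<password>';

-- SELECT on all tables to be replicated
GRANT USAGE ON SCHEMA public TO openflow_pg_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO openflow_pg_user;

-- Optionally cover tables created in the schema later
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO openflow_pg_user;
```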

  7. As a Snowflake account administrator, perform the following tasks:

    1. Create a Snowflake user of type SERVICE. Create a database to store the replicated data, and grant the Snowflake user the privileges required to create objects in that database, namely the USAGE and CREATE SCHEMA privileges:

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
        WITH
          WAREHOUSE_SIZE = 'XSMALL'
          AUTO_SUSPEND = 300
          AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
    2. Create a secure key pair (public and private keys). Store the user's private key in a file for use when configuring the connector. Assign the public key to the Snowflake service user:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      

      For more information, see Key-pair authentication.
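One way to generate such a key pair is with OpenSSL. This sketch produces an unencrypted PKCS8 private key (rsa_key.p8) and the matching public key (rsa_key.pub); for production use, consider generating an encrypted key instead:

```shell
# Generate an unencrypted PKCS8 private key (omit -nocrypt to set a passphrase)
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

# Derive the matching public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```

The contents of rsa_key.pub, without the PEM header and footer lines, is the value to use in the ALTER USER ... SET RSA_PUBLIC_KEY statement.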

    3. Designate a warehouse for the connector to use. Start with the XSMALL warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses than with larger warehouse sizes.

Configure wal_level

The Openflow Connector for PostgreSQL requires wal_level (https://www.postgresql.org/docs/current/runtime-config-wal.html#GUC-WAL-LEVEL) to be set to logical.

Depending on where your PostgreSQL server is hosted, you can configure wal_level as follows:

On-premises

Execute the following query as a superuser or a user with the ALTER SYSTEM privilege:

ALTER SYSTEM SET wal_level = logical;
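Note that a change to wal_level takes effect only after a server restart. Once the server is back up, you can confirm the setting:

```sql
SHOW wal_level;  -- expected result: logical
```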

RDS

The user used by the agent needs to be assigned the rds_superuser or rds_replication role.

You also need to do the following:

  • Set the rds.logical_replication static parameter to 1.

  • Set the max_replication_slots, max_connections, and max_wal_senders parameters according to your database and replication setup.

AWS Aurora

Set the rds.logical_replication static parameter to 1.

GCP

Set the following flags:

  • cloudsql.logical_decoding=on

  • cloudsql.enable_pglogical=on

For more information, see the Google Cloud documentation (https://cloud.google.com/sql/docs/postgres/replication/configure-logical-replication#set-up-logical-replication-with-pglogical).

Azure

Set replication support to Logical. For more information, see the Azure documentation (https://learn.microsoft.com/en-us/azure/postgresql/single-server/concepts-logical#set-up-your-server).

Create a publication

Before replication starts, the Openflow Connector for PostgreSQL requires you to create and configure a publication (https://www.postgresql.org/docs/current/logical-replication-publication.html#LOGICAL-REPLICATION-PUBLICATION) in PostgreSQL. You can create a publication for all tables, for a subset of tables, or for specific tables with only specified columns. Ensure that every table and column you plan to replicate is included in the publication. You can also modify the publication later, while the connector is running. To create and configure a publication, do the following:

  1. Log in to the database as a user with the CREATE privilege and run the following query:

    • For PostgreSQL 13 and later:

      CREATE PUBLICATION <publication name> WITH (publish_via_partition_root = true);
      

      The additional publish_via_partition_root option is needed for correct replication of partitioned tables. To learn more about ingestion of partitioned tables, see Replicate a partitioned table.

    • For PostgreSQL versions earlier than 13:

      CREATE PUBLICATION <publication name>;
      
  2. Define the tables visible to the database agent using the following command:

ALTER PUBLICATION <publication name> ADD TABLE <table name>;

For partitioned tables, it's enough to add just the root partition table to the publication. See Replicate a partitioned table for more details.

Important

PostgreSQL 15 and later versions support configuring a publication for a specified subset of a table's columns. For the connector to support this feature correctly, you must use the column filter setting to include the same columns that are set on the publication.

Without this setting, the connector behaves as follows:

  • In the destination table, columns not included in the filter are suffixed with __DELETED. All data replicated during the snapshot phase is retained.

  • After you add new columns to the publication, the table enters a permanently failed state, and you will need to restart its replication.

For more information, see ALTER PUBLICATION (https://www.postgresql.org/docs/current/sql-alterpublication.html).

Install the connector

To install the connector, do the following as a data engineer:

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. On the Openflow connectors page, find the connector and select Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Note

    Before installing the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data.

  4. Authenticate to the deployment with your Snowflake account credentials, and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.

  5. Authenticate to the runtime with your Snowflake account credentials.

The Openflow canvas appears with the connector process group added to it.

Configure the connector

To configure the connector, do the following as a data engineer:

  1. Right-click the imported process group and select Parameters.

  2. Populate the required parameter values as described in Flow parameters.

Flow parameters

Start by setting the parameters of the PostgreSQL Source Parameters context, then the PostgreSQL Destination Parameters context. Once this is done, you can enable the connector; it should connect to both PostgreSQL and Snowflake and start running. However, it will not replicate any data until tables are explicitly added to its configuration.

To configure specific tables for replication, edit the PostgreSQL Ingestion Parameters context. Shortly after you apply changes to the parameter context, the connector detects the configuration and starts the replication lifecycle for each table.

PostgreSQL Source Parameters context

Parameter

Description

PostgreSQL Connection URL

The full JDBC URL pointing to the source database. Example: jdbc:postgresql://example.com:5432/public

If you're connecting to a PostgreSQL replica server, see Replicate tables from PostgreSQL replica servers.

PostgreSQL JDBC Driver

The path to the PostgreSQL JDBC driver jar (https://jdbc.postgresql.org/). Download the jar from the website, then select the Reference asset checkbox to upload and attach it.

PostgreSQL Username

The username for the connector.

PostgreSQL Password

The password for the connector.

Publication Name

The name of the publication you created earlier.

Replication Slot Name

Optional. When no value is provided, the connector will create a new, uniquely-named slot. When given a value, the connector will use the existing slot, or create a new one with the provided name.

Changing the value for a running connector will restart reading the incremental change data capture (CDC) stream from the updated slot's position.

PostgreSQL Destination Parameters context

Parameter

Description

Required

Destination Database

The database where data will be persisted. It must already exist in Snowflake. The name is case-sensitive; for unquoted identifiers, provide the name in uppercase.

Snowflake Authentication Strategy

When using:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_MANAGED_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_MANAGED_TOKEN.

  • BYOC: Alternatively, BYOC deployments can use KEY_PAIR as the authentication strategy value.

Snowflake Account Identifier

When using:

  • Session Token Authentication Strategy: Must be left empty.

  • KEY_PAIR: The Snowflake account name, in the format [organization-name]-[account-name], where the data will be persisted.

Snowflake Connection Strategy

When using KEY_PAIR, specify the strategy for connecting to Snowflake:

  • STANDARD (default): Connect using standard public routing to Snowflake services.

  • PRIVATE_CONNECTIVITY: Connect using private addresses associated with the supporting cloud platform such as AWS PrivateLink.

Required for BYOC with KEY_PAIR only, otherwise ignored.

Snowflake Private Key

When using:

  • Session Token Authentication Strategy: Must be left empty.

  • KEY_PAIR: Must be the RSA private key used for authentication.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Snowflake Private Key File

When using:

  • Session Token Authentication Strategy: The private key file must be empty.

  • KEY_PAIR: Upload the file containing the RSA private key used to authenticate to Snowflake, formatted according to PKCS8 standards and including standard PEM headers and footers. The header line begins with -----BEGIN PRIVATE. To upload the private key file, select the Reference asset checkbox.

Snowflake Private Key Password

When using:

  • Session Token Authentication Strategy: Must be left empty.

  • KEY_PAIR: Provide the password associated with the Snowflake Private Key File.

Snowflake Role

When using:

  • Session Token Authentication Strategy: Use the Snowflake role assigned to the runtime, or a child role granted to that role. You can find your runtime's Snowflake role in the Openflow UI by expanding the More Options [⋮] button for your runtime and selecting Set Snowflake role.

  • KEY_PAIR authentication strategy: Use a valid role configured for your service user.

Snowflake Username

When using:

  • Session Token Authentication Strategy: Must be left empty.

  • KEY_PAIR: Provide the username used to connect to the Snowflake instance.

Oversized Value Strategy

Determines how the connector handles values that exceed its internal size limits (16 MB) during replication. Possible values are:

  • Fail Table (default): The table is marked as permanently failed, and replication stops for that table.

  • Set Null: The value is replaced with NULL in the destination table. Use this to prevent table failures when losing the oversized values is acceptable.

Snowflake Warehouse

The Snowflake warehouse used to run queries.

PostgreSQL Ingestion Parameters context

Parameter

Description

Included Table Names

A comma-separated list of table paths, including their schemas. Example: public.my_table, other_schema.other_table

Select tables by name or by regular expression. If both options are used, all tables matching either option are included.

Tables that are sub-partitions are always excluded from ingestion. See Replicate a partitioned table for more information.

Included Table Regex

A regular expression to match table paths. Every path matching the expression is replicated, and new tables matching the pattern that are created later are included automatically. Example: public\.auto_.*

Select tables by name or by regular expression. If both options are used, all tables matching either option are included.

Tables that are sub-partitions are always excluded from ingestion. See Replicate a partitioned table for more information.

Column Filter JSON

Optional. A JSON array containing fully qualified table names and regular expression patterns for the column names that should be included in replication. Example: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] includes all columns ending with name in table1 of the public schema.

Merge Task Schedule CRON

A CRON expression that defines when merge tasks from the journal to the destination table are triggered. Set it to * * * * * ? to merge continuously, or use a schedule to limit warehouse running time to planned windows.

For example:

  • The string * 0 * * * ? schedules a merge at the top of every hour, lasting one minute

  • The string * 20 14 ? * MON-FRI schedules merges to trigger at 2:20 PM every Monday through Friday

For additional information and examples, see the cron trigger tutorial in the Quartz documentation (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html).

Object Identifier Resolution

Specifies how source object identifiers, such as the names of schemas, tables, and columns, are stored and queried in Snowflake. This setting determines whether you must use double quotes in SQL queries.

Option 1: Default, case-sensitive. For backwards compatibility.

  • Transformation: Case is preserved. For example, My_Table remains My_Table.

  • Queries: SQL queries must use double quotes to match the exact case for database objects. For example, SELECT * FROM "My_Table";.

Note

Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons. For example, the source database might include table names that differ only in case, such as MY_TABLE and my_table, which would result in a name collision when using case-insensitive comparisons.

Option 2: Recommended, case-insensitive

  • Transformation: All identifiers are converted to uppercase. For example, My_Table becomes MY_TABLE.

  • Queries: SQL queries are case-insensitive and don't require SQL double quotes. For example, SELECT * FROM my_table; returns the same results as SELECT * FROM MY_TABLE;.

Note

Snowflake recommends using this option if database objects are not expected to have mixed case names.

Important

Do not change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance.

Replicate tables from PostgreSQL replica servers

The connector can ingest data using logical replication (https://www.postgresql.org/docs/current/logical-replication.html) from a primary server, a hot standby replica (https://www.postgresql.org/docs/current/hot-standby.html), or a subscriber server. Before configuring the connector to connect to a PostgreSQL replica, make sure that replication between the primary and replica nodes works correctly. When investigating missing data in the connector, first confirm that the missing rows exist on the replica server used by the connector.

Additional considerations when connecting to a standby replica:

  • Only connections to hot standby replicas are supported. Note that warm standby replicas cannot accept client connections until they are promoted to primary.

  • The PostgreSQL version of the server must be 16 or later.

  • The publication required by the connector must be created on the primary server, not on the standby. Standby servers are read-only and don't allow creating publications.

If you connect to a hot standby instance and see the error Trying to create the replication slot '<replication slot>' timed out. If connecting to a standby instance, ensure there is some traffic on the primary PostgreSQL instance, otherwise the call to create a replication slot will never return., or if the Read PostgreSQL CDC Stream processor doesn't start, log in to the primary PostgreSQL instance and execute the following query:

SELECT pg_log_standby_snapshot();

This error occurs when there are no data changes on the primary server, which can cause the connector to stall while creating a replication slot on the replica server. The replica needs information about the transactions running on the primary in order to create a replication slot, and an idle primary doesn't send that information. The pg_log_standby_snapshot() function forces the primary to send information about running transactions to the replica.

Restart table replication

A table in a FAILED state (for example, due to a missing primary key or an unsupported schema change) does not restart automatically. If a table enters a FAILED state or you need to restart replication from scratch, use the following procedure to remove and re-add the table to replication.

Note

If the failure was caused by an issue in the source table such as a missing primary key, resolve that issue in the source database before continuing.

  1. Remove the table from flow parameters: In the Ingestion Parameters context, either remove the table from the Included Table Names or modify the Included Table Regex so the table is no longer matched.

  2. Verify the table has been removed:

    1. In the Openflow runtime canvas, right-click a processor group and choose Controller Services.

    2. In the table listing controller services, locate the Table State Store row, click the three vertical dots on the right side of the row, then choose View State.

    Important

    You must wait until the table's state is fully removed from this list before proceeding. Do not continue until this configuration change has completed.

  3. Clean up the destination: Once the table's state shows as fully removed, manually DROP the destination table in Snowflake. Note that the connector will not overwrite an existing destination table during the snapshot phase; if the table still exists, replication will fail again. Optionally, the journal table and stream can also be removed if they are no longer needed.

  4. Re-add the table: Update the Included Table Names or Included Table Regex parameters to include the table again.

  5. Verify the restart: Check the Table State Store using the instructions given previously. The state of the table should appear with the status NEW, then transition to SNAPSHOT_REPLICATION, and finally INCREMENTAL_REPLICATION.

Replicate a subset of a table's columns

The connector can filter the data replicated for each table down to a configured subset of columns.

To apply a filter to columns, modify the Column Filter property in the ingestion parameters context, adding an array of configurations, one entry per table whose columns need filtering.

Columns can be included or excluded by name or by pattern. You can apply a single criterion per table or combine several; exclusions always take precedence over inclusions.

The following example shows the available fields. schema and table are required, and at least one of the following fields must be set: included, excluded, includedPattern, excludedPattern.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>"
    }
]

Replicate a partitioned table

The connector supports replication of partitioned tables for PostgreSQL servers version 15 and later. A PostgreSQL partitioned table is replicated into Snowflake as a single destination table.

For example, if you have a partitioned table orders with sub-partitions orders_2023 and orders_2024, and you configured the connector to ingest all tables matching the orders.* pattern, then only the orders table is replicated to Snowflake, and it includes data from all sub-partitions.

To support replication of partitioned tables, ensure that the publication created in PostgreSQL has the publish_via_partition_root option set to true.
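If the publication already exists, you can check and, if needed, enable the option; the pubviaroot column of the pg_publication catalog reflects this setting (the publication name below is a placeholder):

```sql
-- true in pubviaroot means publish_via_partition_root is enabled
SELECT pubname, pubviaroot FROM pg_publication;

-- Enable the option on an existing publication
ALTER PUBLICATION my_publication SET (publish_via_partition_root = true);
```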

Ingestion of partitioned tables currently has the following limitations:

  • When a table is attached as a partition to a partitioned table after ingestion has started, the connector won't fetch data that existed in the partition table before it was attached.

  • When a sub-partition table is detached from the partitioned table after ingestion has started, the connector won't mark the data from this sub-partition as deleted in the root partition table.

  • A truncate operation on sub-partitions will not mark affected records as deleted.

Track data changes in tables

The connector replicates not only the current state of the data in the source tables, but also every state of every row in each change set. This data is stored in journal tables created in the same schema as the destination tables.

The journal table names are formatted as: <source_table_name>_JOURNAL_<timestamp>_<schema_generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema_generation> is an integer increasing with every schema change on the source table. As a result, source tables that undergo schema changes will have multiple journal tables.

When a table is removed from replication, then added back, the <timestamp> value will change, and <schema_generation> will start again from 1.

Important

Snowflake recommends that you do not alter the structure of journal tables in any way. They are used by the connector to update the destination table as part of the replication process.

The connector never drops journal tables, but it uses only the latest journal for each replicated source table, reading append-only streams on top of the journals. To reclaim storage, you can:

  • Truncate all journal tables at any time.

  • Drop the journal tables related to source tables that were removed from replication.

  • Drop all but the latest generation journal tables for actively replicated tables.

For example, if your connector is set up to actively replicate a source table orders, and you previously removed the table customers from replication, the following journal tables may exist. In this case, you can drop all of them except orders_5678_2:

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Configure the merge task schedule

The connector uses a warehouse to merge change data capture (CDC) data into the destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes and no new FlowFiles waiting in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse suspends automatically.

To limit warehouse costs and restrict merging to scheduled times only, use a CRON expression in the Merge Task Schedule CRON parameter. It throttles the FlowFiles flowing to the MergeSnowflakeJournalTable processor so that merges are triggered only during specific time windows. For more information about scheduling, see Scheduling strategy (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy).

Stop or delete the connector

When stopping or removing the connector, pay attention to the replication slot (https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS) used by the connector.

The connector creates its own replication slot, with a name starting with snowflake_connector_ followed by a random suffix. The slot's position is advanced as the connector reads the replication stream, allowing PostgreSQL to clean up WAL logs and free disk space.

While the connector is paused, the replication slot is not advanced, and changes to the source database keep growing the WAL. Therefore, pausing the connector for long periods is not recommended, especially on high-traffic databases.

When the connector is removed, whether by deleting it from the Openflow canvas or by any other means (such as deleting the whole Openflow instance), the replication slot stays in place and must be dropped manually.

If multiple connector instances replicate from the same PostgreSQL database, each instance creates its own uniquely named replication slot. When dropping a replication slot manually, make sure you drop the correct one. You can check which replication slot a given connector instance uses by inspecting the state of the CaptureChangePostgreSQL processor.
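For example, you can list the connector's slots and drop a leftover one with queries along these lines, run against the source database (the slot name shown is illustrative):

```sql
-- List replication slots created by the connector
SELECT slot_name, active, restart_lsn
FROM pg_replication_slots
WHERE slot_name LIKE 'snowflake_connector_%';

-- Drop a slot once you're sure no connector is using it
SELECT pg_drop_replication_slot('snowflake_connector_abc123');
```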

Run the flow

  1. Right-click the plane icon and select Enable all Controller Services.

  2. Right-click the imported process group and select Start. The connector starts data ingestion.