设置 Openflow Connector for PostgreSQL

备注

This connector is subject to the Snowflake Connector Terms.

本主题介绍设置 Openflow Connector for PostgreSQL 的步骤。

先决条件

  1. 确保您已查看 关于 Openflow Connector for PostgreSQL

  2. 确保您已查看 支持的 PostgreSQL 版本

  3. Recommended: Ensure that you add only one connector instance per runtime.

  4. Ensure that you have 设置 Openflow - BYOC or Set up Openflow - Snowflake Deployments.

  5. If using Openflow - Snowflake Deployments, ensure that you've reviewed configuring required domains and have granted access to the required domains for the PostgreSQL connector.

  6. 作为数据库管理员,请执行以下任务:

    1. 配置 wal_level

    2. 创建发布

    3. 确保 PostgreSQL 服务器上有足够的磁盘空间用于 WAL。这是因为复制槽创建后,将导致 PostgreSQL 会保留从该复制槽所记录的位置开始的 WAL 数据,直到连接器确认并推进该位置。

    4. 确保启用复制的每张表都有主键。键可以是单个列或组合。

    5. 将表的 REPLICA IDENTITY (https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY) 设置为 DEFAULT。这可确保主键在 WAL 中表示,并且连接器可以读取它们。

    6. 为连接器创建用户。连接器要求用户具有 REPLICATION 属性,以及对所有需要复制的表拥有 SELECT 权限。使用密码创建该用户,以进入连接器的配置。有关复制安全的更多信息,请参阅 安全 (https://www.postgresql.org/docs/current/logical-replication-security.html)。

  7. 作为 Snowflake 账户管理员,请执行以下任务:

    1. 创建一个类型为 SERVICE 的 Snowflake 用户。创建一个用于存储复制数据的数据库,并为该 Snowflake 用户设置在该数据库中创建对象所需的权限,即授予 USAGE 和 CREATE SCHEMA 权限

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
        WITH
          WAREHOUSE_SIZE = 'MEDIUM'
          AUTO_SUSPEND = 300
          AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. 创建安全密钥对(公钥和私钥)。将用户的私钥存储在文件中,以便在配置连接器时使用。将公钥分配给 Snowflake 服务用户:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      有关更多信息,请参阅 密钥对身份验证

    3. 指定一个仓库供连接器使用。从 MEDIUM 仓库规模开始,然后根据要复制的表的数量和传输的数据量尝试规模。相较于扩大仓库规模,采用 多集群仓库 通常能更有效地应对海量表数量的扩展需求。

配置 wal_level

Openflow Connector for PostgreSQL 要求将 wal_level (https://www.postgresql.org/docs/current/runtime-config-wal.html#GUC-WAL-LEVEL) 设置为 logical

根据 PostgreSQL 服务器的托管位置,您可以按如下方式配置 wal_level:

本地

以超级用户或具有 ALTER SYSTEM 权限的用户身份执行以下查询:

ALTER SYSTEM SET wal_level = logical;
Copy

RDS

代理使用的用户需要已分配 rds_superuserrds_replication 角色。

您还需要执行以下操作:

  • rds.logical_replication 静态参数设置为 1。

  • 根据数据库和复制设置,设置 max_replication_slotsmax_connectionsmax_wal_senders 参数。

AWS Aurora

rds.logical_replication 静态参数设置为 1。

GCP

设置以下标记:

  • cloudsql.logical_decoding=on

  • cloudsql.enable_pglogical=on

有关更多信息,请参阅 Google Cloud 文档 (https://cloud.google.com/sql/docs/postgres/replication/configure-logical-replication#set-up-logical-replication-with-pglogical)。

Azure

将复制支持设置为 Logical。有关更多信息,请参阅 Azure 文档 (https://learn.microsoft.com/en-us/azure/postgresql/single-server/concepts-logical#set-up-your-server)。

创建发布

在开始复制之前,Openflow Connector for PostgreSQL 要求您在 PostgreSQL 中创建并配置一个 发布 (https://www.postgresql.org/docs/current/logical-replication-publication.html#LOGICAL-REPLICATION-PUBLICATION)。您可以为所有表或表的子集以及仅具有指定列的特定表创建发布。确保您计划复制的每张表和列都包含在发布中。您也可以稍后在连接器运行时修改发布。要创建和配置发布,请执行以下操作:

  1. 以具有 CREATE 权限的用户身份登录数据库,然后运行以下查询:

    • For PostgreSQL 13 and later:

      CREATE PUBLICATION <publication name> WITH (publish_via_partition_root = true);
      
      Copy

      The additional publish_via_partition_root is needed for correct replication of partitioned tables. To learn more about ingestion of partitioned tables see Replicate a partitioned table.

    • For PostgreSQL versions earlier than 13:

      CREATE PUBLICATION <publication name>;
      
      Copy
  2. 使用以下命令定义数据库代理可见的表:

ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Copy

For partitioned tables, it's enough to just add the root partition table to the publication. See Replicate a partitioned table for more details.

重要

** PostgreSQL 15 及更高版本** 支持为表列的指定子集配置发布。为使连接器正确支持此功能,必须使用 列筛选设置 来包含与发布上设置的相同的列。

如果没有此设置,连接器的行为将如下所示:

  • 在目标表中,未包含在筛选器中的列将以 __DELETED 作为后缀。在快照阶段复制的所有数据都将保留下来。

  • After you add new columns to the publication, the table will be permanently failed, and you will need to restart its replication.

有关更多信息,请参阅 ALTER PUBLICATION (https://www.postgresql.org/docs/current/sql-alterpublication.html) 。

安装连接器

  1. 导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors

  2. 在 Openflow 连接器页面上,找到连接器并选择 Add to runtime

  3. Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。

  4. 选择 Add

    备注

    在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。

  5. 使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。

  6. 使用您的 Snowflake 账户凭据进行运行时身份验证。

此时将显示 Openflow 画布,其中添加了连接器进程组。

配置连接器

您可以为以下用例配置连接器:

实时复制一组表

  1. 右键点击导入的进程组并选择 Parameters

  2. 按照 流参数 中所述填充所需的参数值。

流参数

Start with setting the parameters of the PostgreSQL Source Parameters context, then the PostgreSQL Destination Parameters context. Once this is done, you can enable the connector, and it should connect both to PostgreSQL and Snowflake and start running. However, it will not replicate any data until any tables are explicitly added to its configuration.

要为复制配置特定的表,请编辑 PostgreSQL 引入参数上下文。在对复制参数上下文应用更改后不久,连接器会检测到这些配置,并为每张表启动复制生命周期。

PostgreSQL 源参数上下文

参数

描述

PostgreSQL Connection URL

指向源数据库的完整 JDBC URL。示例:jdbc:postgresql://example.com:5432/public

如果您要连接到 PostgreSQL 副本服务器,请参阅 从 PostgreSQL 副本服务器复制表

PostgreSQL JDBC Driver

PostgreSQL JDBC 驱动程序 jar (https://jdbc.postgresql.org/) 的路径。从其网站下载 jar,然后选中 Reference asset 复选框将其上传并附加。

PostgreSQL Username

连接器的用户名。

PostgreSQL Password

连接器的密码。

Publication Name

The name of the publication you created earlier.

PostgreSQL 目标参数上下文

参数

描述

必填

目标数据库

The database where data will be persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

Snowflake 账户标识符

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Snowflake account name formatted as [organization-name]-[account-name] where data will be persisted.

Snowflake 身份验证策略

使用以下方式时:

  • Snowflake Openflow 部署:使用 SNOWFLAKE_SESSION_TOKEN。此令牌由 Snowflake 自动管理。

  • BYOC:使用 KEY_PAIR 作为身份验证策略的值。

Snowflake 私钥

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR:必须是用于身份验证的 RSA 私钥。

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Snowflake 私钥文件

使用以下方式时:

  • 会话令牌身份验证策略:私钥文件必须为空。

  • KEY_PAIR: Upload the file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and including standard PEM headers and footers. The header line begins with -----BEGIN PRIVATE. To upload the private key file, select the Reference asset checkbox.

Snowflake 私钥密码

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Provide the password associated with the Snowflake Private Key File.

Snowflake 角色

使用以下方式时:

  • 会话令牌身份验证策略:使用运行时角色。您可以在 Openflow UI 中,通过导航至您的运行时的 View Details 来找到您的运行时角色。

  • KEY_PAIR 身份验证策略:使用为您的服务用户配置的有效角色。

Snowflake 用户名

使用以下方式时:

  • Session Token Authentication Strategy: Must be blank.

  • KEY_PAIR: Provide the user name used to connect to the Snowflake instance.

Snowflake 仓库

Snowflake warehouse used to run queries.

PostgreSQL 引入参数上下文

参数

描述

包括表名

A comma-separated list of table paths, including their schemas. Example: public.my_table, other_schema.other_table.

按名称或正则表达式选择表。如果同时使用这两个选项,则将包含任一选项中所有匹配的表。

Tables being sub-partitions are always excluded from ingestion. See Replicate a partitioned table for more information.

包括表正则表达式

用于匹配表路径的正则表达式。与表达式匹配的每个路径都将被复制,并且还将自动包括与稍后创建的模式相匹配的新表。示例:public\.auto_.*

按名称或正则表达式选择表。如果同时使用这两个选项,则将包含任一选项中所有匹配的表。

Tables being sub-partitions are always excluded from ingestion. See Replicate a partitioned table for more information.

Column Filter JSON

Optional. A JSON containing a list of fully-qualified table names and a regex pattern for column names that should be included into replication. Example: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] will include all columns that end with name in table1 from the public schema.

合并任务计划 CRON

定义触发从日志到目标表的合并任务的 CRON 表达式。如果您想持续合并或按照计划时间来限制仓库运行时间,请将其设置为 * * * * * ?

例如:

  • 字符串 * 0 * * * ? 表示您要在每小时整点计划合并,持续一分钟

  • 字符串 * 20 14 ? * MON-FRI 表示您计划在周一到周五每天 2:20 PM 触发合并

有关其他信息和示例,请参阅 Quartz 文档 (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html) 中的 cron 触发教程

Object Identifier Resolution

Specifies how source object identifiers such as schemas, tables, and columns names are stored and queried in Snowflake. This setting dictates whether you will need to use double quotes in SQL queries.

Option 1: Default, case-sensitive. For backwards compatibility.

  • Transformation: Case is preserved. For example, My_Table remains My_Table.

  • Queries: SQL queries must use double quotes to match the exact case for database objects. For example, SELECT * FROM "My_Table";.

备注

Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons. For example, if the source database includes table names that differ in case only such as MY_TABLE and my_table that would result in a name collision when using when using case-insensitive comparisons.

Option 2: Recommended, case-insensitive

  • Transformation: All identifiers are converted to uppercase. For example, My_Table becomes MY_TABLE.

  • Queries: SQL queries are case-insensitive and don't require SQL double quotes. For example SELECT * FROM my_table; returns the same results as SELECT * FROM MY_TABLE;.

备注

Snowflake recommends using this option if database objects are not expected to have mixed case names.

重要

Do not change this setting after connector ingestion has begun. Changing this setting after ingestion has begun will break the existing ingestion. If you must change this setting, create a new connector instance.

从 PostgreSQL 副本服务器复制表

连接器可以使用 逻辑复制 (https://www.postgresql.org/docs/current/logical-replication.html) 从主服务器、热备用副本 (https://www.postgresql.org/docs/current/hot-standby.html),或订阅者服务器引入数据。在配置连接器以连接到 PostgreSQL 副本之前,确保主节点和副本节点之间的复制正常工作。在调查连接器中缺失数据的问题时,首先确保连接器使用的副本服务器中存在缺失的行。

连接到备用副本时的其他注意事项:

  • 仅支持连接到热备用副本。请注意,在提升为主实例之前,热备用副本无法接受来自客户端的连接。

  • 服务器的 PostgreSQL 版本必须 >= 16。

  • 连接器所需的 发布 必须在主服务器上创建,而不是在备用服务器上创建。备用服务器是只读的,不允许创建发布。

如果您连接到热备用实例并看到 Trying to create the replication slot '<replication slot>' timed out.If connecting to a standby instance, ensure there is some traffic on the primary PostgreSQL instance, otherwise the call to create a replication slot will never return. 错误,或者 Read PostgreSQL CDC Stream 处理器未启动,请登录主 PostgreSQL 实例并执行以下查询:

SELECT pg_log_standby_snapshot();
Copy

当主服务器中没有数据变更时,会发生此错误。因此,在副本服务器上创建复制槽时,连接器可能会出现卡顿。这是因为副本服务器需要有关主服务器正在运行的事务的信息,才能创建复制槽。主服务器在空闲时不会发送信息。pg_log_standby_snapshot() 函数强制主服务器将有关正在运行的事务的信息发送到副本服务器。

从复制中移除并重新添加表

要从复制中移除表,请确保在复制参数上下文中的 包括表名包括表正则表达式 参数里移除。

如果您想稍后将该表重新添加到复制中,请先在 Snowflake 中删除相应的目标表。然后,再将该表添加回 包括表名包括表正则表达式 参数中。这样可以确保表的复制过程重新开始。

此方法还可用于从失败的表复制场景中恢复复制。

复制表中列的子集

连接器可以将每张表复制的数据筛选到已配置列的子集。

要对列应用筛选器,请在复制参数上下文中修改 Column Filter 属性,添加一个配置数组,每个条目对应一张需要筛选列的表。

可以按名称或模式包含或排除列。您可以对每张表应用一个条件,也可以合并多个条件,排除项的优先级始终高于包含项。

以下示例显示了可用的字段。schematable 是必填项,且必须至少填写以下其中一个字段:includedexcludedincludedPatternexcludedPattern

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

Replicate a partitioned table

The connector supports replication of partitioned tables for PostgreSQL servers with version >= 15. A PostgreSQL partitioned table will be replicated into Snowflake as a single destination table.

For example, if you have a partitioned table orders, with sub-partitions orders_2023, orders_2024, and configured the connector to ingest all tables matching orders.* pattern, then only the orders table will be replicated to Snowflake, and it will include data from all sub-partitions.

To support replication of partitioned tables, ensure that the publication created in PostgreSQL has the publish_via_partition_root option set to true.

Ingestion of partitioned tables has currently the following limitations:

  • When a table is attached as a partition to a partitioned table after ingestion was started, the connector won't fetch data that existed in the partition table before attaching.

  • When a sub-partition table is detached from the partitioned table after ingestion was started, the connector won't mark the data from this sub-partition as deleted in the root partition table.

  • Truncate operation on subpartitions will not mark affected records as deleted.

跟踪表中的数据变化

连接器不仅复制源表中数据的当前状态,还复制每个变更集中每行的每个状态。这些数据存储在与目标表相同的架构中创建的日志表中。

The journal table names are formatted as: <source table name>_JOURNAL_<timestamp>_<schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> is an integer increasing with every schema change on the source table. As a result, source tables that undergo schema changes will have multiple journal tables.

当表从复制中移除后又重新添加时,<timestamp> value will change, and <schema generation> will start again from 1

重要

Snowflake recommends that you do not alter the structure of journal tables in any way. They are used by the connector to update the destination table as part of the replication process.

The connector never drops journal tables, but does make use of the latest journal for every replicated source table, only reading append-only streams on top of journals. To reclaim the storage, you can:

  • Truncate all journal tables at any time.

  • Drop the journal tables related to source tables that were removed from replication.

  • Drop all but the latest generation journal tables for actively replicated tables.

例如,如果您的连接器设置为主动复制源表 orders,并且您之前已将表 customers 从复制中移除,则可能存在以下日志表:在这种情况下,您可以把它们全部删除,仅保留 orders_5678_2

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

配置合并任务的调度

连接器使用仓库将变更数据捕获 (CDC) 数据合并到目标表中。此操作由 MergeSnowflakeJournalTable 处理器触发。如果没有新的更改,或者 MergeSnowflakeJournalTable 队列中没有新的待处理的 FlowFile,则不会触发合并,仓库会自动暂停。

要限制仓库成本并将合并仅限于预定时间,可以在合并任务计划 CRON 参数中使用 CRON 表达式。它限制了流向 MergeSnowflakeJournalTable 处理器的 FlowFile,并且只能在特定的时间段内触发合并。有关调度的更多信息,请参阅 调度策略 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy)。

停止或删除连接器

停止或移除连接器时,需要注意连接器使用的 复制槽 (https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS)。

连接器会创建自己的复制槽,其名称以 snowflake_connector_ 开头,后跟随机后缀。当连接器读取复制流时会推进复制槽的位置,使 PostgreSQL 能够清理 WAL 日志并释放磁盘空间。

当连接器暂停时,复制槽不会被推进,对源数据库的更改会不断增加 WAL 日志大小。因此,不建议长时间暂停连接器,尤其是在高流量数据库中。

移除连接器时,无论是将其从 Openflow 画布中删除,还是通过任何其他方式(例如删除整个 Openflow 实例),复制槽都将保持原位,必须手动删除。

如果您有多个连接器实例从同一个 PostgreSQL 数据库进行复制,则每个实例将创建自己的唯一命名的复制槽。手动删除复制槽时,请确保删除的是正确的复制槽。通过检查 CaptureChangePostgreSQL 处理器的状态,您可以查看给定连接器实例使用的是哪个复制槽。

运行流

  1. 右键点击“飞机”图标并选择 Enable all Controller Services

  2. 右键点击导入的进程组并选择 Start。连接器开始数据引入。

语言: 中文