Set up the Openflow Connector for PostgreSQL
Note
This connector is subject to the Snowflake Connector Terms.
This topic describes the steps to set up the Openflow Connector for PostgreSQL.
Note
This connector can be configured to immediately start replicating incremental changes for newly added tables, bypassing the snapshot load phase. This option is often useful when reinstalling the connector in an account where previously replicated data exists and you want to continue replication without having to re-snapshot tables.
For details on the incremental load process, see Incremental replication.
For information about restarting table replication for failed tables, see Restart table replication.
Prerequisites
- Ensure that you have reviewed About Openflow Connector for PostgreSQL.
- Ensure that you have reviewed the supported PostgreSQL versions.
- Recommended: Ensure that you add only one connector instance per runtime.
- Ensure that you have Set up Openflow - BYOC or Set up Openflow - Snowflake Deployments.
- If using Openflow - Snowflake Deployments, ensure that you’ve reviewed configuring required domains and have granted access to the required domains for the PostgreSQL connector.
- As a database administrator, perform the following tasks:
  - Configure wal_level
  - Create a publication
  - Ensure that there is enough disk space on your PostgreSQL server for the WAL. Once a replication slot is created, PostgreSQL retains the WAL data from the position held by the slot until the connector confirms and advances that position.
  - Allow at least 1 logical replication slot and 2 WAL senders per Openflow Connector for PostgreSQL connector instance on the server. Set max_replication_slots and max_wal_senders high enough to cover that and all other replication traffic on the instance.
  - Ensure that every table enabled for replication has a primary key. The key can be a single column or a composite of several columns.
  - Set the REPLICA IDENTITY (https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY) of tables to DEFAULT. This ensures that the primary keys are represented in the WAL, and the connector can read them.
  - Create a user for the connector. The connector requires a user with the REPLICATION attribute and permissions to SELECT from every replicated table. Create that user with a password to enter into the connector’s configuration. For more information on replication security, see Security (https://www.postgresql.org/docs/current/logical-replication-security.html).
- As a Snowflake account administrator, perform the following tasks:
  - Create a Snowflake user with the type as SERVICE. Create a database to store the replicated data, and set up privileges for the Snowflake user to create objects in that database by granting the USAGE and CREATE SCHEMA privileges.
  - Create a pair of secure keys (public and private). Store the private key for the user in a file to use while configuring the connector. Assign the public key to the Snowflake service user:
    For more information, see key-pair authentication.
  - Designate a warehouse for the connector to use. Start with the XSMALL warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses than with a larger warehouse size.
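The replication slot and WAL sender requirement in the prerequisites is simple arithmetic, and the following minimal Python sketch spells it out. It is not part of the connector. The function name and its arguments are hypothetical; only the per-instance figures (1 slot, 2 WAL senders) come from the list above.

```python
def min_replication_settings(connector_instances: int,
                             other_slots: int = 0,
                             other_wal_senders: int = 0) -> dict:
    """Return lower bounds for max_replication_slots and max_wal_senders.

    Each connector instance needs at least 1 logical replication slot and
    2 WAL senders; other_* cover any unrelated replication traffic.
    """
    if min(connector_instances, other_slots, other_wal_senders) < 0:
        raise ValueError("counts must be non-negative")
    return {
        "max_replication_slots": connector_instances * 1 + other_slots,
        "max_wal_senders": connector_instances * 2 + other_wal_senders,
    }


# Hypothetical server: 3 connector instances plus 2 slots and 4 senders
# already used by other replication consumers.
print(min_replication_settings(3, other_slots=2, other_wal_senders=4))
```

Treat the result as a floor rather than a recommendation; leaving headroom above it avoids connection failures when replication consumers are added later.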
Configure wal_level
Openflow Connector for PostgreSQL requires wal_level (https://www.postgresql.org/docs/current/runtime-config-wal.html#GUC-WAL-LEVEL) to be set to logical.
Depending on where your PostgreSQL server is hosted, you can configure wal_level as follows:
| On premise | Execute the following query as a superuser or as a user with |
| RDS | The user used by the agent needs to have the You also need to do the following:
|
| AWS Aurora | Set the rds.logical_replication static parameter to 1. |
| GCP | Set the following flags:
|
| Azure | Set the replication support to Logical. For more information, see Azure documentation (https://learn.microsoft.com/en-us/azure/postgresql/single-server/concepts-logical#set-up-your-server). |
Create a publication
Openflow Connector for PostgreSQL requires a publication (https://www.postgresql.org/docs/current/logical-replication-publication.html#LOGICAL-REPLICATION-PUBLICATION) to be created and configured in PostgreSQL before replication starts. You can create it for all, or a subset of tables, as well as for specific tables with specified columns only. Make sure that every table and column that you plan to have replicated is included in the publication. You can also modify the publication later, while the connector is running. To create and configure a publication, do the following:
- Log in as a user with the CREATE privilege in the database, and run the following query:
  - For PostgreSQL 13 and later:
    The additional publish_via_partition_root option is needed for correct replication of partitioned tables. To learn more about ingestion of partitioned tables, see Replicate a partitioned table.
  - For PostgreSQL versions earlier than 13:
- Define the tables that the database agent can see by using the following command:
  For partitioned tables, it’s enough to add only the root partition table to the publication. See Replicate a partitioned table for more details.
Important
PostgreSQL 15 and later support configuring publications for a specified subset of table columns. For the connector to support this correctly, you must use the column filtering settings to include the same columns as set on the publication.
Without this setting, the connector behaves as follows:
- In the destination table, columns that aren’t included in the filter are suffixed with __DELETED. All data replicated during the snapshot phase is retained.
- After you add new columns to the publication, the table permanently fails, and you need to restart its replication.
For more information, see ALTER PUBLICATION (https://www.postgresql.org/docs/current/sql-alterpublication.html).
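The publication steps above can be sketched as a small Python helper that assembles the SQL text. It does not reproduce the exact statements from the original listings. It uses the standard PostgreSQL CREATE PUBLICATION and ALTER PUBLICATION ... ADD TABLE forms, and the publication and table names (openflow_pub, public.orders, public.customers) are hypothetical.

```python
import re

# Conservative check for unquoted identifiers; quoted names are out of scope.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def _check_name(name: str, allow_schema: bool = False) -> str:
    parts = name.split(".") if allow_schema else [name]
    if len(parts) > 2 or not all(_IDENT.fullmatch(p) for p in parts):
        raise ValueError(f"unsupported identifier: {name!r}")
    return name


def create_publication_sql(publication: str, pg_major_version: int) -> str:
    """Build CREATE PUBLICATION, adding publish_via_partition_root on 13+."""
    _check_name(publication)
    if pg_major_version >= 13:
        return (f"CREATE PUBLICATION {publication} "
                "WITH (publish_via_partition_root = true);")
    return f"CREATE PUBLICATION {publication};"


def add_tables_sql(publication: str, tables: list[str]) -> str:
    """Build ALTER PUBLICATION ... ADD TABLE for schema-qualified tables."""
    _check_name(publication)
    if not tables:
        raise ValueError("at least one table is required")
    names = ", ".join(_check_name(t, allow_schema=True) for t in tables)
    return f"ALTER PUBLICATION {publication} ADD TABLE {names};"


print(create_publication_sql("openflow_pub", 16))
print(add_tables_sql("openflow_pub", ["public.orders", "public.customers"]))
```

Run the generated statements with any PostgreSQL client as a user that holds the CREATE privilege on the database. For a column-restricted publication on PostgreSQL 15 and later, the column list in the publication must match the connector’s column filter, as the Important note above describes.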
Install the connector
To install the connector, do the following as a data engineer:
- Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
- On the Openflow connectors page, find the connector and select Add to runtime.
- In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
  Note
  Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data.
- Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.
- Authenticate to the runtime with your Snowflake account credentials.
  The Openflow canvas appears with the connector process group added to it.
Runtime sizing
The runtime size determines the CPU and memory available to the connector. The available sizes are Small, Medium, and Large. The connector requires Medium or Large. Choose the size when you create the runtime: you can’t change the size of an existing runtime in place.
Choose Large if you expect high replication throughput or if source tables contain wide rows.
Resize a runtime
Runtime size is fixed at creation, so to change size you run the connector on a different runtime. You have two options depending on whether you want to preserve the current replication progress.
If you don’t need to keep the progress of the current connector, the simplest path is to create a new runtime at the size you need and install a new connector instance on it. The new connector starts from scratch: it snapshots all configured tables and then captures ongoing changes from that point. The replication progress of the existing connector is discarded.
To keep the progress of the current connector, for example to avoid re-snapshotting tables that took a long time to snapshot initially, migrate the connector to the new runtime. This reuses the existing destination tables and resumes incremental replication from where it left off.
For migration instructions, see Reinstall the connector.
Configure the connector
To configure the connector, do the following as a data engineer:
- Right-click on the imported process group and select Parameters.
- Populate the required parameter values.
For more information on the required parameter values, see the following sections:
- PostgreSQL Source Parameters: Used to establish a connection with PostgreSQL.
- PostgreSQL Destination Parameters: Used to establish a connection with Snowflake.
- PostgreSQL Ingestion Parameters: Used to specify the tables to replicate.
Start by setting the parameters of the PostgreSQL Source Parameters context, then the PostgreSQL Destination Parameters context. After that, you can enable the connector, and it should connect to both PostgreSQL and Snowflake and start running. However, it won’t replicate any data until tables are explicitly added to its configuration.
To configure specific tables for replication, edit the PostgreSQL Ingestion Parameters context. Shortly after you apply the changes to that parameter context, the connector picks up the configuration and starts the replication lifecycle for each table.
PostgreSQL Source Parameters
| Parameter | Description |
|---|---|
| PostgreSQL Connection URL | The full JDBC URL to the source database. Example: If you are connecting to a PostgreSQL replica server, see Replicate tables from a PostgreSQL replica server. |
| PostgreSQL JDBC Driver | The path to the PostgreSQL JDBC driver jar (https://jdbc.postgresql.org/). Download the jar from its website, then select the Reference asset checkbox to upload and attach it. |
| PostgreSQL Username | The username for the connector. |
| PostgreSQL Password | The password for the connector. |
| Publication Name | The name of the publication you created earlier. |
| Replication Slot Name | Optional. When no value is provided, the connector will create a new, uniquely-named slot. When given a value, the connector will use the existing slot, or create a new one with the provided name. Changing the value for a running connector will restart reading the incremental change data capture (CDC) stream from the updated slot’s position. |
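As an illustration of the PostgreSQL Connection URL parameter, the following Python sketch assembles a URL in the PostgreSQL JDBC driver’s documented jdbc:postgresql://host:port/database form. The helper name, the host db.example.com, and the database sales are hypothetical placeholders rather than values taken from this topic.

```python
from urllib.parse import quote


def postgres_jdbc_url(host: str, database: str, port: int = 5432) -> str:
    """Build a basic PostgreSQL JDBC URL without extra connection options."""
    if not host or not database:
        raise ValueError("host and database are required")
    if not 0 < port < 65536:
        raise ValueError(f"invalid port: {port}")
    # Percent-encode the database name in case it has special characters.
    return f"jdbc:postgresql://{host}:{port}/{quote(database, safe='')}"


print(postgres_jdbc_url("db.example.com", "sales"))
# prints: jdbc:postgresql://db.example.com:5432/sales
```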
PostgreSQL Destination Parameters
| Parameter | Description | Required |
|---|---|---|
| Destination Database | The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase. | Yes |
| Destination Schema Pattern | A pattern for the names of destination schemas where data is persisted. The connector creates the schemas if they don’t exist. You can customize the pattern per ingested table using these optional variables: For example, for a table with the qualified name To ingest all tables into a single schema, provide a schema name without any variables, like Important Don’t change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance. | Yes |
| Snowflake Authentication Strategy | When using: | Yes |
| Snowflake Account Identifier | When using: | Yes |
| Snowflake Connection Strategy | When using KEY_PAIR, specify the strategy for connecting to Snowflake: | Required for BYOC with KEY_PAIR only, otherwise ignored. |
| Snowflake Private Key | When using: | No |
| Snowflake Private Key File | When using: | No |
| Snowflake Private Key Password | When using | No |
| Snowflake Role | When using | Yes |
| Snowflake Username | When using | Yes |
| Oversized Value Strategy | Determines how the connector handles values that exceed its internal size limits (16 MB) during replication. Possible values are: | No |
| Snowflake Warehouse | Snowflake warehouse used to run queries. | Yes |
PostgreSQL Ingestion Parameters
| Parameter | Description |
|---|---|
| Included Table Names | A comma-separated list of table paths, including their schemas. Example: Select tables by name or by regular expression. If you use both options, all matching tables from either option are included. Tables that are sub-partitions are always excluded from ingestion. See Replicate a partitioned table for more information. |
| Included Table Regex | A regular expression to match against table paths. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically. Example: Select tables by name or by regular expression. If you use both options, all matching tables from either option are included. Tables that are sub-partitions are always excluded from ingestion. See Replicate a partitioned table for more information. |
| Column Filter JSON | Optional. A JSON array of filter objects specifying which columns to include or exclude per table. For syntax details and examples, see Replicate a subset of columns in a table. |
| Merge Task Schedule CRON | CRON expression defining the periods when merge operations from the journal to the destination table are triggered. Set it to For example: For additional information and examples, see the cron triggers tutorial in the Quartz Documentation (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html). |
| Object Identifier Resolution | Specifies how source object identifiers such as the names of schemas, tables, and columns are stored and queried in Snowflake. This setting specifies that you must use double quotes in SQL queries. Option 1: Default, case-sensitive. For backwards compatibility.
Note Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons.
For example, if the source database includes table names that differ in case only–such as
Note Snowflake recommends using this option if database objects are not expected to have mixed case names. Important Do not change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance. |
Replicate tables from a PostgreSQL replica server
The connector can ingest data from a primary server, a hot standby replica (https://www.postgresql.org/docs/current/hot-standby.html), or a subscriber server using logical replication (https://www.postgresql.org/docs/current/logical-replication.html). Before configuring the connector to connect to a PostgreSQL replica, ensure that replication between the primary and replica nodes works correctly. When investigating missing data in the connector, first check that the missing rows are present on the replica server used by the connector.
Additional considerations when connecting to a standby replica:
- The PostgreSQL server version must be 16 or later. Amazon Aurora is not supported because it doesn’t offer logical decoding from read replicas (https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraPostgreSQL.Replication.Logical.html).
- Only hot standby replicas are supported. Warm standby replicas can’t accept connections from clients until they are promoted to a primary instance.
- The publication needed by the connector must be created on the primary server, not the standby server. The standby server is read-only and doesn’t allow creating publications.
If you connect to a hot standby instance and either the Read PostgreSQL CDC Stream processor doesn’t start or the following error appears in the Openflow bulletin, log in to the primary PostgreSQL instance and call the pg_log_standby_snapshot() function, for example with SELECT pg_log_standby_snapshot();
Trying to create the replication slot ‘<replication slot>’ timed out. If connecting to a standby instance, ensure there is some traffic on the primary PostgreSQL instance, otherwise the call to create a replication slot will never return.
The error occurs when there are no data changes on the primary server. To create a replication slot, the replica server needs information about running transactions from the primary server, and an idle primary server doesn’t send that information, so the connector can stall while creating the slot. The pg_log_standby_snapshot() function forces the primary server to send information about running transactions to the replica server.
Replicate a subset of columns in a table
The connector can filter the data replicated per table to a subset of configured columns. Primary key columns are always included regardless of exclusions.
To apply column filters, set the Column Filter JSON parameter in the Ingestion Parameters context to a JSON array of filter objects, one per table you want to filter.
Columns can be included or excluded by name or by regular expression pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.
Syntax
Each object in the array identifies a table and specifies which columns to include or exclude.
The following rules apply:
- Use schema and table for exact name matching, or schemaPattern and tablePattern for regex matching. You cannot use both a field and its pattern variant in the same object (for example, schema and schemaPattern cannot both appear).
- At least one of included, excluded, includedPattern, or excludedPattern must be provided.
- When both included and excluded filters are specified, exclusions take precedence.
- When multiple filters match the same table, the last matching filter is used, with exact matches taking precedence over pattern-based filters.
- The value can be an array of objects to apply different filters to different tables.
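The rules above can be checked before you paste a value into the Column Filter JSON parameter. The following Python sketch is a hypothetical pre-flight validator, not connector code. The field names come from the rules above, while the function names are made up and the requirement that each object names both a schema and a table (exactly or by pattern) is an assumption drawn from "identifies a table".

```python
import json

COLUMN_KEYS = ("included", "excluded", "includedPattern", "excludedPattern")


def validate_filter(obj: dict) -> None:
    """Raise ValueError if one filter object breaks the documented rules."""
    for exact, pattern in (("schema", "schemaPattern"), ("table", "tablePattern")):
        if exact in obj and pattern in obj:
            raise ValueError(f"{exact} and {pattern} cannot both appear")
        # Assumption: every object must name a schema and a table somehow.
        if exact not in obj and pattern not in obj:
            raise ValueError(f"one of {exact} or {pattern} is required")
    if not any(key in obj for key in COLUMN_KEYS):
        raise ValueError("at least one of " + ", ".join(COLUMN_KEYS) + " is required")


def validate_column_filter_json(text: str) -> list:
    """Parse the parameter value and validate every filter object in it."""
    filters = json.loads(text)
    if not isinstance(filters, list):
        raise ValueError("the value must be a JSON array of filter objects")
    for obj in filters:
        validate_filter(obj)
    return filters


# Hypothetical value with one exact-match filter.
value = '[{"schema": "public", "table": "customers", "excluded": ["ssn"]}]'
print(len(validate_column_filter_json(value)), "filter(s) OK")
```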
Examples
Include specific columns by name:
Exclude specific columns by name:
Combine an include pattern with a specific exclusion (for example, include all email columns except admin_email):
Mix a schema pattern with an exact table name to apply a filter across schemas:
Pass multiple filter objects to apply different rules to different tables:
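The following Python sketch builds filter values for the cases listed above and models how one filter might select columns. It is a sketch under stated assumptions: included and excluded are taken to be arrays of column names, the pattern fields are taken to be single regular expressions matched against the whole column name, and all schema, table, and column names are hypothetical. The connector’s actual matching may differ.

```python
import json
import re


def selected_columns(columns, flt, primary_key=()):
    """Model of one filter: exclusions win, primary key columns always stay."""
    has_include = "included" in flt or "includedPattern" in flt

    def is_included(col):
        if not has_include:
            return True  # exclusion-only filter: start from all columns
        pat = flt.get("includedPattern")
        return col in flt.get("included", []) or (
            pat is not None and re.fullmatch(pat, col) is not None)

    def is_excluded(col):
        pat = flt.get("excludedPattern")
        return col in flt.get("excluded", []) or (
            pat is not None and re.fullmatch(pat, col) is not None)

    return [c for c in columns
            if c in primary_key or (is_included(c) and not is_excluded(c))]


filters = [
    # Include specific columns by name.
    {"schema": "public", "table": "customers", "included": ["id", "email"]},
    # Exclude specific columns by name.
    {"schema": "public", "table": "payments", "excluded": ["card_number"]},
    # Include all email columns except admin_email.
    {"schema": "public", "table": "users",
     "includedPattern": ".*email", "excluded": ["admin_email"]},
    # Schema pattern mixed with an exact table name.
    {"schemaPattern": "tenant_.*", "table": "orders", "excluded": ["notes"]},
]

# The parameter value is the JSON text of the whole array.
print(json.dumps(filters, indent=2))

cols = ["id", "name", "email", "admin_email", "backup_email"]
print(selected_columns(cols, filters[2], primary_key=("id",)))
# prints: ['id', 'email', 'backup_email']
```

Passing the whole filters list as one array corresponds to the last case above, where different rules apply to different tables.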
Replicate a partitioned table
The connector supports replication of partitioned tables for PostgreSQL servers of version 15 or later. A PostgreSQL partitioned table is replicated into Snowflake as a single destination table.
For example, if you have a partitioned table orders with sub-partitions orders_2023 and orders_2024, and you configure the connector to ingest all tables matching the orders.* pattern, then only the orders table is replicated to Snowflake, and it includes data from all sub-partitions.
To support replication of partitioned tables, ensure that the publication created in PostgreSQL has the publish_via_partition_root option set to true.
Ingestion of partitioned tables currently has the following limitations:
- When a table is attached as a partition to a partitioned table after ingestion was started, the connector won’t fetch data that existed in the partition table before attaching.
- When a sub-partition table is detached from the partitioned table after ingestion was started, the connector won’t mark the data from this sub-partition as deleted in the root partition table.
- Truncate operation on subpartitions won’t mark affected records as deleted.
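Track data changes in tables
The connector replicates not only the current state of the data in the source tables, but also every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.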
The journal table names are formatted as: <source_table_name>_JOURNAL_<timestamp>_<schema_generation>
where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema_generation> is an integer increasing with every schema change on the source table.
As a result, source tables that undergo schema changes will have multiple journal tables.
When a table is removed from replication, then added back, the <timestamp> value will change, and <schema_generation> will start again from 1.
Important
Snowflake recommends that you don’t alter the structure of journal tables in any way. They are used by the connector to update the destination table as part of the replication process.
The connector never drops journal tables, but does make use of the latest journal for every replicated source table, only reading append-only streams on top of journals. To reclaim the storage, you can:
- Truncate all journal tables at any time.
- Drop the journal tables related to source tables that were removed from replication.
- Drop all but the latest generation journal tables for actively replicated tables.
For example, if your connector is set to actively replicate source table orders,
and you have earlier removed table customers from replication, you may have
the following journal tables. In this case you can drop all of them except orders_5678_2.
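The cleanup rules above can be sketched in Python as follows. The sketch parses names in the <source_table_name>_JOURNAL_<timestamp>_<schema_generation> format stated earlier and reports which journals could be dropped. The function name and the sample table names are hypothetical, and "latest" is assumed to mean the highest timestamp, then the highest schema generation.

```python
import re
from collections import defaultdict

JOURNAL_RE = re.compile(r"^(?P<source>.+)_JOURNAL_(?P<ts>\d+)_(?P<gen>\d+)$")


def droppable_journals(journal_tables, active_sources):
    """Return journal tables that are safe to drop under the rules above."""
    by_source = defaultdict(list)
    for name in journal_tables:
        match = JOURNAL_RE.match(name)
        if match is None:
            continue  # not a journal table name; leave it alone
        by_source[match["source"]].append(
            (int(match["ts"]), int(match["gen"]), name))

    droppable = []
    for source, entries in by_source.items():
        entries.sort()
        # Keep only the latest journal, and only for actively replicated tables.
        keep = entries[-1][2] if source in active_sources else None
        droppable.extend(name for _, _, name in entries if name != keep)
    return sorted(droppable)


tables = [
    "CUSTOMERS_JOURNAL_1700000000_1",
    "CUSTOMERS_JOURNAL_1700000000_2",
    "ORDERS_JOURNAL_1700000500_1",
    "ORDERS_JOURNAL_1700000500_2",
]
# ORDERS is still replicated; CUSTOMERS was removed from replication.
for name in droppable_journals(tables, active_sources={"ORDERS"}):
    print(name)
```

The sketch only computes the list. Review it before issuing any DROP TABLE statements, because dropping the latest journal of an active table interferes with replication.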
Configure the schedule of merge tasks
The connector uses a warehouse to merge change data capture (CDC) data into destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes, or if no new FlowFiles are waiting in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse auto-suspends.
To limit warehouse cost and restrict merges to scheduled times, set a CRON expression in the Merge Task Schedule CRON parameter. It throttles the FlowFiles coming to the MergeSnowflakeJournalTable processor, so merges are triggered only during the dedicated period of time. For more information about scheduling, see Scheduling strategy (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy).
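The parameter description points to the Quartz cron documentation, and Quartz expressions have six or seven space-separated fields, starting with seconds. The following Python sketch labels the fields of an expression as a quick sanity check. The helper is hypothetical and the sample expression is illustrative rather than taken from this topic.

```python
QUARTZ_FIELDS = ("seconds", "minutes", "hours", "day_of_month",
                 "month", "day_of_week", "year")


def describe_quartz_cron(expression: str) -> dict:
    """Map each field of a Quartz cron expression to its name.

    Only the field count is checked; field values are not validated.
    """
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError("a Quartz cron expression has 6 or 7 fields")
    return dict(zip(QUARTZ_FIELDS, parts))


# Illustrative expression: every second of 14:20 on weekdays.
print(describe_quartz_cron("* 20 14 ? * MON-FRI"))
```

A five-field Unix-style expression is rejected by the check, which is a common mistake when moving schedules from crontab to Quartz.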
Stop or delete the connector
When stopping or removing the connector, you have to consider the replication slot (https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS) that the connector uses.
The connector creates its own replication slot with a name starting with snowflake_connector_ followed by a random suffix. As the connector reads the replication stream, it advances the slot, so that PostgreSQL can trim its WAL and free up disk space.
When the connector is paused, the slot isn’t advanced, and changes to the source database keep increasing the WAL size. Avoid keeping the connector paused for extended periods of time, especially on high-traffic databases.
When you remove the connector, whether by deleting it from the Openflow canvas or by any other means, such as deleting the whole Openflow instance, the replication slot stays in place and must be dropped manually.
If you have multiple connector instances replicating from the same PostgreSQL database,
each instance will create its own uniquely-named replication slot. When dropping a replication slot manually, make sure
it’s the right one. You can see which replication slot is used by a given connector instance by checking the state of the CaptureChangePostgreSQL processor.
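The manual cleanup described above can be sketched as a Python helper that assembles the SQL text. The pg_replication_slots view and the pg_drop_replication_slot() function are standard PostgreSQL; the helper names and the example slot name are hypothetical. The prefix filter only finds slots the connector named itself, not a custom Replication Slot Name.

```python
import re

SLOT_PREFIX = "snowflake_connector_"


def list_connector_slots_sql() -> str:
    """Query that lists slots whose names start with the connector prefix."""
    return (
        "SELECT slot_name, active, restart_lsn FROM pg_replication_slots "
        f"WHERE left(slot_name, {len(SLOT_PREFIX)}) = '{SLOT_PREFIX}';"
    )


def drop_slot_sql(slot_name: str) -> str:
    """Statement that drops one slot; PostgreSQL refuses if it is active."""
    # Slot names may contain only lowercase letters, digits, and underscores.
    if not re.fullmatch(r"[a-z0-9_]+", slot_name):
        raise ValueError(f"invalid replication slot name: {slot_name!r}")
    return f"SELECT pg_drop_replication_slot('{slot_name}');"


print(list_connector_slots_sql())
# Hypothetical slot name; confirm the real one in the processor state first.
print(drop_slot_sql("snowflake_connector_ab12cd"))
```

Run the listing query first and compare its output with the slot shown in the CaptureChangePostgreSQL processor state, so that a slot belonging to another connector instance isn’t dropped by mistake.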
Run the flow
- Right-click on the plane and select Enable all Controller Services.
- Right-click on the imported process group and select Start. The connector starts the data ingestion.