Openflow Connector for PostgreSQL Maintenance

Note

This connector is subject to the Snowflake Connector Terms.

This topic describes maintenance considerations and best practices for the Openflow Connector for PostgreSQL when you make changes to the source PostgreSQL database. It also describes how to restart table replication and reinstall the connector.

Check the replication status of a table

Interim failures, such as connection errors, do not prevent table replication. However, permanent failures, such as unsupported data types, prevent table replication.

To troubleshoot replication issues or verify that a table has been successfully removed from the replication flow, check the Table State Store:

  1. In the Openflow runtime canvas, right-click a processor group and choose Controller Services. A table listing controller services displays.
  2. Locate the row labeled Table State Store, click the More button (three vertical dots) on the right side of the row, and then choose View State.

A list of tables and their current states displays. Type in the search box to filter the list by table name. The possible states are:

  • NEW: The table is scheduled for replication but replication hasn’t started.
  • SNAPSHOT_REPLICATION: The connector is copying existing data. This status displays until all records are stored in the destination table.
  • INCREMENTAL_REPLICATION: The connector is actively replicating changes. This status displays after snapshot replication ends and continues to display indefinitely until a table is either removed from replication or replication fails.
  • FAILED: Replication has permanently stopped due to an error.

Note

The Openflow runtime canvas displays only the current table status, not table status changes. However, table status changes are recorded in logs when they occur. Look for the following log message:

Replication state for table <database_name>.<schema_name>.<table_name> changed from <old_state> to <new_state>
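When scanning exported logs for these messages, a small parser can help. The following is a minimal sketch based only on the message format shown above. The function and class names are hypothetical, and it assumes that table names contain no whitespace and that states are upper-case identifiers such as NEW or FAILED.

```python
import re
from typing import NamedTuple, Optional

# Pattern derived from the log message format shown above.
# Assumption: the fully qualified table name has no whitespace,
# and states consist of upper-case letters and underscores.
STATE_CHANGE = re.compile(
    r"Replication state for table (?P<table>\S+) "
    r"changed from (?P<old>[A-Z_]+) to (?P<new>[A-Z_]+)"
)


class StateChange(NamedTuple):
    table: str
    old_state: str
    new_state: str


def parse_state_change(line: str) -> Optional[StateChange]:
    """Return the state change described by a log line, or None if absent."""
    match = STATE_CHANGE.search(line)
    if match is None:
        return None
    return StateChange(match["table"], match["old"], match["new"])
```

Filtering the parsed results for a new_state of FAILED gives a quick list of tables that need attention.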

If a permanent failure prevents table replication, remove the table from replication. After you address the problem that caused the failure, you can add the table back to replication. For more information, see Restart table replication.

Restart table replication

A table in FAILED state (for example, due to a missing primary key or an unsupported schema change) does not restart automatically. If a table enters the FAILED state, or you need to restart replication from scratch, use the following procedure to remove the table from replication and then re-add it.

Note

If the failure was caused by an issue in the source table such as a missing primary key, resolve that issue in the source database before continuing.

  1. Remove the table from flow parameters: In the Ingestion Parameters context, either remove the table from the Included Table Names or modify the Included Table Regex so the table is no longer matched.

  2. Verify the table has been removed:

    1. In the Openflow runtime canvas, right-click a processor group and choose Controller Services.
    2. In the table listing controller services, locate the Table State Store row, click the three vertical dots on the right side of the row, then choose View State.

    Important

    You must wait until the table’s state is fully removed from this list before proceeding. Do not continue until this configuration change has completed.

  3. Clean up the destination: Once the table’s state shows as fully removed, manually DROP the destination table in Snowflake. Note that the connector will not overwrite an existing destination table during the snapshot phase; if the table still exists, replication will fail again. Optionally, the journal table and stream can also be removed if they are no longer needed.

  4. Re-add the table: Update the Included Table Names or Included Table Regex parameters to include the table again.

  5. Verify the restart: Check the Table State Store using the instructions given previously. The state of the table should appear with the status NEW, then transition to SNAPSHOT_REPLICATION, and finally INCREMENTAL_REPLICATION.
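For step 3, the DROP statement can be generated rather than typed by hand. The sketch below only builds the SQL text; it does not connect to Snowflake. The helper names are hypothetical, and it assumes the destination table is addressed by its database, schema, and table name exactly as they appear in Snowflake. Double-quoted identifiers are case-sensitive in Snowflake, so the names must match the destination objects exactly.

```python
def quote_ident(name: str) -> str:
    """Wrap an identifier in double quotes, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def drop_table_statement(database: str, schema: str, table: str) -> str:
    """Build a DROP TABLE statement for a destination table.

    Assumption: the three names are given exactly as they exist in
    Snowflake, including letter case.
    """
    qualified = ".".join(quote_ident(part) for part in (database, schema, table))
    return f"DROP TABLE IF EXISTS {qualified};"
```

Review the generated statement before running it, and run it only after the table's state has been fully removed from the Table State Store.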

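Upgrade PostgreSQL

The connector requires a different approach depending on whether you are upgrading PostgreSQL to the next minor version or to a major version.

Minor version upgrades

  • Data is safe.
  • No special handling is required.
  • The connector must be stopped during the upgrade to avoid reporting connection issues.
  • Replication continues after the upgrade with no data loss.

Major version upgrades

  • Require the PostgreSQL server to drop replication slots, including any replication slot used by the connector.
  • Cannot preserve or migrate replication slots to the new version. See also PostgreSQL 17 and later upgrades.
  • Restart replication of all tables from the snapshot phase.

To perform a minor version upgrade:

  1. Stop the connector, including all processors and controller services.
  2. Upgrade PostgreSQL.
  3. Restart the connector.

To perform a major version upgrade:

  1. Remove all tables from replication in the connector.
  2. Wait until all queues in the connector are empty.
  3. Stop the connector, including all processors and controller services.
  4. Open the Incremental Load group in the connector.
  5. Right-click the top processor in the group, Read PostgreSQL CDC Stream, and select View state.
  6. Click Clear state.
  7. Click Close.
  8. Upgrade PostgreSQL.
  9. Restart the connector. A new replication slot is created.
  10. Re-add all tables to start replication.

PostgreSQL 17 and later upgrades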

PostgreSQL 17 improved the upgrade process so that replication slots no longer need to be dropped when upgrading from version 17 to a later version, for example from 17.1 to 18.0. Upgrading to PostgreSQL 17.0 or later from an earlier version (16 and earlier) drops replication slots and should be treated as a major version upgrade. Future versions of PostgreSQL may improve the upgrade process further.
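The rules in this section can be summarized as a small decision function. This is an illustrative sketch, not part of the connector: the function names are hypothetical, and it assumes version strings in the major.minor form that PostgreSQL has used since version 10.

```python
def major_version(version: str) -> int:
    """Extract the major version from a string such as '16.4'."""
    major = int(version.split(".")[0])
    if major < 10:
        # Assumption: only the major.minor numbering scheme is handled.
        raise ValueError("only PostgreSQL 10 and later are supported here")
    return major


def slots_dropped_by_upgrade(old: str, new: str) -> bool:
    """Return True if the upgrade should be treated as dropping replication slots."""
    old_major, new_major = major_version(old), major_version(new)
    if new_major < old_major:
        raise ValueError("downgrades are not covered by this sketch")
    if old_major == new_major:
        return False  # minor version upgrade: slots are kept
    # Major version upgrade: slots survive only when the old server is 17 or later.
    return old_major < 17
```

When the function returns True, follow the major version upgrade procedure; otherwise the minor version procedure is a reasonable starting point.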

Reinstall the connector

This section describes how to reinstall the connector. It covers reinstalling the connector in the same runtime and moving it to a new runtime. Reinstallation is often used in conjunction with incremental replication without snapshots.

Warning

For the connector to continue replicating from the CDC stream position where it stopped before reinstallation, the source database must retain the WAL for the entire period between stopping the old connector and starting the new one. Ensure that the max_wal_size parameter of the PostgreSQL server is high enough for your traffic, and keep the reinstallation time to a minimum.

Prerequisites

Review and note connector parameter context values. If you’re reinstalling the connector in the same runtime, you can reuse the existing context. If the new instance will be located in a different runtime, you will have to re-enter all parameters.

To reinstall the connector:

  1. Finish processing all in-flight FlowFiles in the existing connector, and then stop the connector.

    1. Sign in to Snowsight.

    2. In the navigation menu, select Ingestion » Openflow.

    3. Select Launch Openflow.

    4. In the Openflow pane select the Runtimes tab.

    5. Select the runtime containing the connector.

    6. Select the connector.

    7. Stop the topmost processor Set Tables for Replication in the Snapshot Load group.

    8. Stop the topmost processor Read PostgreSQL CDC Stream in the Incremental Load group.

    9. If you changed the value of the Merge Task Schedule CRON parameter, return it to * * * * * ?, otherwise queues won’t be emptied until the next scheduled run.

      Wait until all FlowFiles in the connector have been processed, and all queues are empty. When all FlowFiles have been processed, the Queued value on the connector’s processor group becomes zero. If there are any items left in the original connector’s queues, there may be data gaps when the new connector starts.

    10. Stop all processors and controller services in the connector.

  2. Find and copy the name of the replication slot used by the original connector. To do so, view the state of the topmost processor in the Incremental Load group, Read PostgreSQL CDC Stream. The replication slot name is stored under the key replication.slot.name. Copy the value of the key to a text editor.

  3. Create a new instance of the connector. If you’re using the same runtime as the original connector, you can choose to keep the existing parameter contexts, and reuse the settings.

    Caution

    The existing connector can remain in the runtime and doesn’t interfere with the new instance, as long as it remains stopped.

  4. If you’re installing into a different runtime, or you deleted the previous parameter contexts, enter all the configuration settings into the new parameter contexts, including the table names and patterns as described in Set up the Openflow Connector for PostgreSQL.

  5. Open the PostgreSQL Ingestion Parameters context, and set the Ingestion Type parameter to incremental. For more information about the considerations involved, see Enable incremental replication without snapshots.

  6. Open the PostgreSQL Source Parameters context, and set the Replication Slot Name parameter to the value you copied earlier.

  7. Start the new connector.
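Before starting the new connector, it can be useful to confirm that the replication slot copied in step 2 still exists on the source database. The following is a minimal sketch that queries the standard pg_replication_slots view through a Python DB-API cursor. The function name is hypothetical, and the %s placeholder style is an assumption that matches drivers such as psycopg; adapt it to the driver you use.

```python
from typing import Any, Optional

# pg_replication_slots is a standard PostgreSQL system view.
# Assumption: the driver uses %s placeholders (as psycopg does).
SLOT_QUERY = (
    "SELECT slot_name, active, restart_lsn, confirmed_flush_lsn "
    "FROM pg_replication_slots WHERE slot_name = %s"
)

SLOT_COLUMNS = ("slot_name", "active", "restart_lsn", "confirmed_flush_lsn")


def describe_slot(cursor: Any, slot_name: str) -> Optional[dict]:
    """Return details of the named replication slot, or None if it is missing."""
    cursor.execute(SLOT_QUERY, (slot_name,))
    row = cursor.fetchone()
    if row is None:
        return None
    return dict(zip(SLOT_COLUMNS, row))
```

A result of None means the slot no longer exists, in which case the new connector cannot resume from the old stream position. While the old connector is stopped and the new one has not yet started, the active column is expected to be false.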

Usage notes

The new connector uses the existing destination tables that were created by the original connector, but creates new journal tables.