Openflow Connector for PostgreSQL Maintenance

Note

This connector is subject to the Snowflake Connector Terms.

This topic describes important maintenance considerations and best practices for maintaining the Openflow Connector for PostgreSQL when making changes to the source PostgreSQL database. In addition, this topic describes how to restart table replication and reinstall the connector.

Check the replication status of a table

Transient failures, such as connection errors, do not prevent table replication. However, permanent failures, such as unsupported data types, do prevent table replication.

To troubleshoot replication issues or verify that a table has been successfully removed from the replication flow, check the Table State Store:

  1. In the Openflow runtime canvas, right-click a processor group and choose Controller Services. A table listing controller services displays.
  2. Locate the row labeled Table State Store, click the More button (three vertical dots) on the right side of the row, and then choose View State.

A list of tables and their current states displays. Type in the search box to filter the list by table name. The possible states are:

  • NEW: The table is scheduled for replication but replication hasn’t started.
  • SNAPSHOT_REPLICATION: The connector is copying existing data. This status displays until all records are stored in the destination table.
  • INCREMENTAL_REPLICATION: The connector is actively replicating changes. This status displays after snapshot replication ends and continues to display indefinitely until a table is either removed from replication or replication fails.
  • FAILED: Replication has permanently stopped due to an error.
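The states above form a simple lifecycle. A minimal sketch of that lifecycle as a state machine (an illustrative model only; the transition map is inferred from the descriptions above, not from the connector's internal implementation):

```python
# Hypothetical model of the table replication lifecycle described above.
# State names match the Table State Store; the transitions are illustrative.
ALLOWED_TRANSITIONS = {
    "NEW": {"SNAPSHOT_REPLICATION", "FAILED"},
    "SNAPSHOT_REPLICATION": {"INCREMENTAL_REPLICATION", "FAILED"},
    "INCREMENTAL_REPLICATION": {"FAILED"},  # stays here until removed or it fails
    "FAILED": set(),  # permanent: requires removing and re-adding the table
}

def is_valid_transition(old_state: str, new_state: str) -> bool:
    """Return True if a state change reported in the logs is an expected one."""
    return new_state in ALLOWED_TRANSITIONS.get(old_state, set())
```

Note in particular that FAILED has no outgoing transitions, which is why a failed table must be removed from replication and re-added, as described below.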

Note

The Openflow runtime canvas displays only the current status of a table, not status changes. However, status changes are recorded in the logs when they occur. Look for the following log message:

Replication state for table <database_name>.<schema_name>.<table_name> changed from <old_state> to <new_state>
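If you monitor these log messages programmatically, the template above can be parsed with a short script. This is a sketch only; the pattern is an assumption based on the documented message template and may need adjusting for your connector version:

```python
import re
from typing import Optional

# Matches the state-change log message shown above. The pattern assumes the
# three-part <database>.<schema>.<table> identifier contains no spaces.
STATE_CHANGE = re.compile(
    r"Replication state for table "
    r"(?P<database>[^.\s]+)\.(?P<schema>[^.\s]+)\.(?P<table>\S+) "
    r"changed from (?P<old_state>\w+) to (?P<new_state>\w+)"
)

def parse_state_change(line: str) -> Optional[dict]:
    """Extract the table identity and state transition from a log line."""
    match = STATE_CHANGE.search(line)
    return match.groupdict() if match else None
```

For example, a line reporting that `mydb.public.orders` changed from NEW to SNAPSHOT_REPLICATION yields a dictionary with the database, schema, table, and both states as separate fields.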

If a permanent failure prevents table replication, remove the table from replication. After you address the problem that caused the failure, you can add the table back to replication. For more information, see Restart table replication.

Restart table replication

A table in the FAILED state (for example, due to a missing primary key or an unsupported schema change) does not restart automatically. If a table enters the FAILED state, or if you need to restart replication from scratch, use the following procedure to remove the table from replication and then add it back.

Note

If the failure was caused by an issue in the source table such as a missing primary key, resolve that issue in the source database before continuing.

  1. Remove the table from flow parameters: In the Ingestion Parameters context, either remove the table from the Included Table Names or modify the Included Table Regex so the table is no longer matched.

  2. Verify the table has been removed:

    1. In the Openflow runtime canvas, right-click a processor group and choose Controller Services.
    2. In the table listing controller services, locate the Table State Store row, click the three vertical dots on the right side of the row, then choose View State.

    Important

    You must wait until the table’s state is fully removed from this list before proceeding. Do not continue until this configuration change has completed.

  3. Clean up the destination: After the table’s state has been fully removed, manually drop the destination table in Snowflake. The connector does not overwrite an existing destination table during the snapshot phase; if the table still exists, replication fails again. Optionally, you can also remove the journal table and stream if they are no longer needed.

  4. Re-add the table: Update the Included Table Names or Included Table Regex parameters to include the table again.

  5. Verify the restart: Check the Table State Store as described previously. The table should appear with the status NEW, then transition to SNAPSHOT_REPLICATION, and finally to INCREMENTAL_REPLICATION.

Upgrading PostgreSQL

The upgrade procedure depends on whether PostgreSQL is being upgraded to a new minor or a new major version.

Minor version upgrades

  • Are data-safe.
  • Require no special treatment.
  • Require stopping the connector for the duration of the upgrade, to avoid the connector reporting connectivity issues.
  • Continue replicating after the upgrade with no data loss.

Major version upgrades

  • Require the PostgreSQL server to drop replication slots, including any used by the connector.
  • Cannot preserve or migrate replication slots to the new version. See also PostgreSQL 17 and later version upgrades.
  • Require all tables to be replicated again, starting from the snapshot phase.

To perform a minor version upgrade, do the following:

  1. Stop the connector, including all Processors and Controller Services.
  2. Upgrade PostgreSQL.
  3. Restart the connector.

To perform a major version upgrade, do the following:

  1. Remove all tables from replication in the connector.
  2. Wait until all queues in the connector are empty.
  3. Stop the connector, including all Processors and Controller Services.
  4. Open the Incremental Load group in the connector.
  5. Right-click the top Processor in the group, Read PostgreSQL CDC Stream, and select View state.
  6. Click Clear state.
  7. Click Close.
  8. Upgrade PostgreSQL.
  9. Restart the connector. A new replication slot will be created.
  10. Re-add all tables to begin replication.

PostgreSQL 17 and later version upgrades

PostgreSQL 17 improved the upgrade process so that replication slots no longer need to be dropped when upgrading to a later version, such as 17.1 » 18.0. Upgrading to PostgreSQL 17.0 or later from a prior version (16 or earlier) still drops replication slots and should be treated as a major upgrade. Future versions of PostgreSQL may improve the upgrade process further.
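The version rules above can be summarized in a small helper. This is illustrative only; it encodes just the rules stated in this topic and may not cover future PostgreSQL releases:

```python
def upgrade_drops_replication_slots(old: tuple, new: tuple) -> bool:
    """Return True if the upgrade is expected to drop replication slots.

    Encodes the rules described above:
    - minor upgrades (same major version) preserve slots;
    - major upgrades starting from PostgreSQL 17 or later preserve slots;
    - major upgrades from PostgreSQL 16 or earlier drop slots.
    Versions are (major, minor) tuples, e.g. (17, 1).
    """
    old_major, new_major = old[0], new[0]
    if old_major == new_major:
        return False  # minor upgrade: data-safe, slots preserved
    return old_major < 17  # major upgrade: slots dropped unless starting on 17+
```

For example, 16.4 » 17.0 requires the major-upgrade procedure (slots are dropped), whereas 17.1 » 18.0 does not.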

Reinstall the connector

This section describes how to reinstall the connector. It covers situations where the new connector is installed in the same runtime, or when it is moved to a new runtime. Reinstall is often used in conjunction with Incremental replication without snapshots.

Warning

For the connector to continue replicating from the same CDC stream position where it stopped before reinstallation, the source database must retain the WAL long enough to cover the interval between stopping the old connector and starting the new one. Ensure that the max_wal_size parameter of the PostgreSQL server is high enough for your traffic, and keep the reinstallation time to a minimum.
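As a rough sizing aid, the WAL that must be retained during reinstallation is approximately the WAL write rate multiplied by the downtime, plus headroom for traffic spikes. The formula and safety factor below are illustrative assumptions, not a Snowflake or PostgreSQL recommendation; measure your actual WAL generation rate before relying on them:

```python
def required_wal_mb(wal_rate_mb_per_min: float,
                    downtime_min: float,
                    safety_factor: float = 2.0) -> float:
    """Estimate the WAL (in MB) the server must retain while the connector
    is down. The safety factor leaves headroom for traffic spikes; sizing
    WAL retention below this estimate risks losing the CDC stream position.
    """
    return wal_rate_mb_per_min * downtime_min * safety_factor

# Example: 50 MB/min of WAL and a 30-minute reinstallation window
# suggest retaining at least 50 * 30 * 2.0 = 3000 MB of WAL.
```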

Prerequisites

Review and note connector parameter context values. If you’re reinstalling the connector in the same runtime, you can reuse the existing context. If the new instance will be located in a different runtime, you will have to re-enter all parameters.

To reinstall the connector:

  1. Finish processing all in-flight FlowFiles in the existing connector, and then stop the connector.

    1. Sign in to Snowsight.

    2. In the navigation menu, select Ingestion » Openflow.

    3. Select Launch Openflow.

    4. In the Openflow pane select the Runtimes tab.

    5. Select the runtime containing the connector.

    6. Select the connector.

    7. Stop the topmost processor Set Tables for Replication in the Snapshot Load group.

    8. Stop the topmost processor Read PostgreSQL CDC Stream in the Incremental Load group.

    9. If you changed the value of the Merge Task Schedule CRON parameter, return it to * * * * * ?; otherwise, queues won’t be emptied until the next scheduled run.

      Wait until all FlowFiles in the connector have been processed, and all queues are empty. When all FlowFiles have been processed, the Queued value on the connector’s processor group becomes zero. If there are any items left in the original connector’s queues, there may be data gaps when the new connector starts.

    10. Stop all processors and controller services in the connector.

  2. Find and copy the name of the replication slot used by the original connector by viewing the state of Read PostgreSQL CDC Stream, the topmost processor in the Incremental Load group. The replication slot name is stored under the key replication.slot.name. Copy the value to a text editor.

  3. Create a new instance of the connector. If you’re using the same runtime as the original connector, you can choose to keep the existing parameter contexts, and reuse the settings.

    Caution

    The existing connector can remain in the runtime and doesn’t interfere with the new instance, as long as it remains stopped.

  4. If you’re installing into a different runtime, or you deleted the previous parameter contexts, enter all the configuration settings into the new parameter contexts, including the table names and patterns as described in Set up the Openflow Connector for PostgreSQL.

  5. Open the PostgreSQL Ingestion Parameters context, and set the Ingestion Type parameter to incremental. For more information, see Enable incremental replication without snapshots.

  6. Open the PostgreSQL Source Parameters context, and set the Replication Slot Name parameter to the value you copied earlier.

  7. Start the new connector.

Usage notes

The new connector uses the existing destination tables created by the original connector, but creates new journal tables.