设置 Openflow Connector for PostgreSQL

备注

使用该连接器需遵守 连接器条款

本主题介绍设置 Openflow Connector for PostgreSQL 的步骤。

先决条件

  1. 确保您已查看 关于 Openflow Connector for PostgreSQL

  2. 确保您已查看 支持的 PostgreSQL 版本

  3. 确保您已 设置 Openflow

  4. 作为数据库管理员,请执行以下任务:

    1. 配置 wal_level

    2. 创建发布

    3. 确保 PostgreSQL 服务器上有足够的磁盘空间用于 WAL。这是因为复制槽创建后,将导致 PostgreSQL 会保留从该复制槽所记录的位置开始的 WAL 数据,直到连接器确认并推进该位置。

    4. 确保启用复制的每张表都有主键。键可以是单个列或组合。

    5. 将表的 REPLICA IDENTITY (https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY) 设置为 DEFAULT。这可确保主键在 WAL 中表示,并且连接器可以读取它们。

    6. 为连接器创建用户。连接器要求用户具有 REPLICATION 属性,以及对所有需要复制的表拥有 SELECT 权限。使用密码创建该用户,以进入连接器的配置。有关复制安全的更多信息,请参阅 安全 (https://www.postgresql.org/docs/current/logical-replication-security.html)。

配置 wal_level

Openflow Connector for PostgreSQL 要求将 wal_level (https://www.postgresql.org/docs/current/runtime-config-wal.html#GUC-WAL-LEVEL) 设置为 logical

根据 PostgreSQL 服务器的托管位置,您可以按如下方式配置 wal_level:

本地

以超级用户或具有 ALTER SYSTEM 权限的用户身份执行以下查询:

ALTER SYSTEM SET wal_level = logical;
Copy

RDS

代理使用的用户需要已分配 rds_superuserrds_replication 角色。

您还需要执行以下操作:

  • rds.logical_replication 静态参数设置为 1。

  • 根据数据库和复制设置,设置 max_replication_slotsmax_connectionsmax_wal_senders 参数。

AWS Aurora

rds.logical_replication 静态参数设置为 1。

GCP

设置以下标记:

  • cloudsql.logical_decoding=on

  • cloudsql.enable_pglogical=on

有关更多信息,请参阅 Google Cloud 文档 (https://cloud.google.com/sql/docs/postgres/replication/configure-logical-replication#set-up-logical-replication-with-pglogical)。

Azure

将复制支持设置为 Logical。有关更多信息,请参阅 Azure 文档 (https://learn.microsoft.com/en-us/azure/postgresql/single-server/concepts-logical#set-up-your-server)。

创建发布

在开始复制之前,Openflow Connector for PostgreSQL 要求您在 PostgreSQL 中创建并配置一个 发布 (https://www.postgresql.org/docs/current/logical-replication-publication.html#LOGICAL-REPLICATION-PUBLICATION)。您可以为所有表或表的子集以及仅具有指定列的特定表创建发布。确保您计划复制的每张表和列都包含在发布中。您也可以稍后在连接器运行时修改发布。要创建和配置发布,请执行以下操作:

  1. 以具有 CREATE 权限的用户身份登录数据库,并执行以下查询:

CREATE PUBLICATION <publication name>;
Copy
  1. 使用以下命令定义数据库代理可见的表:

ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Copy

重要

** PostgreSQL 15 及更高版本** 支持为表列的指定子集配置发布。为使连接器正确支持此功能,必须使用 列筛选设置 来包含与发布上设置的相同的列。

如果没有此设置,连接器将显示以下行为:

  • 在目标表中,未包含在筛选器中的列将以 __DELETED 作为后缀。在快照阶段复制的所有数据仍将存在。

  • 一旦向发布添加新列,该表将永久失败,需要重新启动复制。

有关更多信息,请参阅 ALTER PUBLICATION (https://www.postgresql.org/docs/current/sql-alterpublication.html)。

  1. 作为 Snowflake 账户管理员,执行以下任务:

    1. 创建一个类型为 SERVICE 的 Snowflake 用户。创建一个用于存储复制数据的数据库,并为该 Snowflake 用户设置在该数据库中创建对象所需的权限,即授予 USAGE 和 CREATE SCHEMA 权限

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
                WAREHOUSE_SIZE = 'MEDIUM'
                AUTO_SUSPEND = 300
                AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. 创建安全密钥对(公钥和私钥)。将用户的私钥存储在文件中,以提供给连接器的配置。将公钥分配给 Snowflake 服务用户:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      有关更多信息,请参阅 密钥对

    3. 指定一个仓库供连接器使用。从 MEDIUM 仓库规模开始,然后根据要复制的表的数量和传输的数据量尝试规模。相较于扩大仓库规模,采用 多集群仓库 通常能更有效地应对海量表数量的扩展需求。

将连接器定义导入 Openflow

  1. 导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors

  2. 在 Openflow 连接器页面上,找到连接器并选择 Add to runtime

  3. Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。

  4. 选择 Add

    备注

    在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。

  5. 使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。

  6. 使用您的 Snowflake 账户凭据进行运行时身份验证。

此时将显示 Openflow 画布,其中添加了连接器进程组。

配置连接器

您可以为以下用例配置连接器:

实时复制一组表

  1. 右键点击导入的进程组并选择 Parameters

  2. 按照 流参数 中所述填充所需的参数值。

流参数

首先设置 PostgreSQL 源参数上下文的参数,然后设置 PostgreSQL 目标参数上下文的参数。完成此操作后,您就可以启用连接器,它应该会同时连接到 PostgreSQL 和 Snowflake,并开始运行。但在未显式将任何表添加到配置之前,它不会复制任何数据。

要为复制配置特定的表,请编辑 PostgreSQL 引入参数上下文。在对复制参数上下文应用更改后不久,连接器会检测到这些配置,并为每张表启动复制生命周期。

PostgreSQL 源参数上下文

参数

描述

Postgres Connection URL

指向源数据库的完整 JDBC URL。示例:jdbc:postgresql://example.com:5432/public

Postgres JDBC Driver

PostgreSQL JDBC 驱动程序 jar (https://jdbc.postgresql.org/) 的路径。从其网站下载 jar,然后选中 Reference asset 复选框将其上传并附加。

Postgres SSL Mode

启用或禁用 SSL 连接。

Postgres Root SSL Certificate

数据库的根证书的全部内容。如果已禁用 SSL,则为可选。

Postgres Username

连接器的用户名。

Postgres Password

连接器的密码。

Publication Name

您之前创建的发布的名称。

PostgreSQL 目标参数上下文

参数

描述

目标数据库

用于永久保存数据的数据库。它必须已经存在于 Snowflake 中。

Snowflake 账户标识符

Snowflake 账户名称格式为 [organization-name]-[account-name],数据永久保存在其中

Snowflake 身份验证策略

对 Snowflake 进行身份验证的策略。可能的值:当您在 SPCS 上运行流时为 SNOWFLAKE_SESSION_TOKEN,当您想使用私钥设置访问权限时为 KEY_PAIR

Snowflake 私钥

用于身份验证的 RSA 私钥。RSA 密钥必须按照 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。请注意,必须定义 Snowflake 私钥文件或 Snowflake 私钥。

Snowflake 私钥文件

该文件包含用于对 Snowflake 进行身份验证的 RSA 私钥,该私钥根据 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。头标记行以 -----BEGIN PRIVATE 开头。选中 Reference asset 复选框,上传私钥文件。

Snowflake 私钥密码

与 Snowflake 私钥文件关联的密码

Snowflake 角色

查询执行期间使用的 Snowflake 角色

Snowflake 用户名

用于连接到 Snowflake 实例的用户名

Snowflake 仓库

用于运行查询的 Snowflake 仓库

PostgreSQL 引入参数上下文

参数

描述

包括表名

以逗号分隔的表路径列表,包括其架构。示例:public.my_table, other_schema.other_table

包括表正则表达式

用于匹配表路径的正则表达式。与表达式匹配的每个路径都将被复制,并且还将自动包括与稍后创建的模式相匹配的新表。示例:public\.auto_.*

筛选器 JSON

一个 JSON,包含一个完全限定的表名列表和一个用于列名的正则表达式模式,这些列名应该包含在复制中。示例:[ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] 将包括 public 架构的 table1 中所有以 name 结尾的列。

合并任务计划 CRON

定义触发从日志到目标表的合并任务的 CRON 表达式。如果您想持续合并或按照计划时间来限制仓库运行时间,请将其设置为 * * * * * ?

例如:

  • 字符串 * 0 * * * ? 表示您要在每小时整点计划合并,持续一分钟

  • 字符串 * 20 14 ? * MON-FRI 表示您计划在周一到周五每天 2:20 PM 触发合并

有关其他信息和示例,请参阅 Quartz 文档 (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html) 中的 cron 触发教程

从复制中移除并重新添加表

要从复制中移除表,请确保在复制参数上下文中的 包括表名包括表正则表达式 参数里移除。

如果您想稍后将该表重新添加到复制中,请先在 Snowflake 中删除相应的目标表。然后,再将该表添加回 包括表名包括表正则表达式 参数中。这样可以确保表的复制过程重新开始。

此方法还可用于从失败的表复制场景中恢复复制。

复制表中列的子集

连接器可以将每张表复制的数据筛选到已配置列的子集。

要对列应用筛选器,请在复制参数上下文中修改 Column Filter 属性,添加一个配置数组,每个条目对应一张需要筛选列的表。

可以按名称或模式包含或排除列。您可以对每张表应用一个条件,也可以合并多个条件,排除项的优先级始终高于包含项。

以下示例显示了可用的字段。schematable 是必填项,且必须至少填写以下其中一个字段:includedexcludedincludedPatternexcludedPattern

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

跟踪表中的数据变化

连接器不仅复制源表中数据的当前状态,还复制每个变更集中每行的每个状态。这些数据存储在与目标表相同的架构中创建的日志表中。

日志表名称的格式为:<source table name>_JOURNAL_<timestamp>_<schema generation>

其中,<timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> 是一个整数,随着源表的每一次架构更改而递增。这意味着经过架构更改的源表将有多个日志表。

当表从复制中移除后又重新添加时,<timestamp> value will change, and <schema generation> will start again from 1

重要

Snowflake 建议您不要以任何方式更改日志表或其中的数据。作为复制过程的一部分,连接器使用它们来更新目标表。

连接器永远不会删除日志表,但只会为每个复制的源表积极使用最新的日志。如果您想回收存储空间,可以安全地删除与已从复制中移除的源表相关的日志表,以及除最新一代主动复制表之外的所有日志表。

例如,如果您的连接器设置为主动复制源表 orders,并且您之前已将表 customers 从复制中移除,则可能存在以下日志表:在这种情况下,您可以把它们全部删除,仅保留 orders_5678_2

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

配置合并任务的调度

连接器使用仓库将变更数据捕获 (CDC) 数据合并到目标表中。此操作由 MergeSnowflakeJournalTable 处理器触发。如果没有新的更改,或者 MergeSnowflakeJournalTable 队列中没有新的待处理的 FlowFile,则不会触发合并,仓库会自动暂停。

要限制仓库成本并将合并仅限于预定时间,可以在合并任务计划 CRON 参数中使用 CRON 表达式。它限制了流向 MergeSnowflakeJournalTable 处理器的 FlowFile,并且只能在特定的时间段内触发合并。有关调度的更多信息,请参阅 调度策略 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy)。

停止或删除连接器

停止或移除连接器时,需要注意连接器使用的 复制槽 (https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS)。

连接器会创建自己的复制槽,其名称以 snowflake_connector_ 开头,后跟随机后缀。当连接器读取复制流时会推进复制槽的位置,使 PostgreSQL 能够清理 WAL 日志并释放磁盘空间。

当连接器暂停时,复制槽不会被推进,对源数据库的更改会不断增加 WAL 日志大小。因此,不建议长时间暂停连接器,尤其是在高流量数据库中。

移除连接器时,无论是将其从 Openflow 画布中删除,还是通过任何其他方式(例如删除整个 Openflow 实例),复制槽都将保持原位,必须手动删除。

如果您有多个连接器实例从同一个 PostgreSQL 数据库进行复制,则每个实例将创建自己的唯一命名的复制槽。手动删除复制槽时,请确保删除的是正确的复制槽。通过检查 CaptureChangePostgreSQL 处理器的状态,您可以查看给定连接器实例使用的是哪个复制槽。

运行流

  1. 右键点击“飞机”图标并选择 Enable all Controller Services

  2. 右键点击导入的进程组并选择 Start。连接器开始数据引入。

语言: 中文