设置 Openflow Connector for PostgreSQL¶
备注
使用该连接器需遵守 连接器条款。
本主题介绍设置 Openflow Connector for PostgreSQL 的步骤。
先决条件¶
确保您已查看 支持的 PostgreSQL 版本。
确保您已 设置 Openflow。
作为数据库管理员,请执行以下任务:
确保 PostgreSQL 服务器上有足够的磁盘空间用于 WAL。这是因为复制槽创建后,将导致 PostgreSQL 会保留从该复制槽所记录的位置开始的 WAL 数据,直到连接器确认并推进该位置。
确保启用复制的每张表都有主键。键可以是单个列或组合。
将表的 REPLICA IDENTITY (https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY) 设置为
DEFAULT
。这可确保主键在 WAL 中表示,并且连接器可以读取它们。为连接器创建用户。连接器要求用户具有
REPLICATION
属性,以及对所有需要复制的表拥有 SELECT 权限。使用密码创建该用户,以进入连接器的配置。有关复制安全的更多信息,请参阅 安全 (https://www.postgresql.org/docs/current/logical-replication-security.html)。
配置 wal_level¶
Openflow Connector for PostgreSQL 要求将 wal_level (https://www.postgresql.org/docs/current/runtime-config-wal.html#GUC-WAL-LEVEL) 设置为 logical
。
根据 PostgreSQL 服务器的托管位置,您可以按如下方式配置 wal_level:
本地 |
以超级用户或具有
|
RDS |
代理使用的用户需要已分配 您还需要执行以下操作:
|
AWS Aurora |
将 |
GCP |
设置以下标记:
|
Azure |
将复制支持设置为 |
创建发布¶
在开始复制之前,Openflow Connector for PostgreSQL 要求您在 PostgreSQL 中创建并配置一个 发布 (https://www.postgresql.org/docs/current/logical-replication-publication.html#LOGICAL-REPLICATION-PUBLICATION)。您可以为所有表或表的子集以及仅具有指定列的特定表创建发布。确保您计划复制的每张表和列都包含在发布中。您也可以稍后在连接器运行时修改发布。要创建和配置发布,请执行以下操作:
以具有
CREATE
权限的用户身份登录数据库,并执行以下查询:
CREATE PUBLICATION <publication name>;
使用以下命令定义数据库代理可见的表:
ALTER PUBLICATION <publication name> ADD TABLE <table name>;
重要
** PostgreSQL 15 及更高版本** 支持为表列的指定子集配置发布。为使连接器正确支持此功能,必须使用 列筛选设置 来包含与发布上设置的相同的列。
如果没有此设置,连接器将显示以下行为:
在目标表中,未包含在筛选器中的列将以
__DELETED
作为后缀。在快照阶段复制的所有数据仍将存在。一旦向发布添加新列,该表将永久失败,需要重新启动复制。
有关更多信息,请参阅 ALTER PUBLICATION (https://www.postgresql.org/docs/current/sql-alterpublication.html)。
作为 Snowflake 账户管理员,执行以下任务:
创建一个类型为 SERVICE 的 Snowflake 用户。创建一个用于存储复制数据的数据库,并为该 Snowflake 用户设置在该数据库中创建对象所需的权限,即授予 USAGE 和 CREATE SCHEMA 权限。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
创建安全密钥对(公钥和私钥)。将用户的私钥存储在文件中,以提供给连接器的配置。将公钥分配给 Snowflake 服务用户:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
有关更多信息,请参阅 密钥对。
指定一个仓库供连接器使用。从
MEDIUM
仓库规模开始,然后根据要复制的表的数量和传输的数据量尝试规模。相较于扩大仓库规模,采用 多集群仓库 通常能更有效地应对海量表数量的扩展需求。
将连接器定义导入 Openflow¶
导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors。
在 Openflow 连接器页面上,找到连接器并选择 Add to runtime。
在 Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。
选择 Add。
备注
在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。
使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。
使用您的 Snowflake 账户凭据进行运行时身份验证。
此时将显示 Openflow 画布,其中添加了连接器进程组。
配置连接器¶
您可以为以下用例配置连接器:
实时复制一组表¶
右键点击导入的进程组并选择 Parameters。
按照 流参数 中所述填充所需的参数值。
流参数¶
首先设置 PostgreSQL 源参数上下文的参数,然后设置 PostgreSQL 目标参数上下文的参数。完成此操作后,您就可以启用连接器,它应该会同时连接到 PostgreSQL 和 Snowflake,并开始运行。但在未显式将任何表添加到配置之前,它不会复制任何数据。
要为复制配置特定的表,请编辑 PostgreSQL 引入参数上下文。在对复制参数上下文应用更改后不久,连接器会检测到这些配置,并为每张表启动复制生命周期。
PostgreSQL 源参数上下文¶
参数 |
描述 |
---|---|
Postgres Connection URL |
指向源数据库的完整 JDBC URL。示例: |
Postgres JDBC Driver |
PostgreSQL JDBC 驱动程序 jar (https://jdbc.postgresql.org/) 的路径。从其网站下载 jar,然后选中 Reference asset 复选框将其上传并附加。 |
Postgres SSL Mode |
启用或禁用 SSL 连接。 |
Postgres Root SSL Certificate |
数据库的根证书的全部内容。如果已禁用 SSL,则为可选。 |
Postgres Username |
连接器的用户名。 |
Postgres Password |
连接器的密码。 |
Publication Name |
您之前创建的发布的名称。 |
PostgreSQL 目标参数上下文¶
参数 |
描述 |
---|---|
目标数据库 |
用于永久保存数据的数据库。它必须已经存在于 Snowflake 中。 |
Snowflake 账户标识符 |
Snowflake 账户名称格式为 [organization-name]-[account-name],数据永久保存在其中 |
Snowflake 身份验证策略 |
对 Snowflake 进行身份验证的策略。可能的值:当您在 SPCS 上运行流时为 |
Snowflake 私钥 |
用于身份验证的 RSA 私钥。RSA 密钥必须按照 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。请注意,必须定义 Snowflake 私钥文件或 Snowflake 私钥。 |
Snowflake 私钥文件 |
该文件包含用于对 Snowflake 进行身份验证的 RSA 私钥,该私钥根据 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。头标记行以 |
Snowflake 私钥密码 |
与 Snowflake 私钥文件关联的密码 |
Snowflake 角色 |
查询执行期间使用的 Snowflake 角色 |
Snowflake 用户名 |
用于连接到 Snowflake 实例的用户名 |
Snowflake 仓库 |
用于运行查询的 Snowflake 仓库 |
PostgreSQL 引入参数上下文¶
参数 |
描述 |
---|---|
包括表名 |
以逗号分隔的表路径列表,包括其架构。示例: |
包括表正则表达式 |
用于匹配表路径的正则表达式。与表达式匹配的每个路径都将被复制,并且还将自动包括与稍后创建的模式相匹配的新表。示例: |
筛选器 JSON |
一个 JSON,包含一个完全限定的表名列表和一个用于列名的正则表达式模式,这些列名应该包含在复制中。示例: |
合并任务计划 CRON |
定义触发从日志到目标表的合并任务的 CRON 表达式。如果您想持续合并或按照计划时间来限制仓库运行时间,请将其设置为 例如:
有关其他信息和示例,请参阅 Quartz 文档 (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html) 中的 cron 触发教程 |
从复制中移除并重新添加表¶
要从复制中移除表,请确保在复制参数上下文中的 包括表名
或 包括表正则表达式
参数里移除。
如果您想稍后将该表重新添加到复制中,请先在 Snowflake 中删除相应的目标表。然后,再将该表添加回 包括表名
或 包括表正则表达式
参数中。这样可以确保表的复制过程重新开始。
此方法还可用于从失败的表复制场景中恢复复制。
复制表中列的子集¶
连接器可以将每张表复制的数据筛选到已配置列的子集。
要对列应用筛选器,请在复制参数上下文中修改 Column Filter 属性,添加一个配置数组,每个条目对应一张需要筛选列的表。
可以按名称或模式包含或排除列。您可以对每张表应用一个条件,也可以合并多个条件,排除项的优先级始终高于包含项。
以下示例显示了可用的字段。schema
和 table
是必填项,且必须至少填写以下其中一个字段:included
、excluded
、includedPattern
、excludedPattern
。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
跟踪表中的数据变化¶
连接器不仅复制源表中数据的当前状态,还复制每个变更集中每行的每个状态。这些数据存储在与目标表相同的架构中创建的日志表中。
日志表名称的格式为:<source table name>_JOURNAL_<timestamp>_<schema generation>
其中,<timestamp>
is the value of epoch seconds when the source table was added to replication, and <schema generation>
是一个整数,随着源表的每一次架构更改而递增。这意味着经过架构更改的源表将有多个日志表。
当表从复制中移除后又重新添加时,<timestamp>
value will change, and <schema generation>
will start again from 1
。
重要
Snowflake 建议您不要以任何方式更改日志表或其中的数据。作为复制过程的一部分,连接器使用它们来更新目标表。
连接器永远不会删除日志表,但只会为每个复制的源表积极使用最新的日志。如果您想回收存储空间,可以安全地删除与已从复制中移除的源表相关的日志表,以及除最新一代主动复制表之外的所有日志表。
例如,如果您的连接器设置为主动复制源表 orders
,并且您之前已将表 customers
从复制中移除,则可能存在以下日志表:在这种情况下,您可以把它们全部删除,仅保留 orders_5678_2
。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
配置合并任务的调度¶
连接器使用仓库将变更数据捕获 (CDC) 数据合并到目标表中。此操作由 MergeSnowflakeJournalTable 处理器触发。如果没有新的更改,或者 MergeSnowflakeJournalTable 队列中没有新的待处理的 FlowFile,则不会触发合并,仓库会自动暂停。
要限制仓库成本并将合并仅限于预定时间,可以在合并任务计划 CRON 参数中使用 CRON 表达式。它限制了流向 MergeSnowflakeJournalTable 处理器的 FlowFile,并且只能在特定的时间段内触发合并。有关调度的更多信息,请参阅 调度策略 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy)。
停止或删除连接器¶
停止或移除连接器时,需要注意连接器使用的 复制槽 (https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS)。
连接器会创建自己的复制槽,其名称以 snowflake_connector_
开头,后跟随机后缀。当连接器读取复制流时会推进复制槽的位置,使 PostgreSQL 能够清理 WAL 日志并释放磁盘空间。
当连接器暂停时,复制槽不会被推进,对源数据库的更改会不断增加 WAL 日志大小。因此,不建议长时间暂停连接器,尤其是在高流量数据库中。
移除连接器时,无论是将其从 Openflow 画布中删除,还是通过任何其他方式(例如删除整个 Openflow 实例),复制槽都将保持原位,必须手动删除。
如果您有多个连接器实例从同一个 PostgreSQL 数据库进行复制,则每个实例将创建自己的唯一命名的复制槽。手动删除复制槽时,请确保删除的是正确的复制槽。通过检查 CaptureChangePostgreSQL
处理器的状态,您可以查看给定连接器实例使用的是哪个复制槽。
运行流¶
右键点击“飞机”图标并选择 Enable all Controller Services。
右键点击导入的进程组并选择 Start。连接器开始数据引入。