设置 Openflow Connector for MySQL¶
备注
使用该连接器需遵守 连接器条款。
本主题介绍设置 Openflow Connector for MySQL 的步骤。
先决条件¶
确保您已查看 关于 Openflow Connector for MySQL。
确保您有 MySQL 8 或更高版本来与 Snowflake 同步数据。
确保您已 设置 Openflow。
作为数据库管理员,请执行以下任务:
启用 二进制日志 (https://dev.mysql.com/doc/refman/8.4/en/binary-log.html),然后按如下方式保存和配置其格式:
log_bin
设置为
on
。这将启用记录结构和数据更改的二进制日志。
binlog_format
设置为
row
。该连接器仅支持基于行的复制。MySQL 8.x 版本可能是最后一个支持此设置的版本,将来的版本将仅支持基于行的复制。
不适用于 GCP Cloud SQL,会固定在正确的值。
binlog_row_metadata
设置为
full
。连接器需要所有行元数据才能运行,最重要的是,列名称和主键信息。
binlog_row_image
设置为
full
。连接器要求将所有列写入二进制日志。
不适用于 Amazon Aurora,会固定在正确的值。
binlog_row_value_options
留空。
此选项仅影响 JSON 列,在这些列中可以设置为仅包含 JSON 文档中针对
UPDATE
语句修改过的部分。连接器要求将完整文档写入二进制日志。binlog_expire_logs_seconds
设置为至少几个小时或更长时间,以确保数据库代理可以在长时间暂停或停机后继续增量复制。Snowflake 建议您将 二进制日志有效期 (binlog_expire_logs_seconds) (https://dev.mysql.com/doc/refman/8.4/en/replication-options-binary-log.html#sysvar_binlog_expire_logs_seconds) 设置为至少几个小时,以确保连接器稳定运行。二进制日志有效期结束后,二进制日志文件可能会被自动移除。如果集成长时间暂停(例如由于维护工作),并且在此期间删除了过期的二进制日志文件,则 Openflow 无法从这些文件复制数据。
如果您使用的是计划复制,则该值需要长于配置的计划。
以下面的代码为例:
log_bin = on binlog_format = row binlog_row_metadata = full binlog_row_image = full binlog_row_value_options =
增加
sort_buffer_size
的值。sort_buffer_size = 4194304
sort_buffer_size
定义为内存中排序操作分配的每个查询线程的内存量(以字节为单位),例如 ORDER BY。如果该值太小,则连接器可能会出现故障,并显示以下错误消息:Out of sort memory, consider increasing server sort buffer size
。这表明应该提高sort_buffer_size
。如果您使用的是 Amazon RDS 数据库,那么使用
rds_set_configuration
延长与binlog_expire_logs_seconds
相关的保留期。例如,如果您想将二进制日志存储 24 小时,请致电mysql.rds_set_configuration('binlog retention hours', 24)
。通过 SSL 连接。如果您计划使用 SSL 连接 MySQL,请为数据库服务器准备根证书。在配置期间会用到该证书。
为连接器创建用户。连接器要求具有 REPLICATION_SLAVE 和 REPLICATION_CLIENT 权限的用户才能读取二进制日志。授予以下权限:
GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%' GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
授予对每个复制表的 SELECT 权限:
GRANT SELECT ON <schema>.* TO '<username>'@'%' GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
有关复制安全性的更多信息,请参阅 二进制文档 (https://dev.mysql.com/doc/refman/8.4/en/binary-log.html)。
作为 Snowflake 账户管理员,请执行以下任务:
创建一个类型为 SERVICE 的 Snowflake 用户。创建一个用于存储复制数据的数据库,并为该 Snowflake 用户设置在该数据库中创建对象所需的权限,即授予 USAGE 和 CREATE SCHEMA 权限。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
创建安全密钥对(公钥和私钥)。将用户的私钥存储在文件中,以提供给连接器的配置。将公钥分配给 Snowflake 服务用户:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
有关更多信息,请参阅 密钥对。
指定一个仓库供连接器使用。从
MEDIUM
仓库规模开始,然后根据要复制的表的数量和传输的数据量尝试规模。相较于扩大仓库规模,采用 多集群仓库 通常能更有效地应对海量表数量的扩展需求。
设置连接器¶
作为数据工程师,执行以下任务以安装和配置连接器:
安装连接器¶
导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors。
在 Openflow 连接器页面上,找到连接器并选择 Add to runtime。
在 Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。
选择 Add。
备注
在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。
使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。
使用您的 Snowflake 账户凭据进行运行时身份验证。
此时将显示 Openflow 画布,其中添加了连接器进程组。
配置连接器¶
您可以为以下用例配置连接器:
实时复制一组表¶
右键点击导入的进程组并选择 Parameters。
按照 流参数 中所述填充所需的参数值。
流参数¶
首先设置 MySQL 源参数上下文的参数,然后设置 MySQL 目标参数上下文的参数。完成此操作后,您可以启用连接器。连接器应同时连接到 MySQL 和Snowflake,并开始运行。但是,在将任何要复制的表明确添加到其配置中之前,连接器不会复制任何数据。
要为复制配置特定的表,请编辑 MySQL 引入参数上下文。将更改应用到复制参数上下文后,连接器会选择配置,并且每个表的复制生命周期都将开始。
MySQL 源参数上下文¶
参数 |
描述 |
---|---|
MySQL 连接 URL |
源数据库的完整 JDBC URL。连接器使用 MariaDB 驱动程序,该驱动程序 MySQL 兼容,并且 URL 需要以 示例:
|
MySQL JDBC 驱动程序 |
MariaDB JDBC 驱动程序 jar (https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector/) 的绝对路径。连接器使用与 MySQL 兼容的 MariaDB 驱动程序。选中 Reference asset 复选框以上传 MariaDB JDBC 驱动程序。 示例: |
MySQL 用户名 |
连接器的用户名。 |
MySQL 密码 |
连接器的密码。 |
MySQL 目标参数上下文¶
参数 |
描述 |
---|---|
目标数据库 |
用于永久保存数据的数据库。它必须已经存在于 Snowflake 中。 |
Snowflake 账户标识符 |
Snowflake 账户名称格式为 [organization-name]-[account-name],数据永久保存在其中 |
Snowflake 身份验证策略 |
对 Snowflake 进行身份验证的策略。可能的值:当您在 SPCS 上运行流时为 |
Snowflake 私钥 |
用于身份验证的 RSA 私钥。RSA 密钥必须按照 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。请注意,必须定义 Snowflake 私钥文件或 Snowflake 私钥。 |
Snowflake 私钥文件 |
该文件包含用于对 Snowflake 进行身份验证的 RSA 私钥,该私钥根据 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。头标记行以 |
Snowflake 私钥密码 |
与 Snowflake 私钥文件关联的密码 |
Snowflake 角色 |
查询执行期间使用的 Snowflake 角色 |
Snowflake 用户名 |
用于连接到 Snowflake 实例的用户名 |
Snowflake 仓库 |
用于运行查询的 Snowflake 仓库 |
MySQL 引入参数上下文¶
参数 |
描述 |
---|---|
包括表名 |
以逗号分隔的表路径列表,包括其架构。示例: |
包括表正则表达式 |
用于匹配表路径的正则表达式。与表达式匹配的每个路径都将被复制,并且还将自动包括与稍后创建的模式相匹配的新表。示例: |
筛选器 JSON |
一个 JSON,包含一个完全限定的表名列表和一个用于列名的正则表达式模式,这些列名应该包含在复制中。示例: |
合并任务计划 CRON |
定义触发从日志到目标表的合并任务的 CRON 表达式。如果您想持续合并或按照计划时间来限制仓库运行时间,请将其设置为 |
从复制中移除并重新添加表¶
要从复制中移除表,请确保在复制参数上下文中的 包括表名
或 包括表正则表达式
参数里移除。
如果您想稍后将该表重新添加到复制中,请先在 Snowflake 中删除相应的目标表。然后,再将该表添加回 包括表名
或 包括表正则表达式
参数中。这样可以确保表的复制过程重新开始。
此方法还可用于从失败的表复制场景中恢复复制。
复制表中列的子集¶
连接器可以将每张表复制的数据筛选到已配置列的子集。
要对列应用筛选器,请在复制参数上下文中修改列筛选器属性,添加一个配置数组,每个条目对应一张需要筛选列的表。
可以按名称或模式包含或排除列。您可以对每张表应用一个条件,也可以合并多个条件,排除项的优先级始终高于包含项。
以下示例显示了可用的字段。schema
和 table
字段是必填字段。必须至少填写以下其中一个字段:included
、excluded
、includedPattern
、excludedPattern
。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
跟踪表中的数据变化¶
连接器不仅复制源表中数据的当前状态,还复制每个变更集中每行的每个状态。这些数据存储在与目标表相同的架构中创建的日志表中。
日志表名称的格式为:<source table name>_JOURNAL_<timestamp>_<schema generation>
其中,<timestamp>
is the value of epoch seconds when the source table was added to replication, and <schema generation>
是一个整数,随着源表的每一次架构更改而递增。这意味着经过架构更改的源表将有多个日志表。
当表从复制中移除后又重新添加时,<timestamp>
value will change, and <schema generation>
will start again from 1
。
重要
Snowflake 建议您不要以任何方式更改日志表或其中的数据。作为复制过程的一部分,连接器使用它们来更新目标表。
连接器永远不会删除日志表,但只会为每个复制的源表积极使用最新的日志。如果您想回收存储空间,可以安全地删除与已从复制中移除的源表相关的日志表,以及除最新一代主动复制表之外的所有日志表。
例如,如果您的连接器设置为主动复制源表 orders
,并且您之前已将表 customers
从复制中移除,则可能存在以下日志表:在这种情况下,您可以把它们全部删除,仅保留 orders_5678_2
。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
配置合并任务的调度¶
连接器使用仓库将变更数据捕获 (CDC) 数据合并到目标表中。此操作由 MergeSnowflakeJournalTable 处理器触发。如果没有新的更改,或者 MergeSnowflakeJournalTable 队列中没有新的待处理的 FlowFile,则不会触发合并,仓库会自动暂停。
要限制仓库成本并将合并仅限于预定时间,可以在合并任务计划 CRON 参数中使用 CRON 表达式。它限制了流向 MergeSnowflakeJournalTable 处理器的 FlowFile,并且只能在特定的时间段内触发合并。有关调度的更多信息,请参阅 调度策略 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy)。
运行流¶
右键点击“飞机”图标并选择 Enable all Controller Services。
右键点击导入的进程组并选择 Start。连接器开始数据引入。