设置 Openflow Connector for MySQL¶
备注
This connector is subject to the Snowflake Connector Terms.
本主题介绍设置 Openflow Connector for MySQL 的步骤。
先决条件¶
确保您已查看 关于 Openflow Connector for MySQL。
Ensure that you have 设置 Openflow - BYOC or Set up Openflow - Snowflake Deployments.
If using Openflow - Snowflake Deployments, ensure that you've reviewed configuring required domains and have granted access to the required domains for the MySQL connector.
确保您有 MySQL 8 或更高版本来与 Snowflake 同步数据。
Recommended: Ensure that you add only one connector instance per runtime.
作为数据库管理员,请执行以下任务:
启用 二进制日志 (https://dev.mysql.com/doc/refman/8.4/en/binary-log.html),然后按如下方式保存和配置其格式:
log_bin设置为
on。这将启用记录结构和数据更改的二进制日志。
binlog_format设置为
row。该连接器仅支持基于行的复制。MySQL 8.x 版本可能是最后一个支持此设置的版本,将来的版本将仅支持基于行的复制。
不适用于 GCP Cloud SQL,会固定在正确的值。
binlog_row_metadata设置为
full。连接器需要所有行元数据才能运行,最重要的是,列名称和主键信息。
binlog_row_image设置为
full。连接器要求将所有列写入二进制日志。
不适用于 Amazon Aurora,会固定在正确的值。
binlog_row_value_options留空。
此选项仅影响 JSON 列,在这些列中可以设置为仅包含 JSON 文档中针对
UPDATE语句修改过的部分。连接器要求将完整文档写入二进制日志。binlog_expire_logs_seconds设置为至少几个小时或更长时间,以确保数据库代理可以在长时间暂停或停机后继续增量复制。Snowflake 建议您将 二进制日志有效期 (binlog_expire_logs_seconds) (https://dev.mysql.com/doc/refman/8.4/en/replication-options-binary-log.html#sysvar_binlog_expire_logs_seconds) 设置为至少几个小时,以确保连接器稳定运行。二进制日志有效期结束后,二进制日志文件可能会被自动移除。如果集成长时间暂停(例如由于维护工作),并且在此期间删除了过期的二进制日志文件,则 Openflow 无法从这些文件复制数据。
如果您使用的是计划复制,则该值需要长于配置的计划。
For example:
log_bin = on binlog_format = row binlog_row_metadata = full binlog_row_image = full binlog_row_value_options =
增加
sort_buffer_size的值。sort_buffer_size = 4194304
sort_buffer_size定义为内存中排序操作分配的每个查询线程的内存量(以字节为单位),例如 ORDER BY。如果该值太小,则连接器可能会出现故障,并显示以下错误消息:Out of sort memory, consider increasing server sort buffer size。这表明应该提高sort_buffer_size。如果您使用的是 Amazon RDS 数据库,那么使用
rds_set_configuration延长与binlog_expire_logs_seconds相关的保留期。例如,如果您想将二进制日志存储 24 小时,请致电mysql.rds_set_configuration('binlog retention hours', 24)。When using a read replica to connect, binary logging must be enabled on the replica.
Configuration details are provided in step 4.
After binary logging is enabled, configure the replica to log the events received from its source into its own binary log.
log_replica_updates = ON
log_replica_updatesallows the replica to write events received from its source to its own binary log, making those changes available to any databases that are replicating from it.通过 SSL 连接。如果您计划使用 SSL 连接 MySQL,请为数据库服务器准备根证书。在配置期间会用到该证书。
为连接器创建用户。连接器要求具有 REPLICATION_SLAVE 和 REPLICATION_CLIENT 权限的用户才能读取二进制日志。授予以下权限:
GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%' GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
授予对每个复制表的 SELECT 权限:
GRANT SELECT ON <schema>.* TO '<username>'@'%' GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
有关复制安全性的更多信息,请参阅 二进制文档 (https://dev.mysql.com/doc/refman/8.4/en/binary-log.html)。
作为 Snowflake 账户管理员,请执行以下任务:
创建一个类型为 SERVICE 的 Snowflake 用户。创建一个用于存储复制数据的数据库,并为该 Snowflake 用户设置在该数据库中创建对象所需的权限,即授予 USAGE 和 CREATE SCHEMA 权限。
CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
创建安全密钥对(公钥和私钥)。将用户的私钥存储在文件中,以提供给连接器的配置。将公钥分配给 Snowflake 服务用户:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
有关更多信息,请参阅 密钥对。
指定一个仓库供连接器使用。从
MEDIUM仓库规模开始,然后根据要复制的表的数量和传输的数据量尝试规模。相较于扩大仓库规模,采用 多集群仓库 通常能更有效地应对海量表数量的扩展需求。
设置连接器¶
作为数据工程师,执行以下任务以安装和配置连接器:
安装连接器¶
导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors。
在 Openflow 连接器页面上,找到连接器并选择 Add to runtime。
在 Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。
选择 Add。
备注
在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。
使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。
使用您的 Snowflake 账户凭据进行运行时身份验证。
此时将显示 Openflow 画布,其中添加了连接器进程组。
配置连接器¶
您可以为以下用例配置连接器:
实时复制一组表¶
右键点击导入的进程组并选择 Parameters。
按照 流参数 中所述填充所需的参数值。
流参数¶
首先设置 MySQL 源参数上下文的参数,然后设置 MySQL 目标参数上下文的参数。完成此操作后,您可以启用连接器。连接器应同时连接到 MySQL 和Snowflake,并开始运行。但是,在将任何要复制的表明确添加到其配置中之前,连接器不会复制任何数据。
要为复制配置特定的表,请编辑 MySQL 引入参数上下文。将更改应用到复制参数上下文后,连接器会选择配置,并且每个表的复制生命周期都将开始。
MySQL 源参数上下文¶
参数 |
描述 |
|---|---|
MySQL 连接 URL |
指向源数据库的完整 JDBC URL。连接器使用 MariaDB 驱动程序,该驱动程序与 MySQL 兼容,并且 URL 需要以 示例:
|
MySQL JDBC 驱动程序 |
MariaDB JDBC 驱动程序 jar (https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector/) 的绝对路径。连接器使用与 MySQL 兼容的 MariaDB 驱动程序。选中 Reference asset 复选框以上传 MariaDB JDBC 驱动程序。 示例: |
MySQL 用户名 |
连接器的用户名。 |
MySQL 密码 |
连接器的密码。 |
MySQL 目标参数上下文¶
参数 |
描述 |
Required |
|---|---|---|
目标数据库 |
The database where data will be persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase. |
Yes |
Snowflake 账户标识符 |
When using:
|
Yes |
Snowflake 身份验证策略 |
When using:
|
Yes |
Snowflake 私钥 |
When using:
|
No |
Snowflake 私钥文件 |
When using:
|
No |
Snowflake 私钥密码 |
When using
|
No |
Snowflake 角色 |
When using
|
Yes |
Snowflake 用户名 |
When using
|
Yes |
Snowflake 仓库 |
Snowflake warehouse used to run queries. |
Yes |
MySQL 引入参数上下文¶
参数 |
描述 |
|---|---|
包括表名 |
以逗号分隔的表路径列表,包括其架构。示例: |
包括表正则表达式 |
用于匹配表路径的正则表达式。与表达式匹配的每个路径都将被复制,并且还将自动包括与稍后创建的模式相匹配的新表。示例: |
筛选器 JSON |
一个 JSON,包含一个完全限定的表名列表和一个用于列名的正则表达式模式,这些列名应该包含在复制中。示例: |
合并任务计划 CRON |
定义触发从日志到目标表的合并任务的 CRON 表达式。如果您想持续合并或按照计划时间来限制仓库运行时间,请将其设置为 |
从复制中移除并重新添加表¶
要从复制中移除表,请确保在复制参数上下文中的 包括表名 或 包括表正则表达式 参数里移除。
如果您想稍后将该表重新添加到复制中,请先在 Snowflake 中删除相应的目标表。然后,再将该表添加回 包括表名 或 包括表正则表达式 参数中。这样可以确保表的复制过程重新开始。
此方法还可用于从失败的表复制场景中恢复复制。
复制表中列的子集¶
连接器可以将每张表复制的数据筛选到已配置列的子集。
要对列应用筛选器,请在复制参数上下文中修改列筛选器属性,添加一个配置数组,每个条目对应一张需要筛选列的表。
可以按名称或模式包含或排除列。您可以对每张表应用一个条件,也可以合并多个条件,排除项的优先级始终高于包含项。
以下示例显示了可用的字段。schema 和 table 字段是必填字段。必须至少填写以下其中一个字段:included、excluded、includedPattern、excludedPattern。
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
跟踪表中的数据变化¶
连接器不仅复制源表中数据的当前状态,还复制每个变更集中每行的每个状态。这些数据存储在与目标表相同的架构中创建的日志表中。
The journal table names are formatted as: <source table name>_JOURNAL_<timestamp>_<schema generation>
where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> is an integer increasing with every schema change on the source table.
As a result, source tables that undergo schema changes will have multiple journal tables.
当表从复制中移除后又重新添加时,<timestamp> value will change, and <schema generation> will start again from 1。
重要
Snowflake recommends that you do not alter the structure of journal tables in any way. They are used by the connector to update the destination table as part of the replication process.
The connector never drops journal tables, but does make use of the latest journal for every replicated source table, only reading append-only streams on top of journals. To reclaim the storage, you can:
Truncate all journal tables at any time.
Drop the journal tables related to source tables that were removed from replication.
Drop all but the latest generation journal tables for actively replicated tables.
例如,如果您的连接器设置为主动复制源表 orders,并且您之前已将表 customers 从复制中移除,则可能存在以下日志表:在这种情况下,您可以把它们全部删除,仅保留 orders_5678_2。
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
配置合并任务的调度¶
连接器使用仓库将变更数据捕获 (CDC) 数据合并到目标表中。此操作由 MergeSnowflakeJournalTable 处理器触发。如果没有新的更改,或者 MergeSnowflakeJournalTable 队列中没有新的待处理的 FlowFile,则不会触发合并,仓库会自动暂停。
要限制仓库成本并将合并仅限于预定时间,可以在合并任务计划 CRON 参数中使用 CRON 表达式。它限制了流向 MergeSnowflakeJournalTable 处理器的 FlowFile,并且只能在特定的时间段内触发合并。有关调度的更多信息,请参阅 调度策略 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy)。
运行流¶
右键点击“飞机”图标并选择 Enable all Controller Services。
右键点击导入的进程组并选择 Start。连接器开始数据引入。