设置 Openflow Connector for MySQL

备注

使用该连接器需遵守 连接器条款

本主题介绍设置 Openflow Connector for MySQL 的步骤。

先决条件

  1. 确保您已查看 关于 Openflow Connector for MySQL

  2. 确保您有 MySQL 8 或更高版本来与 Snowflake 同步数据。

  3. 确保您已 设置 Openflow

  4. 作为数据库管理员,请执行以下任务:

    1. 启用 二进制日志 (https://dev.mysql.com/doc/refman/8.4/en/binary-log.html),然后按如下方式保存和配置其格式:

      log_bin

      设置为 on

      这将启用记录结构和数据更改的二进制日志。

      binlog_format

      设置为 row

      该连接器仅支持基于行的复制。MySQL 8.x 版本可能是最后一个支持此设置的版本,将来的版本将仅支持基于行的复制。

      不适用于 GCP Cloud SQL,会固定在正确的值。

      binlog_row_metadata

      设置为 full

      连接器需要所有行元数据才能运行,最重要的是,列名称和主键信息。

      binlog_row_image

      设置为 full

      连接器要求将所有列写入二进制日志。

      不适用于 Amazon Aurora,会固定在正确的值。

      binlog_row_value_options

      留空。

      此选项仅影响 JSON 列,在这些列中可以设置为仅包含 JSON 文档中针对 UPDATE 语句修改过的部分。连接器要求将完整文档写入二进制日志。

      binlog_expire_logs_seconds

      设置为至少几个小时或更长时间,以确保数据库代理可以在长时间暂停或停机后继续增量复制。Snowflake 建议您将 二进制日志有效期 (binlog_expire_logs_seconds) (https://dev.mysql.com/doc/refman/8.4/en/replication-options-binary-log.html#sysvar_binlog_expire_logs_seconds) 设置为至少几个小时,以确保连接器稳定运行。二进制日志有效期结束后,二进制日志文件可能会被自动移除。如果集成长时间暂停(例如由于维护工作),并且在此期间删除了过期的二进制日志文件,则 Openflow 无法从这些文件复制数据。

      如果您使用的是计划复制,则该值需要长于配置的计划。

      以下面的代码为例:

      log_bin = on
      binlog_format = row
      binlog_row_metadata = full
      binlog_row_image = full
      binlog_row_value_options =
      
      Copy
    2. 增加 sort_buffer_size 的值。

      sort_buffer_size = 4194304
      
      Copy

      sort_buffer_size 定义为内存中排序操作分配的每个查询线程的内存量(以字节为单位),例如 ORDER BY。如果该值太小,则连接器可能会出现故障,并显示以下错误消息:

      Out of sort memory, consider increasing server sort buffer size。这表明应该提高 sort_buffer_size

    3. 如果您使用的是 Amazon RDS 数据库,那么使用 rds_set_configuration 延长与 binlog_expire_logs_seconds 相关的保留期。例如,如果您想将二进制日志存储 24 小时,请致电 mysql.rds_set_configuration('binlog retention hours', 24)

    4. 通过 SSL 连接。如果您计划使用 SSL 连接 MySQL,请为数据库服务器准备根证书。在配置期间会用到该证书。

    5. 为连接器创建用户。连接器要求具有 REPLICATION_SLAVE 和 REPLICATION_CLIENT 权限的用户才能读取二进制日志。授予以下权限:

      GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%'
      GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
      
      Copy
    6. 授予对每个复制表的 SELECT 权限:

      GRANT SELECT ON <schema>.* TO '<username>'@'%'
      GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
      
      Copy

      有关复制安全性的更多信息,请参阅 二进制文档 (https://dev.mysql.com/doc/refman/8.4/en/binary-log.html)。

  5. 作为 Snowflake 账户管理员,请执行以下任务:

    1. 创建一个类型为 SERVICE 的 Snowflake 用户。创建一个用于存储复制数据的数据库,并为该 Snowflake 用户设置在该数据库中创建对象所需的权限,即授予 USAGE 和 CREATE SCHEMA 权限

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
               WAREHOUSE_SIZE = 'MEDIUM'
               AUTO_SUSPEND = 300
               AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. 创建安全密钥对(公钥和私钥)。将用户的私钥存储在文件中,以提供给连接器的配置。将公钥分配给 Snowflake 服务用户:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      有关更多信息,请参阅 密钥对

    3. 指定一个仓库供连接器使用。从 MEDIUM 仓库规模开始,然后根据要复制的表的数量和传输的数据量尝试规模。相较于扩大仓库规模,采用 多集群仓库 通常能更有效地应对海量表数量的扩展需求。

设置连接器

作为数据工程师,执行以下任务以安装和配置连接器:

安装连接器

  1. 导航到 Openflow“Overview”页面。在 Featured connectors 部分中,选择 View more connectors

  2. 在 Openflow 连接器页面上,找到连接器并选择 Add to runtime

  3. Select runtime 对话框中,从 Available runtimes 下拉列表中选择您的运行时。

  4. 选择 Add

    备注

    在安装连接器之前,请确保在 Snowflake 中为连接器创建了数据库和架构,用于存储引入的数据。

  5. 使用您的 Snowflake 账户凭据对部署进行身份验证,并在系统提示时选择 Allow,以允许运行时应用程序访问您的 Snowflake 账户。连接器安装过程需要几分钟才能完成。

  6. 使用您的 Snowflake 账户凭据进行运行时身份验证。

此时将显示 Openflow 画布,其中添加了连接器进程组。

配置连接器

您可以为以下用例配置连接器:

实时复制一组表

  1. 右键点击导入的进程组并选择 Parameters

  2. 按照 流参数 中所述填充所需的参数值。

流参数

首先设置 MySQL 源参数上下文的参数,然后设置 MySQL 目标参数上下文的参数。完成此操作后,您可以启用连接器。连接器应同时连接到 MySQL 和Snowflake,并开始运行。但是,在将任何要复制的表明确添加到其配置中之前,连接器不会复制任何数据。

要为复制配置特定的表,请编辑 MySQL 引入参数上下文。将更改应用到复制参数上下文后,连接器会选择配置,并且每个表的复制生命周期都将开始。

MySQL 源参数上下文

参数

描述

MySQL 连接 URL

源数据库的完整 JDBC URL。连接器使用 MariaDB 驱动程序,该驱动程序 MySQL 兼容,并且 URL 需要以 jdbc:mariadb 为前缀。如果禁用 SSL,则连接 URL 的 allowPublicKeyRetrieval 参数应设置为 true

示例:

  • 启用 SSL 后:jdbc:mariadb://example.com:3306

  • 禁用 SSL 后:jdbc:mariadb://example.com:3306?allowPublicKeyRetrieval=true

MySQL JDBC 驱动程序

MariaDB JDBC 驱动程序 jar (https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector/) 的绝对路径。连接器使用与 MySQL 兼容的 MariaDB 驱动程序。选中 Reference asset 复选框以上传 MariaDB JDBC 驱动程序。

示例:/opt/resources/drivers/mariadb-java-client-3.5.2.jar

MySQL 用户名

连接器的用户名。

MySQL 密码

连接器的密码。

MySQL 目标参数上下文

参数

描述

目标数据库

用于永久保存数据的数据库。它必须已经存在于 Snowflake 中。

Snowflake 账户标识符

Snowflake 账户名称格式为 [organization-name]-[account-name],数据永久保存在其中

Snowflake 身份验证策略

对 Snowflake 进行身份验证的策略。可能的值:当您在 SPCS 上运行流时为 SNOWFLAKE_SESSION_TOKEN,当您想使用私钥设置访问权限时为 KEY_PAIR

Snowflake 私钥

用于身份验证的 RSA 私钥。RSA 密钥必须按照 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。请注意,必须定义 Snowflake 私钥文件或 Snowflake 私钥。

Snowflake 私钥文件

该文件包含用于对 Snowflake 进行身份验证的 RSA 私钥,该私钥根据 PKCS8 标准进行格式化,并具有标准的 PEM 头尾标记。头标记行以 -----BEGIN PRIVATE 开头。选中 Reference asset 复选框,上传私钥文件。

Snowflake 私钥密码

与 Snowflake 私钥文件关联的密码

Snowflake 角色

查询执行期间使用的 Snowflake 角色

Snowflake 用户名

用于连接到 Snowflake 实例的用户名

Snowflake 仓库

用于运行查询的 Snowflake 仓库

MySQL 引入参数上下文

参数

描述

包括表名

以逗号分隔的表路径列表,包括其架构。示例:public.my_table, other_schema.other_table

包括表正则表达式

用于匹配表路径的正则表达式。与表达式匹配的每个路径都将被复制,并且还将自动包括与稍后创建的模式相匹配的新表。示例:public\.auto_.*

筛选器 JSON

一个 JSON,包含一个完全限定的表名列表和一个用于列名的正则表达式模式,这些列名应该包含在复制中。示例:[ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] 将包括 public 架构的 table1 中所有以 name 结尾的列。

合并任务计划 CRON

定义触发从日志到目标表的合并任务的 CRON 表达式。如果您想持续合并或按照计划时间来限制仓库运行时间,请将其设置为 * * * * * ?。例如,字符串 * 0 * * * ? 表示您要在每小时整点计划合并,持续一分钟。字符串 * 20 14 ? * MON-FRI 表示您计划在周一到周五每天 2:20 PM 触发合并。有关更多信息和示例,请参阅 CronTrigger 教程 (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html)。

从复制中移除并重新添加表

要从复制中移除表,请确保在复制参数上下文中的 包括表名包括表正则表达式 参数里移除。

如果您想稍后将该表重新添加到复制中,请先在 Snowflake 中删除相应的目标表。然后,再将该表添加回 包括表名包括表正则表达式 参数中。这样可以确保表的复制过程重新开始。

此方法还可用于从失败的表复制场景中恢复复制。

复制表中列的子集

连接器可以将每张表复制的数据筛选到已配置列的子集。

要对列应用筛选器,请在复制参数上下文中修改列筛选器属性,添加一个配置数组,每个条目对应一张需要筛选列的表。

可以按名称或模式包含或排除列。您可以对每张表应用一个条件,也可以合并多个条件,排除项的优先级始终高于包含项。

以下示例显示了可用的字段。schematable 字段是必填字段。必须至少填写以下其中一个字段:includedexcludedincludedPatternexcludedPattern

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

跟踪表中的数据变化

连接器不仅复制源表中数据的当前状态,还复制每个变更集中每行的每个状态。这些数据存储在与目标表相同的架构中创建的日志表中。

日志表名称的格式为:<source table name>_JOURNAL_<timestamp>_<schema generation>

其中,<timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> 是一个整数,随着源表的每一次架构更改而递增。这意味着经过架构更改的源表将有多个日志表。

当表从复制中移除后又重新添加时,<timestamp> value will change, and <schema generation> will start again from 1

重要

Snowflake 建议您不要以任何方式更改日志表或其中的数据。作为复制过程的一部分,连接器使用它们来更新目标表。

连接器永远不会删除日志表,但只会为每个复制的源表积极使用最新的日志。如果您想回收存储空间,可以安全地删除与已从复制中移除的源表相关的日志表,以及除最新一代主动复制表之外的所有日志表。

例如,如果您的连接器设置为主动复制源表 orders,并且您之前已将表 customers 从复制中移除,则可能存在以下日志表:在这种情况下,您可以把它们全部删除,仅保留 orders_5678_2

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

配置合并任务的调度

连接器使用仓库将变更数据捕获 (CDC) 数据合并到目标表中。此操作由 MergeSnowflakeJournalTable 处理器触发。如果没有新的更改,或者 MergeSnowflakeJournalTable 队列中没有新的待处理的 FlowFile,则不会触发合并,仓库会自动暂停。

要限制仓库成本并将合并仅限于预定时间,可以在合并任务计划 CRON 参数中使用 CRON 表达式。它限制了流向 MergeSnowflakeJournalTable 处理器的 FlowFile,并且只能在特定的时间段内触发合并。有关调度的更多信息,请参阅 调度策略 (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy)。

运行流

  1. 右键点击“飞机”图标并选择 Enable all Controller Services

  2. 右键点击导入的进程组并选择 Start。连接器开始数据引入。

语言: 中文