About the Openflow Connector for MySQL¶
Note
This connector is subject to the Snowflake Connector Terms.
This topic describes the basic concepts, workflow, and limitations of the Openflow Connector for MySQL.
The Openflow Connector for MySQL connects a MySQL database instance to Snowflake and replicates data from selected tables in near real time or on a configured schedule. The connector also maintains a journal of all data changes, available alongside the current state of the replicated tables.
Use cases¶
Use this connector if you want to:
Replicate CDC from MySQL tables into Snowflake for comprehensive, centralized reporting
Supported MySQL versions¶
The following table lists the MySQL versions that are tested and officially supported.
| | 8.0 | 8.4 |
|---|---|---|
| Standard (https://www.mysql.com/) | Yes | Yes |
| AWS RDS (https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_MySQL.html) | Yes | Yes |
| Amazon Aurora (https://docs.aws.amazon.com/AmazonRDS/latest/AuroraMySQLReleaseNotes/Welcome.html) | Yes, as Aurora version 3 | Not applicable. Aurora 8.4 is not currently supported. |
| GCP Cloud SQL (https://cloud.google.com/sql/mysql?hl=en) | Yes | Yes |
| Azure Database (https://azure.microsoft.com/en-us/products/mysql/) | Yes | Yes |
Openflow requirements¶
The runtime size must be at least Medium. Use a larger runtime when replicating high volumes of data, especially when rows are large.
The connector does not support multi-node Openflow runtimes. Configure the runtime for this connector with both Min nodes and Max nodes set to 1.
Limitations¶
The connector supports MySQL version 8 or later.
The connector supports only username and password authentication for connecting to MySQL.
Only database tables containing primary keys can be replicated.
The connector does not replicate individual values larger than 16 MB. By default, processing such a value results in the associated table being marked permanently failed. To prevent table failures, modify the Oversized Value Strategy destination parameter.
The connector cannot replicate a table whose data exceeds Snowflake's type limits.
The connector does not replicate columns of the following types: GEOMETRY, GEOMETRYCOLLECTION, LINESTRING, MULTILINESTRING, MULTIPOINT, MULTIPOLYGON, POINT, and POLYGON.
The connector is subject to the MySQL Group Replication limitations (https://dev.mysql.com/doc/refman/8.4/en/group-replication-limitations.html#group-replication-limitations-transaction-size). This means a single transaction must fit in a binary log message no larger than 4 GB.
The connector does not support replicating tables from a reader instance in Amazon Aurora as Aurora reader instances do not maintain their own binary logs.
The connector supports source table schema changes, but does not support changes to primary key definitions or to the precision or scale of numeric columns.
The connector does not support re-adding a column after it is dropped.
For DATE and DATETIME types, any values that contain a zero month or day are mapped to the Unix epoch ('1970-01-01' or '1970-01-01T00:00'). Date zero ('0000-00-00') is also mapped to the Unix epoch. Values with a zero year are converted to year one; for example, '0000-05-30 7:59:59' becomes '0001-05-30T7:59:59'. The remaining date and time components are unchanged.
For the TIMESTAMP type, the value '0000-00-00 00:00:00' is mapped to the Unix epoch ('1970-01-01T00:00Z').
Note
You can work around limitations that affect specific table columns by excluding those columns from replication.
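If the zero-date remapping described above matters for your downstream queries, one way to find rows that may have been remapped is to look for the epoch sentinel in the destination table. The table and column names here are placeholders, not part of the connector:

```sql
-- my_table and order_date are hypothetical names.
-- Rows whose source value had a zero month or day arrive as the Unix epoch.
SELECT *
FROM my_table
WHERE order_date = '1970-01-01';
```

Note that this query cannot distinguish genuine epoch dates in the source from remapped zero dates; treat matches as candidates for review rather than proof of remapping.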
Workflow¶
A MySQL database administrator performs the following tasks:
Configure MySQL replication settings.
Create credentials for the connector.
(Optional) Provide SSL certificates.
A Snowflake account administrator performs the following tasks:
Create a service user for the connector, a warehouse for the connector, and a destination database for the replicated data.
Install the connector.
Specify the required parameters for the flow template.
Run the flow. While running in Openflow, the connector performs the following tasks:
Creates a schema for the journal tables.
Creates schemas and destination tables that correspond to the source tables for replication.
Starts replicating the tables. For details about the replication process, see How tables are replicated.
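As an illustration, the Snowflake-side preparation performed by the account administrator might look like the following. All object names are placeholders; consult the setup guide for the exact objects and privileges the connector requires:

```sql
-- Placeholder names throughout; not connector-mandated values.
CREATE ROLE IF NOT EXISTS mysql_connector_role;
CREATE USER IF NOT EXISTS mysql_connector_user
  DEFAULT_ROLE = mysql_connector_role;
GRANT ROLE mysql_connector_role TO USER mysql_connector_user;

-- A warehouse for the connector and a destination database for replicated data.
CREATE WAREHOUSE IF NOT EXISTS mysql_connector_wh
  WITH WAREHOUSE_SIZE = 'MEDIUM';
CREATE DATABASE IF NOT EXISTS mysql_replication_db;

GRANT USAGE ON WAREHOUSE mysql_connector_wh TO ROLE mysql_connector_role;
GRANT OWNERSHIP ON DATABASE mysql_replication_db TO ROLE mysql_connector_role;
```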
How the connector works¶
The following sections describe how the connector works in various scenarios, including replication, changes in schema, and data retention.
How tables are replicated¶
Table replication proceeds in the following stages:
Schema introspection: The connector discovers the columns in the source table, including column names and data types, then validates them against Snowflake's limits and the connector's limitations. A validation failure causes this stage to fail and the cycle to complete. On success, the connector creates an empty destination table.
Snapshot load: The connector copies all available data from the source table into the destination table. If this stage fails, no further data is replicated. On success, the data from the source table is available in the destination table.
Incremental load: The connector tracks changes in the source table and applies them to the destination table. This continues until the table is removed from replication. If this stage fails, replication of the source table stops permanently until the issue is resolved.
Note
This connector can be configured to immediately start replicating incremental changes for newly added tables, bypassing the snapshot load phase. This option is often useful when reinstalling the connector in an account where previously replicated data exists and you want to continue replication without having to re-snapshot tables.
For details on bypassing the snapshot load and using the incremental load process, see Incremental replication.
Important
Interim failures, such as connection errors, do not prevent tables from being replicated. Permanent failures, such as unsupported data types, do prevent tables from being replicated. If a permanent failure prevents a table from being replicated, remove the table from the list of replicated tables. After you address the problem that caused the failure, you can add the table back to the list of replicated tables.
Table replication status¶
Interim failures, such as connection errors, do not prevent table replication. However, permanent failures, such as unsupported data types, prevent table replication.
To troubleshoot replication issues or verify that a table has been successfully removed from the replication flow, check the Table State Store:
In the Openflow runtime canvas, right-click a processor group and choose Controller Services. A table listing controller services displays.
Locate the row labeled Table State Store, click the More button on the right side of the row, and then choose View State.
A list of tables and their current states displays. Type in the search box to filter the list by table name. The possible states are:
NEW: The table is scheduled for replication but replication hasn't started.
SNAPSHOT_REPLICATION: The connector is copying existing data. This status displays until all records are stored in the destination table.
INCREMENTAL_REPLICATION: The connector is actively replicating changes. This status displays after snapshot replication ends and continues to display indefinitely until a table is either removed from replication or replication fails.
FAILED: Replication has permanently stopped due to an error.
Note
The Openflow runtime canvas doesn't display table status changes; it displays only the current table status. However, table status changes are recorded in logs when they occur. Look for the following log message:
Replication state for table <database_name>.<schema_name>.<table_name> changed from <old_state> to <new_state>
If a permanent failure prevents table replication, remove the table from replication. After you address the problem that caused the failure, you can add the table back to replication. For more information, see Restart table replication.
Understanding data retention¶
The connector follows a data retention philosophy where customer data is never automatically deleted. You maintain full ownership and control over your replicated data, and the connector preserves historical information rather than permanently removing it.
This approach has the following implications:
Rows deleted from the source table are soft-deleted in the destination table rather than physically removed.
Columns dropped from the source table are renamed in the destination table rather than dropped.
Journal tables are retained indefinitely and are not automatically cleaned up.
Destination table metadata columns¶
Each destination table includes the following metadata columns that track replication information:
| Column name | Type | Description |
|---|---|---|
| _SNOWFLAKE_INSERTED_AT | TIMESTAMP_NTZ | The timestamp when the row was originally inserted into the destination table. |
| _SNOWFLAKE_UPDATED_AT | TIMESTAMP_NTZ | The timestamp when the row was last updated in the destination table. |
| _SNOWFLAKE_DELETED | BOOLEAN | Indicates whether the row was deleted from the source table. When TRUE, the row is soft-deleted in the destination table. |
Soft-deleted rows¶
When a row is deleted from the source table, the connector does not physically remove it from the
destination table. Instead, the row is marked as deleted by setting the _SNOWFLAKE_DELETED metadata
column to true.
This approach allows you to:
Retain historical data for auditing or compliance purposes.
Query deleted records when needed.
Decide when and how to permanently remove data based on your requirements.
To query only active (non-deleted) rows, filter on the _SNOWFLAKE_DELETED column:
SELECT * FROM my_table WHERE _SNOWFLAKE_DELETED = FALSE;
To query deleted rows:
SELECT * FROM my_table WHERE _SNOWFLAKE_DELETED = TRUE;
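If most consumers only ever need active rows, you can wrap the filter in a view so that queries don't have to repeat it. The view and table names here are placeholders:

```sql
-- my_table is a placeholder for your destination table.
CREATE VIEW my_table_active AS
SELECT *
FROM my_table
WHERE _SNOWFLAKE_DELETED = FALSE;
```

This mirrors the view-based approach recommended later for renamed columns: the underlying replicated data stays untouched while consumers get a clean interface.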
Dropped columns¶
When a column is dropped from the source table, the connector does not drop the corresponding column
from the destination table. Instead, the column is renamed by appending the __SNOWFLAKE_DELETED suffix
to preserve historical values.
For example, if a column named EMAIL is dropped from the source table, it is renamed to
EMAIL__SNOWFLAKE_DELETED in the destination table. Rows that existed before the column was dropped
retain their original values, while rows added after the drop have NULL in this column.
You can still query historical values from the renamed column:
SELECT EMAIL__SNOWFLAKE_DELETED FROM my_table;
Renamed columns¶
Due to limitations in CDC (Change Data Capture) mechanisms, the connector cannot distinguish between a column being renamed and a column being dropped followed by a new column being added. As a result, when you rename a column in the source table, the connector treats this as two separate operations: dropping the original column and adding a new column with the new name.
For example, if you rename a column from A to B in the source table, the destination table
will contain:
A__SNOWFLAKE_DELETED: Contains values from before the rename. Rows added after the rename have NULL in this column.
B: Contains values from after the rename. Rows that existed before the rename have NULL in this column.
Querying renamed columns¶
To retrieve data from both the original and renamed columns as a single unified column, use a
COALESCE or CASE expression:
SELECT
COALESCE(B, A__SNOWFLAKE_DELETED) AS A_RENAMED_TO_B
FROM my_table;
Alternatively, using a CASE expression:
SELECT
CASE
WHEN B IS NOT NULL THEN B
ELSE A__SNOWFLAKE_DELETED
END AS A_RENAMED_TO_B
FROM my_table;
Creating a view for renamed columns¶
Rather than manually modifying the destination table, you can create a view that presents the renamed column as a single unified column. This approach is recommended because it preserves the original data and avoids potential issues with ongoing replication.
CREATE VIEW my_table_unified AS
SELECT
*,
COALESCE(B, A__SNOWFLAKE_DELETED) AS A_RENAMED_TO_B
FROM my_table;
Important
Manually modifying the destination table structure (such as dropping or renaming columns) is not recommended, as it may interfere with ongoing replication and cause data inconsistencies.
Journal tables¶
During incremental replication, changes from the source database are first written to journal tables before being merged into the destination tables. The connector does not automatically remove data from journal tables, as this data may be useful for auditing, debugging, or reprocessing purposes.
Journal tables are created in the same schema as their corresponding destination tables and follow this naming convention:
<TABLE_NAME>_JOURNAL_<timestamp>_<number>
Where:
<TABLE_NAME> is the name of the destination table.
<timestamp> is the creation timestamp in Unix epoch format (seconds since January 1, 1970), ensuring uniqueness.
<number> starts at 1 and increments whenever the destination table schema changes, either due to schema changes in the source table or modifications to column filters.
For example, if your destination table is SALES.ORDERS, the journal table might be named
SALES.ORDERS_JOURNAL_1705320000_1.
Important
Do not drop journal tables while replication is in progress. Removing an active journal table may cause data loss or replication failures. Only drop journal tables after the corresponding source table has been fully removed from replication.
Managing journal table storage¶
If you need to manage storage costs by removing old journal data, you can create a Snowflake task that periodically cleans up journal tables for tables that are no longer being replicated.
Before implementing journal cleanup, verify that:
The corresponding source tables have been fully removed from replication.
You no longer need the journal data for auditing or processing purposes.
For information on creating and managing tasks for automated cleanup, see Introduction to tasks.
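As a sketch only, such a cleanup task could use Snowflake Scripting to drop journal tables matching the naming convention for a source table that is no longer replicated. The schema, table pattern, warehouse, and schedule below are all placeholders:

```sql
-- Sketch: drop journal tables for SALES.ORDERS after it has been
-- removed from replication. All names and the schedule are placeholders.
CREATE OR REPLACE TASK sales.cleanup_orders_journals
  WAREHOUSE = my_wh
  SCHEDULE = 'USING CRON 0 3 * * 0 UTC'  -- weekly, Sunday 03:00 UTC
AS
DECLARE
  journals RESULTSET DEFAULT (
    SELECT table_name
    FROM sales.information_schema.tables
    WHERE table_schema = 'SALES'
      AND table_name LIKE 'ORDERS_JOURNAL_%'
  );
BEGIN
  FOR rec IN journals DO
    EXECUTE IMMEDIATE 'DROP TABLE SALES."' || rec.table_name || '"';
  END FOR;
END;
```

Only schedule such a task for tables that have already been fully removed from replication; as noted above, dropping an active journal table can cause data loss or replication failures.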
Next steps¶
Review Openflow Connector for MySQL: Data mapping to understand how the connector maps data types to Snowflake data types.
Review Set up the Openflow Connector for MySQL to set up the connector.