Set up the Openflow Connector for MySQL¶
Note
This connector is subject to the Snowflake Connector Terms.
This topic describes the steps to set up the Openflow Connector for MySQL.
Note
This connector can be configured to immediately start replicating incremental changes for newly added tables, bypassing the snapshot load phase. This option is often useful when reinstalling the connector in an account where previously replicated data exists and you want to continue replication without having to re-snapshot tables.
For details on the incremental load process, see Incremental replication.
Prerequisites¶
-
Ensure that you have reviewed About Openflow Connector for MySQL.
-
Ensure that you have completed Set up Openflow - BYOC or Set up Openflow - Snowflake Deployments.
-
If using Openflow - Snowflake Deployments, ensure that you’ve reviewed configuring required domains and have granted access to the required domains for the MySQL connector.
-
Ensure that you have MySQL 8 or a later version to synchronize data with Snowflake.
-
Recommended: Ensure that you add only one connector instance per runtime.
-
As a database administrator, perform the following tasks:
-
Enable binary logs (https://dev.mysql.com/doc/refman/8.4/en/binary-log.html), then save and configure the binary log format as follows:
- log_bin: Set to on. This enables the binary log that records structural and data changes.
- binlog_format: Set to row. The connector supports only row-based replication. MySQL 8.x versions may be the last ones to support this setting, and future versions will only support row-based replication. Not applicable in GCP Cloud SQL, where it is fixed at the right value.
- binlog_row_metadata: Set to full. The connector requires all row metadata to operate, most importantly, column names and primary key information. Under Microsoft Azure Database for MySQL, the binlog_row_metadata field isn’t user modifiable. Raise a Microsoft support ticket to change this value.
- binlog_row_image: Set to full. The connector requires that all columns be written into the binary log. Not applicable in Amazon Aurora, where it is fixed at the right value.
- binlog_row_value_options: Leave empty. This option only affects JSON columns, where it can be set to include only the modified parts of JSON documents for UPDATE statements. The connector requires that full documents are written into the binary log.
- binlog_expire_logs_seconds: Snowflake recommends that you set the binary log expiration period (binlog_expire_logs_seconds) (https://dev.mysql.com/doc/refman/8.4/en/replication-options-binary-log.html#sysvar_binlog_expire_logs_seconds) to at least a few hours, or longer, so that the connector can continue incremental replication after extended pauses or downtime. After the binary log expiration period ends, binary log files might be automatically removed. If the integration is paused for a long period, for example due to maintenance work, and the expired binary log files are deleted during this time, Openflow can’t replicate the data from these files. If you’re using scheduled replication, the value needs to be longer than the configured schedule.
- binlog_legacy_event_pos: Set to ON. Required only when the source is MariaDB. The connector requires this flag to track binary log positions correctly during replication. Not applicable to MySQL.
For example:
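As a minimal sketch in Python, the settings above could be checked before you install the connector. The sketch assumes that you have already read the server variables (for example, from the output of SHOW VARIABLES) into a dictionary; the function name, the dictionary shape, and the three-hour threshold are illustrative assumptions, not part of the connector.

```python
# Required values taken from the settings described above.
# binlog_legacy_event_pos applies only to MariaDB and is left out here.
REQUIRED_BINLOG_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_metadata": "FULL",
    "binlog_row_image": "FULL",
    "binlog_row_value_options": "",
}


def find_binlog_problems(variables, min_expire_seconds=3 * 60 * 60):
    """Return a list of problems; an empty list means the settings look fine.

    `variables` is a hypothetical dict of server variable names to string
    values. `min_expire_seconds` is an assumed reading of "a few hours".
    """
    problems = []
    for name, expected in REQUIRED_BINLOG_SETTINGS.items():
        actual = str(variables.get(name, "")).strip().upper()
        if actual != expected:
            problems.append(f"{name} is {actual!r}, expected {expected!r}")

    raw_expire = variables.get("binlog_expire_logs_seconds")
    if raw_expire is None:
        problems.append("binlog_expire_logs_seconds is not set")
    else:
        expire = int(raw_expire)
        # In MySQL, 0 disables automatic purging, so it is not flagged.
        if 0 < expire < min_expire_seconds:
            problems.append(
                f"binlog_expire_logs_seconds is {expire}, "
                f"expected at least {min_expire_seconds}"
            )
    return problems


# Hypothetical values, as they might be read from the server.
server_variables = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_metadata": "FULL",
    "binlog_row_image": "FULL",
    "binlog_row_value_options": "",
    "binlog_expire_logs_seconds": "86400",
}
print(find_binlog_problems(server_variables))
```

Managed services that fix some of these values, such as the ones noted above, may report them differently, so treat the result as a hint rather than a verdict.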
-
Increase the value of sort_buffer_size. sort_buffer_size defines the amount of memory (in bytes) allocated per query thread for in-memory sorting operations, such as ORDER BY. If the value is too small, the connector may fail with the following error message: Out of sort memory, consider increasing server sort buffer size. This indicates that sort_buffer_size should be raised.
-
If you’re using Amazon RDS databases, increase the binary log retention period, which corresponds to binlog_expire_logs_seconds, by using rds_set_configuration. For example, to retain binary logs for 24 hours, call mysql.rds_set_configuration('binlog retention hours', 24).
-
When using a read replica to connect, binary logging must be enabled on the replica.
Configuration details are provided in step 4.
-
After binary logging is enabled, configure the replica to log the events received from its source into its own binary log. log_replica_updates allows the replica to write events received from its source to its own binary log, making those changes available to any databases that are replicating from it.
-
Connect via SSL. If you’re planning to use an SSL connection to MySQL, prepare the root certificate for your database server. It is required during configuration.
-
Create a user for the connector. The connector requires a user with the REPLICATION SLAVE and REPLICATION CLIENT privileges for reading the binary logs. Grant these privileges:
-
Grant the SELECT privilege on every replicated table:
For more information on replication security, see Binary log (https://dev.mysql.com/doc/refman/8.4/en/binary-log.html).
-
-
As a Snowflake account administrator, perform the following tasks:
-
Create a Snowflake user with the type as SERVICE. Create a database to store the replicated data, and set up privileges for the Snowflake user to create objects in that database by granting the USAGE and CREATE SCHEMA privileges.
-
Create a pair of secure keys (public and private). Store the private key for the user in a file to supply to the connector’s configuration. Assign the public key to the Snowflake service user:
For more information, see pair of keys.
-
Designate a warehouse for the connector to use. Start with the XSMALL warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses than with a larger warehouse size.
-
Install the connector¶
To install the connector, do the following as a data engineer:
-
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
-
On the Openflow connectors page, find the connector and select Install.
-
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Install.
Note
Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data.
-
Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.
-
Authenticate to the runtime with your Snowflake account credentials.
The Openflow canvas appears with the connector process group added to it.
Runtime sizing¶
The runtime size determines the CPU and memory available to the connector. The available sizes are Small, Medium, and Large. The connector requires Medium or Large. Choose the size when you create the runtime: you can’t change the size of an existing runtime in place.
Choose Large if you expect high replication throughput or if source tables contain wide rows.
Resize a runtime¶
Runtime size is fixed at creation, so to change size you run the connector on a different runtime. You have two options depending on whether you want to preserve the current replication progress.
If you don’t need to keep the progress of the current connector, the simplest path is to create a new runtime at the size you need and install a new connector instance on it. The new connector starts from scratch: it snapshots all configured tables and then captures ongoing changes from that point. The replication progress of the existing connector is discarded.
To keep the progress of the current connector, for example to avoid re-snapshotting tables that took a long time to snapshot initially, migrate the connector to the new runtime. This reuses the existing destination tables and resumes incremental replication from where it left off.
For migration instructions, see Reinstall the connector.
Configure the connector¶
To configure the connector, do the following as a data engineer:
-
Right-click on the imported process group and select Parameters.
-
Populate the required parameter values.
For more information on the required parameter values, see the following sections:
- MySQL Source Parameters: Used to establish a connection with MySQL.
- MySQL Destination Parameters: Used to establish a connection with Snowflake.
- MySQL Ingestion Parameters: Used to specify the tables to replicate.
Start by setting the parameters of the MySQL Source Parameters context, then the MySQL Destination Parameters context. After this is done, you can enable the connector. The connector should connect to both MySQL and Snowflake and start running. However, the connector doesn’t replicate any data until the tables to be replicated are explicitly added to its configuration.
To configure specific tables for replication, edit the MySQL Ingestion Parameters context. After you apply the changes to the MySQL Ingestion Parameters context, the configuration is picked up by the connector, and the replication lifecycle starts for every table.
MySQL Source Parameters¶
| Parameter | Description |
|---|---|
| MySQL Connection URL | The full JDBC URL to the source database. The connector uses the MariaDB driver, which is compatible with MySQL and requires the Examples:
|
| MySQL JDBC Driver | The absolute path to the MariaDB JDBC driver jar (https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector/). The connector uses the MariaDB driver, which is compatible with MySQL. Select the Reference asset checkbox to upload the MariaDB JDBC driver. Example: |
| MySQL Username | The username for the connector. |
| MySQL Password | The password for the connector. |
MySQL Destination Parameters¶
| Parameter | Description | Required |
|---|---|---|
| Destination Database | The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase. | Yes |
| Destination Schema Pattern | A pattern for the names of destination schemas where data is persisted. The connector creates the schemas if they don’t exist. You can customize the pattern per ingested table using these optional variables:
For example, for a table To ingest all tables into a single schema, provide a schema name without any variables,
like Important Don’t change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance. | Yes |
| Snowflake Authentication Strategy | When using:
| Yes |
| Snowflake Account Identifier | When using:
| Yes |
| Snowflake Connection Strategy | When using KEY_PAIR, specify the strategy for connecting to Snowflake:
| Required for BYOC with KEY_PAIR only, otherwise ignored. |
| Snowflake Private Key | When using:
| No |
| Snowflake Private Key File | When using:
| No |
| Snowflake Private Key Password | When using
| No |
| Snowflake Role | When using
| Yes |
| Snowflake Username | When using
| Yes |
| Oversized Value Strategy | Determines how the connector handles values that exceed its internal size limits (16 MB) during replication. Possible values are:
| No |
| Snowflake Warehouse | Snowflake warehouse used to run queries. | Yes |
MySQL Ingestion Parameters¶
| Parameter | Description |
|---|---|
| Included Table Names | A comma-separated list of table paths, including their schemas. Example: public.my_table, other_schema.other_table |
| Included Table Regex | A regular expression to match against table paths. Every path matching the expression will be replicated, and new tables matching the pattern that get created later will also be included automatically. Example: public\.auto_.* |
| Column Filter JSON | Optional. A JSON array of filter objects specifying which columns to include or exclude per table. For syntax details and examples, see Replicate a subset of columns in a table. |
| Table Key Configuration Service | Optional. A For more information, see Specify a logical key for a table. |
| Merge Task Schedule CRON | CRON expression defining the periods when merge operations from the journal to the destination table are triggered. Set it to * * * * * ? for continuous merging, or to a time schedule to limit warehouse run time. For example, the string * 0 * * * ? schedules merges for one minute at the start of every hour. The string * 20 14 ? * MON-FRI schedules merges at 2:20 PM every Monday through Friday. For more information and examples, see the CronTrigger tutorial (https://www.quartz-scheduler.org/documentation/quartz-2.2.2/tutorials/tutorial-lesson-06.html). |
| Object Identifier Resolution | Specifies how source object identifiers such as the names of schemas, tables, and columns are stored and queried in Snowflake. This setting specifies that you must use double quotes in SQL queries. Option 1: Default, case-sensitive. For backwards compatibility.
Note Snowflake recommends using this option if you must preserve source casing for legacy or compatibility reasons.
For example, if the source database includes table names that differ in case only–such as
Note Snowflake recommends using this option if database objects are not expected to have mixed case names. Important Do not change this setting after the connector has begun ingesting data. Changing this setting after ingestion has begun breaks the existing ingestion. If you must change this setting, create a new connector instance. |
| Concurrent Snapshot Queries | Maximum number of concurrent queries to the source database to run in the Snapshot flow. Increasing this can speed up snapshotting large numbers of tables, but will also increase the load on the source database. |
Replicate a subset of columns in a table¶
The connector can filter the data replicated per table to a subset of configured columns. Primary key columns are always included regardless of exclusions.
To apply column filters, set the Column Filter JSON parameter in the Ingestion Parameters context to a JSON array of filter objects, one per table you want to filter.
Columns can be included or excluded by name or by regular expression pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.
Syntax¶
Each object in the array identifies a table and specifies which columns to include or exclude.
The following rules apply:
- Use schema and table for exact name matching, or schemaPattern and tablePattern for regex matching. You can’t use both a field and its pattern variant in the same object (for example, schema and schemaPattern can’t both appear).
- At least one of included, excluded, includedPattern, or excludedPattern must be provided.
- When both included and excluded filters are specified, exclusions take precedence.
- When multiple filters match the same table, the last matching filter is used, with exact matches taking precedence over pattern-based filters.
- The value can be an array of objects to apply different filters to different tables.
Examples¶
Include specific columns by name:
Exclude specific columns by name:
Combine an include pattern with a specific exclusion (for example, include all email columns except admin_email):
Mix a schema pattern with an exact table name to apply a filter across schemas:
Pass multiple filter objects to apply different rules to different tables:
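The rules in this section can be sketched in Python as follows. This is an illustration only, not the connector’s implementation. It assumes that included and excluded are arrays of column names, that includedPattern and excludedPattern are single regular expressions that must match the whole column name, and that a filter with only exclusions keeps every other column. The schema, table, and column names are hypothetical.

```python
import json
import re

COLUMN_KEYS = ("included", "excluded", "includedPattern", "excludedPattern")


def check_filter(obj):
    """Raise ValueError if a filter object breaks the syntax rules."""
    for exact, pattern in (("schema", "schemaPattern"), ("table", "tablePattern")):
        if exact in obj and pattern in obj:
            raise ValueError(f"{exact} and {pattern} can't both appear")
    if not any(key in obj for key in COLUMN_KEYS):
        raise ValueError("provide at least one of: " + ", ".join(COLUMN_KEYS))


def matches(col, names, pattern):
    # Assumption: a pattern must match the whole column name.
    return col in names or (
        pattern is not None and re.fullmatch(pattern, col) is not None
    )


def replicated_columns(columns, primary_key, obj):
    """Return the columns of one table that remain after applying `obj`."""
    has_include = "included" in obj or "includedPattern" in obj
    result = []
    for col in columns:
        if col in primary_key:
            result.append(col)  # primary key columns are always included
            continue
        included = not has_include or matches(
            col, obj.get("included", []), obj.get("includedPattern")
        )
        excluded = matches(col, obj.get("excluded", []), obj.get("excludedPattern"))
        if included and not excluded:  # exclusions take precedence
            result.append(col)
    return result


# Hypothetical table: include all email columns except admin_email.
email_filter = {
    "schema": "shop",
    "table": "users",
    "includedPattern": ".*email",
    "excluded": ["admin_email"],
}
check_filter(email_filter)

# The parameter value is a JSON array of filter objects.
column_filter_json = json.dumps([email_filter])
kept = replicated_columns(
    ["id", "name", "email", "admin_email", "backup_email"], ["id"], email_filter
)
print(column_filter_json)
print(kept)
```

To filter several tables, append more filter objects to the list before serializing it.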
Including and excluding the same column¶
Removing a column from a table’s replicated set (by excluding it or by removing
it from the included list) has the same effect on the destination as dropping
the column at the source: the connector soft-deletes the column on the
destination by renaming it with a suffix (by default, __SNOWFLAKE_DELETED).
If you then add the column back to the replicated set and later remove it a
second time, replication for the affected table fails because the soft-deleted
column name is already taken. To recover, restart replication for the affected
table.
Track data changes in tables¶
The connector replicates not only the current state of data from the source tables, but also every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.
The journal table names are formatted as: <source_table_name>_JOURNAL_<timestamp>_<schema_generation>
where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema_generation> is an integer increasing with every schema change on the source table.
As a result, source tables that undergo schema changes will have multiple journal tables.
When a table is removed from replication, then added back, the <timestamp> value will change, and <schema_generation> will start again from 1.
Important
Snowflake recommends that you don’t alter the structure of journal tables in any way. They are used by the connector to update the destination table as part of the replication process.
The connector never drops journal tables. It uses only the latest journal for each replicated source table, and reads only append-only streams on top of the journals. To reclaim the storage, you can:
- Truncate all journal tables at any time.
- Drop the journal tables related to source tables that were removed from replication.
- Drop all but the latest generation journal tables for actively replicated tables.
For example, if your connector is set to actively replicate source table orders, and you have earlier removed table customers from replication, you may have the following journal tables. In this case you can drop all of them except orders_5678_2.
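As a rough sketch of the cleanup rules in this section, the following Python code parses journal table names in the <source_table_name>_JOURNAL_<timestamp>_<schema_generation> format and lists the journals that could be dropped. The table names and timestamps are hypothetical, and treating the highest timestamp and generation as "latest" is an assumption based on the naming description above.

```python
import re

# Matches <source_table_name>_JOURNAL_<timestamp>_<schema_generation>.
JOURNAL_NAME = re.compile(
    r"^(?P<table>.+)_JOURNAL_(?P<timestamp>\d+)_(?P<generation>\d+)$"
)


def parse_journal_name(name):
    """Return (source_table, timestamp, generation), or None if no match."""
    match = JOURNAL_NAME.match(name)
    if match is None:
        return None
    return match["table"], int(match["timestamp"]), int(match["generation"])


def droppable_journals(journal_names, active_tables):
    """List journals that are not the latest journal of an active table."""
    parsed = {name: parse_journal_name(name) for name in journal_names}
    latest = {}  # source table -> ((timestamp, generation), journal name)
    for name, info in parsed.items():
        if info is None or info[0] not in active_tables:
            continue
        table, timestamp, generation = info
        version = (timestamp, generation)
        if table not in latest or version > latest[table][0]:
            latest[table] = (version, name)
    keep = {name for _, name in latest.values()}
    # Names that don't look like journal tables are never proposed.
    return sorted(
        name for name, info in parsed.items() if info is not None and name not in keep
    )


journals = [
    "orders_JOURNAL_1700000000_1",
    "orders_JOURNAL_1700000000_2",
    "customers_JOURNAL_1690000000_1",
    "not_a_journal_table",
]
to_drop = droppable_journals(journals, active_tables={"orders"})
print(to_drop)
```

Review the resulting list before you drop anything, because the sketch relies only on table names.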
Configure scheduling of merge tasks¶
The connector uses a warehouse to merge change data capture (CDC) data into destination tables. This operation is triggered by the MergeSnowflakeJournalTable processor. If there are no new changes or if no new flow files are waiting in the MergeSnowflakeJournalTable queue, no merge is triggered and the warehouse auto-suspends.
To limit warehouse cost by restricting merges to scheduled times, use a CRON expression in the Merge Task Schedule CRON parameter. It throttles the flow files coming to the MergeSnowflakeJournalTable processor, so merges are triggered only during the scheduled periods. For more information about scheduling, see Scheduling strategy (https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy).
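The schedule uses Quartz-style expressions, which start with a seconds field, unlike the five-field Unix cron format. As a small illustration (the helper below is hypothetical and only labels the fields; it doesn’t validate or evaluate them), the following Python sketch splits an expression into its named parts:

```python
# Field order used by Quartz cron expressions; a seventh "year" field is optional.
QUARTZ_FIELDS = (
    "seconds",
    "minutes",
    "hours",
    "day_of_month",
    "month",
    "day_of_week",
    "year",
)


def split_quartz_cron(expression):
    """Label each field of a Quartz cron expression without evaluating it."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    return dict(zip(QUARTZ_FIELDS, parts))


weekday_merge = split_quartz_cron("* 20 14 ? * MON-FRI")
print(weekday_merge)
```

Reading the result field by field shows why this expression limits merges to the minute starting at 2:20 PM on weekdays: the seconds field is unrestricted, while the minutes and hours fields are fixed.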
Specify a logical key for a table¶
The connector requires a replication key for every table it replicates. By default, the connector uses the table’s primary key. A logical key is a user-declared replacement for the auto-detected key. Configure a logical key when:
- A table has no primary key, but one or more columns are unique in the data.
- A specific column or set of columns should be used as the replication key, regardless of what the connector would auto-detect (for example, to override a synthetic primary key).
A logical key takes the highest priority. When the connector finds a logical key for a table, it uses that key and ignores any primary key on the table.
JSON syntax¶
The Table Key Configuration JSON value is a JSON array. Each entry maps one table to its logical key columns:
The fields are:
| Field | Description |
|---|---|
| schema | Required. The exact source schema name. |
| table | Required. The exact source table name. |
| logicalKey | Required. A non-empty array of source column names that uniquely identify rows in the table. |
The following rules apply:
- schema, table, and logicalKey column matching is case-sensitive. Use the exact names as reported by MySQL.
- An entry whose schema and table don’t match any replicated table is silently ignored.
Logical key configuration examples¶
A single-column logical key on a table without a primary key:
A composite logical key:
Logical keys for several tables in one JSON value:
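As an illustration of the three cases above, the following Python sketch builds a value from entries that use the schema, table, and logicalKey fields. The helper function and all schema, table, and column names are hypothetical.

```python
import json


def logical_key_entry(schema, table, columns):
    """Build one entry that maps a table to its logical key columns."""
    return {"schema": schema, "table": table, "logicalKey": list(columns)}


# A single-column logical key on a table without a primary key.
single = logical_key_entry("sales", "events", ["event_id"])

# A composite logical key.
composite = logical_key_entry("sales", "order_items", ["order_id", "line_number"])

# Logical keys for several tables in one JSON value.
table_key_configuration = json.dumps([single, composite], indent=2)
print(table_key_configuration)
```

Because name matching is case-sensitive, copy the schema, table, and column names exactly as MySQL reports them.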
Restrictions¶
The connector rejects the configuration when any of the following is true:
- logicalKey is missing, empty, or not an array.
- logicalKey contains duplicate column names.
- logicalKey contains a nullable column. Logical key columns must be defined as NOT NULL to reliably identify rows.
- logicalKey contains a column name that doesn’t exist in the source table.
When the configuration is rejected, the connector either fails to enable the
controller service (for structural issues detected at enablement time) or holds the
table in the NEW state (for issues detected when the table is initialized). After
you fix the configuration, replication for the table resumes without resetting state.
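To catch these problems before you apply a configuration, you could run a check like the following Python sketch. It mirrors the restrictions listed above but isn’t the connector’s own validation. The source_columns mapping (column name to whether the column is nullable) is a hypothetical input that you would build from the source table definition, and the sketch assumes that logicalKey entries are strings.

```python
def logical_key_errors(entry, source_columns):
    """Return the restrictions that one configuration entry violates.

    `source_columns` is a hypothetical dict mapping each source column
    name to True if the column is nullable and False if it is NOT NULL.
    """
    key = entry.get("logicalKey")
    if not isinstance(key, list) or not key:
        return ["logicalKey is missing, empty, or not an array"]

    errors = []
    if len(set(key)) != len(key):
        errors.append("logicalKey contains duplicate column names")
    for column in dict.fromkeys(key):  # each name once, original order kept
        if column not in source_columns:  # case-sensitive lookup
            errors.append(f"column {column} doesn't exist in the source table")
        elif source_columns[column]:
            errors.append(f"column {column} is nullable")
    return errors


# Hypothetical source table definition.
order_items_columns = {"order_id": False, "line_number": False, "note": True}

good = {"schema": "sales", "table": "order_items", "logicalKey": ["order_id", "line_number"]}
bad = {"schema": "sales", "table": "order_items", "logicalKey": ["order_id", "note", "sku"]}

print(logical_key_errors(good, order_items_columns))
print(logical_key_errors(bad, order_items_columns))
```

An empty list means that none of the listed restrictions apply to the entry.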
Warnings logged for risky configurations¶
The connector accepts the following configurations but logs a warning at table initialization.
When choosing logical-key columns, prefer columns with high cardinality and, where possible, monotonically increasing values. Low-cardinality or non-monotonic keys can degrade snapshot performance.
- A logical-key column is a large-object type (blob, tinyblob, mediumblob, longblob, text, tinytext, mediumtext, longtext). Using large objects as keys severely degrades MERGE performance.
- A logical-key column is a floating-point type (float, double). Floating-point comparisons can produce inconsistent results because of precision differences.
- A logical-key column is a semi-structured type (json). Semi-structured values may produce non-deterministic equality comparisons.
- The composite logical key includes more than five columns. Long composite keys often indicate a design issue and might degrade MERGE performance.
- The logical key overrides an existing primary key on the table. Verify that the replacement key is intentional: the connector no longer uses the primary key for MERGE operations.
If you observe data divergence after any of these warnings, run a periodic full reload to reconcile the destination with the source.
Schema changes that affect a logical key¶
Logical keys reference column names. The connector doesn’t follow renames or drops of those columns:
- If a logical-key column is dropped on the source, replication for the affected table fails. The table is marked FAILED. For more information, see Restart table replication.
- If a logical-key column is renamed on the source, the configuration still references the old name and replication fails. Update the JSON to use the new name and restart table replication.
Run the flow¶
- Right-click on the canvas and select Enable all Controller Services.
- Right-click on the imported process group and select Start. The connector starts the data ingestion.