Set up the Openflow Connector for Kafka¶
Note
The connector is subject to the Connector Terms.
Prerequisites¶
Ensure that you have reviewed Openflow Connector for Kafka.
Ensure that you have set up Openflow.
Connector types¶
The Openflow Connector for Kafka is available in three different configurations, each optimized for specific use cases. You can download these connector definitions from the connectors gallery:
- Apache Kafka for JSON data format
Simplified connector for JSON message ingestion with schema evolution and topic-to-table mapping
- Apache Kafka for AVRO data format
Simplified connector for AVRO message ingestion with schema evolution and topic-to-table mapping
- Apache Kafka with DLQ and metadata
Full-featured connector with dead letter queue (DLQ) support, metadata handling, and feature parity with the legacy Snowflake connector for Kafka
For detailed configuration of specific connector types, see:
- Apache Kafka for JSON/AVRO data format - JSON/AVRO data format connectors
- Apache Kafka with DLQ and metadata - DLQ and metadata connector
Set up Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create a new role or use an existing role and grant it the required database privileges.
Create a new Snowflake service user with the type as SERVICE.
Grant the Snowflake service user the role you created in the previous steps.
Configure key-pair authentication for the Snowflake SERVICE user from step 2; see the key-generation sketch after these setup tasks.
Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, for example AWS, Azure, or HashiCorp, and store the public and private keys in the secret store.
Note
If, for any reason, you do not wish to use a secrets manager, then you are responsible for safeguarding the public and private key files used for key-pair authentication according to the security policies of your organization.
Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it's recommended that you use the EC2 instance role associated with Openflow, because this way no other secrets have to be persisted.
In Openflow, configure a Parameter Provider associated with this secrets manager: from the hamburger menu in the upper right, navigate to Controller Settings » Parameter Provider, and then fetch your parameter values.
At this point all credentials can be referenced with the associated parameter paths and no sensitive values need to be persisted within Openflow.
If any other Snowflake users require access to the raw documents and tables ingested by the connector (for example, for custom processing in Snowflake), then grant those users the role created in step 1.
Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with the size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses than with larger warehouse sizes.
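If you prefer to script the key-pair step, the following is a minimal sketch using the Python cryptography package. The user name openflow_kafka_user, the output file rsa_key.p8, and the use of an unencrypted key are assumptions; adjust them to your environment, and store the resulting keys in your secrets manager rather than on local disk.

```python
# Minimal sketch: generate an RSA key pair for the Snowflake SERVICE user and
# print the ALTER USER statement that registers the public key.
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

# PKCS8 PEM private key: this is the value the connector's private key parameters expect.
private_pem = key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),  # or BestAvailableEncryption(b"...") for a passphrase
)
with open("rsa_key.p8", "wb") as f:  # placeholder path; prefer your secrets manager
    f.write(private_pem)

# Public key to register on the Snowflake service user.
public_pem = key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()

# Snowflake expects the key body without the PEM delimiter lines.
public_key_body = "".join(
    line for line in public_pem.splitlines() if "PUBLIC KEY" not in line
)
print(f"ALTER USER openflow_kafka_user SET RSA_PUBLIC_KEY='{public_key_body}';")
```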
Set up the connector¶
As a data engineer, perform the following tasks to install and configure the connector:
Install the connector¶
Navigate to the Openflow Overview page. In the Featured connectors section, select View more connectors.
On the Openflow connectors page, find the connector and select Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list.
Select Add.
Note
Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data; a scripted sketch follows these installation steps.
Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.
Authenticate to the runtime with your Snowflake account credentials.
The Openflow canvas appears with the connector process group added to it.
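A minimal sketch for creating the destination database and schema, assuming the hypothetical names OPENFLOW_DB, KAFKA_INGEST, and OPENFLOW_KAFKA_ROLE and the snowflake-connector-python package; you can equally run the same statements in a Snowflake worksheet.

```python
# Minimal sketch: create the destination database and schema the connector writes to.
import snowflake.connector

conn = snowflake.connector.connect(
    account="<organization-name>-<account-name>",
    user="<admin-user>",
    authenticator="externalbrowser",  # or any authentication method allowed by your policies
)
try:
    cur = conn.cursor()
    cur.execute("CREATE DATABASE IF NOT EXISTS OPENFLOW_DB")
    cur.execute("CREATE SCHEMA IF NOT EXISTS OPENFLOW_DB.KAFKA_INGEST")
    # Let the connector role create and write tables in the destination schema.
    cur.execute("GRANT USAGE ON DATABASE OPENFLOW_DB TO ROLE OPENFLOW_KAFKA_ROLE")
    cur.execute(
        "GRANT USAGE, CREATE TABLE ON SCHEMA OPENFLOW_DB.KAFKA_INGEST "
        "TO ROLE OPENFLOW_KAFKA_ROLE"
    )
finally:
    conn.close()
```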
Configure the connector¶
Populate the process group parameters
Right-click the imported process group and select Parameters.
Fill out the required parameter values as described in Common parameters.
Common parameters¶
All Kafka connector variants share common parameter contexts for basic connectivity and authentication.
Snowflake destination parameters¶
| Parameter | Description | Required |
|---|---|---|
| Destination Database | The database where data will be persisted. It must already exist in Snowflake. | Yes |
| Destination Schema | The schema where data will be persisted. It must already exist in Snowflake. | Yes |
| Snowflake Account Identifier | Snowflake account name, formatted as [organization-name]-[account-name], where data will be persisted. | Yes |
| Snowflake Authentication Strategy | Strategy of authentication to Snowflake. Possible values: SNOWFLAKE_SESSION_TOKEN, when running the flow on SPCS, and KEY_PAIR, when setting up access using a private key. | Yes |
| Snowflake Private Key | The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined. | No |
| Snowflake Private Key File | The file that contains the RSA private key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE. | No |
| Snowflake Private Key Password | The password associated with the Snowflake Private Key File. | No |
| Snowflake Role | Snowflake role used during query execution. | Yes |
| Snowflake Username | User name used to connect to the Snowflake instance. | Yes |
| Snowflake Warehouse | Snowflake warehouse used to run queries. | Yes |
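Before entering these values, it can help to confirm they work together by opening a key-pair authenticated session with them. A minimal sketch using snowflake-connector-python and the cryptography package; the account identifier, user, role, warehouse, database, schema, and rsa_key.p8 file are placeholders matching the earlier setup sketches.

```python
# Minimal sketch: validate the Snowflake destination parameter values with a key-pair login.
import snowflake.connector
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as f:
    private_key = serialization.load_pem_private_key(f.read(), password=None)

# The connector expects a PKCS8 PEM key; the Python connector wants DER-encoded bytes.
private_key_der = private_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

conn = snowflake.connector.connect(
    account="<organization-name>-<account-name>",  # Snowflake Account Identifier
    user="OPENFLOW_KAFKA_USER",                    # Snowflake Username
    private_key=private_key_der,                   # key-pair (KEY_PAIR strategy)
    role="OPENFLOW_KAFKA_ROLE",                    # Snowflake Role
    warehouse="OPENFLOW_WH",                       # Snowflake Warehouse
    database="OPENFLOW_DB",                        # Destination Database
    schema="KAFKA_INGEST",                         # Destination Schema
)
try:
    print(conn.cursor().execute("SELECT CURRENT_ROLE(), CURRENT_WAREHOUSE()").fetchone())
finally:
    conn.close()
```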
Kafka source parameters (SASL authentication)¶
| Parameter | Description | Required |
|---|---|---|
| Kafka Security Protocol | Security protocol used to communicate with brokers. Corresponds to the Kafka client security.protocol property. One of: SASL_PLAINTEXT / SASL_SSL. | Yes |
| Kafka SASL Mechanism | SASL mechanism used for authentication. Corresponds to the Kafka client sasl.mechanism property. One of: PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512. | Yes |
| Kafka SASL Username | The username to authenticate to Kafka. | Yes |
| Kafka SASL Password | The password to authenticate to Kafka. | Yes |
| Kafka Bootstrap Servers | A comma-separated list of Kafka brokers to fetch data from. Each entry must include the port, for example kafka-broker:9092. The same instance is used for the DLQ topic. | Yes |
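It can also help to verify the SASL values against the broker before entering them. A minimal sketch using the confluent-kafka Python client (an assumption; any Kafka client works), with placeholder broker address, credentials, and topic name.

```python
# Minimal sketch: verify the Kafka SASL parameter values by producing one test record.
import json
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "kafka-broker:9092",  # Kafka Bootstrap Servers
    "security.protocol": "SASL_SSL",           # Kafka Security Protocol
    "sasl.mechanism": "PLAIN",                 # Kafka SASL Mechanism
    "sasl.username": "<sasl-username>",        # Kafka SASL Username
    "sasl.password": "<sasl-password>",        # Kafka SASL Password
})

def on_delivery(err, msg):
    # Report whether the broker accepted the test record.
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")

producer.produce("topic1", value=json.dumps({"id": 1, "status": "test"}), callback=on_delivery)
producer.flush(10)
```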
Kafka ingestion parameters¶
| Parameter | Description | Required |
|---|---|---|
| Kafka Topic Format | One of: names / pattern. Specifies whether the "Kafka Topics" value provided is a comma-separated list of names or a single regular expression. | Yes |
| Kafka Topics | A comma-separated list of Kafka topics or a regular expression. | Yes |
| Kafka Group Id | The ID of a consumer group used by the connector. Can be arbitrary but must be unique. | Yes |
| Kafka Auto Offset Reset | Automatic offset configuration applied when no previous consumer offset is found. Corresponds to the Kafka auto.offset.reset property. | Yes |
| Topic To Table Map | This optional parameter allows the user to specify which topics should be mapped to which tables. Each topic and its table name should be separated by a colon (see the examples below). The table name must be a valid Snowflake unquoted identifier. The regular expressions cannot be ambiguous: any matched topic must match only a single target table. If the parameter is empty or no match is found, the topic name is used as the table name. Note: the mapping cannot contain spaces after commas. | No |
Topic To Table Map example values:
- topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range
- topic[0-4]:low_range,topic[5-9]:high_range
- .*:destination_table (maps all topics to destination_table)
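The mapping rules can be illustrated with a small helper. This is an illustrative approximation of the documented behavior (a topic maps to the table whose name or regular expression it matches, falling back to the topic name), not the connector's own implementation.

```python
import re

def resolve_table(topic: str, topic_to_table_map: str) -> str:
    """Illustrative approximation: map a Kafka topic to a destination table name."""
    if not topic_to_table_map:
        return topic
    for entry in topic_to_table_map.split(","):  # no spaces after commas
        pattern, table = entry.split(":", 1)
        # Exact topic name first, then treat the pattern as a regular expression.
        if topic == pattern or re.fullmatch(pattern, topic):
            return table
    return topic  # no match: the topic name is used as the table name

# Example using the second mapping above.
mapping = "topic[0-4]:low_range,topic[5-9]:high_range"
print(resolve_table("topic3", mapping))  # low_range
print(resolve_table("topic7", mapping))  # high_range
print(resolve_table("other", mapping))   # other
```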
Configure variant-specific settings¶
After configuring the common parameters, you need to configure settings specific to your chosen connector variant:
- For Apache Kafka for JSON data format and Apache Kafka for AVRO data format connectors:
See Apache Kafka for JSON/AVRO data format for JSON/AVRO-specific parameters.
- For Apache Kafka with DLQ and metadata connector:
See Apache Kafka with DLQ and metadata for advanced parameters including DLQ configuration, schematization settings, Iceberg table support, and message format options.
Authentication¶
All connector variants support SASL authentication configured through parameter contexts as described in Kafka source parameters (SASL authentication).
For other authentication methods including mTLS and AWS MSK IAM, see Configure other authentication methods for Openflow Connector for Kafka.
Run the flow¶
Right-click on the plane and click Enable all Controller Services.
Right-click on the plane and click Start. The connector starts data ingestion.