Set up the Openflow Connector for Kafka

Note

The connector is subject to the Connector Terms.

Prerequisites

  1. Ensure that you have reviewed Openflow Connector for Kafka.

  2. Ensure that you have set up the Openflow.

Connector types

The Openflow Connector for Kafka is available in three different configurations, each optimized for specific use cases. You can download these connector definitions from the connectors gallery:

Apache Kafka for JSON data format

Simplified connector for JSON message ingestion with schema evolution and topic-to-table mapping

Apache Kafka for AVRO data format

Simplified connector for AVRO message ingestion with schema evolution and topic-to-table mapping

Apache Kafka with DLQ and metadata

Full-featured connector with dead letter queue (DLQ) support, metadata handling, and feature parity with the legacy Snowflake connector for Kafka

For detailed configuration of specific connector types, see:

Set up Snowflake account

As a Snowflake account administrator, perform the following tasks:

  1. Create a new role or use an existing role and grant the Database privileges.

  2. Create a new Snowflake service user with the type as SERVICE.

  3. Grant the Snowflake service user the role you created in the previous steps.

  4. Configure with key-pair auth for the Snowflake SERVICE user from step 2.

  5. Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, for example, AWS, Azure, and Hashicorp, and store the public and private keys in the secret store.

    Note

    If for any reason, you do not wish to use a secrets manager, then you are responsible for safeguarding the public key and private key files used for key-pair authentication according to the security policies of your organization.

    1. Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it’s recommended that you the EC2 instance role associated with Openflow as this way no other secrets have to be persisted.

    2. In Openflow, configure a Parameter Provider associated with this Secrets Manager, from the hamburger menu in the upper right. Navigate to Controller Settings » Parameter Provider and then fetch your parameter values.

    3. At this point all credentials can be referenced with the associated parameter paths and no sensitive values need to be persisted within Openflow.

  6. If any other Snowflake users require access to the raw ingested documents and tables ingested by the connector (for example, for custom processing in Snowflake), then grant those users the role created in step 1.

  7. Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with size depending on the number of tables being replicated, and the amount of data transferred. Large table numbers typically scale better with multi-cluster warehouses, rather than larger warehouse sizes.

Set up the connector

As a data engineer, perform the following tasks to install and configure the connector:

Install the connector

  1. Navigate to the Openflow Overview page. In the Featured connectors section, select View more connectors.

  2. On the Openflow connectors page, find the connector and select Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list.

  4. Select Add.

    Note

    Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data.

  5. Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.

  6. Authenticate to the runtime with your Snowflake account credentials.

The Openflow canvas appears with the connector process group added to it.

Configure the connector

  1. Populate the process group parameters

    1. Right click on the imported process group and select Parameters.

    2. Fill out the required parameter values as described in Common parameters.

Common parameters

All Kafka connector variants share common parameter contexts for basic connectivity and authentication.

Snowflake destination parameters

Parameter

Description

Required

Destination Database

The database where data will be persisted. It must already exist in Snowflake

Yes

Destination Schema

The schema where data will be persisted. It must already exist in Snowflake

Yes

Snowflake Account Identifier

Snowflake account name formatted as [organization-name]-[account-name] where data will be persisted

Yes

Snowflake Authentication Strategy

Strategy of authentication to Snowflake. Possible values: SNOWFLAKE_SESSION_TOKEN - when we are running flow on SPCS, KEY_PAIR when we want to setup access using private key

Yes

Snowflake Private Key

The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined

No

Snowflake Private Key File

The file that contains the RSA Private Key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with -----BEGIN PRIVATE. Select the Reference asset checkbox to upload the private key file.

No

Snowflake Private Key Password

The password associated with the Snowflake Private Key File

No

Snowflake Role

Snowflake Role used during query execution

Yes

Snowflake Username

User name used to connect to Snowflake instance

Yes

Snowflake Warehouse

Snowflake warehouse used to run queries

Yes

Kafka source parameters (SASL authentication)

Parameter

Description

Required

Kafka Security Protocol

Security protocol used to communicate with brokers. Corresponds to Kafka Client security.protocol property. One of: SASL_PLAINTEXT / SASL_SSL

Yes

Kafka SASL Mechanism

SASL mechanism used for authentication. Corresponds to Kafka Client sasl.mechanism property. One of: PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512

Yes

Kafka SASL Username

The username to authenticate to Kafka

Yes

Kafka SASL Password

The password to authenticate to Kafka

Yes

Kafka Bootstrap Servers

A comma-separated list of Kafka broker to fetch data from, should contain port, for example kafka-broker:9092. The same instance is used for the DLQ topic.

Yes

Kafka ingestion parameters

Parameter

Description

Required

Kafka Topic Format

One of: names / pattern. Specifies whether the “Kafka Topics” provided are a comma separated list of names or a single regular expression.

Yes

Kafka Topics

A comma-separated list of Kafka topics or a regular expression.

Yes

Kafka Group Id

The ID of a consumer group used by the connector. Can be arbitrary but must be unique.

Yes

Kafka Auto Offset Reset

Automatic offset configuration applied when no previous consumer offset is found corresponding to Kafka auto.offset.reset property. One of: earliest / latest. Default: latest

Yes

Topic To Table Map

This optional parameter allows user to specify which topics should be mapped to which tables. Each topic and its table name should be separated by a colon (see example below). This table name must be a valid Snowflake unquoted identifier. The regular expressions cannot be ambiguous — any matched topic must match only a single target table. If empty or no matches found, topic name will be used as table name. Note: The mapping cannot contain spaces after commas.

No

Topic To Table Map example values:

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

  • .*:destination_table - maps all topics to the destination_table

Configure variant-specific settings

After configuring the common parameters, you need to configure settings specific to your chosen connector variant:

For Apache Kafka for JSON data format and Apache Kafka for AVRO data format connectors:

See Apache Kafka for JSON/AVRO data format for JSON/AVRO-specific parameters.

For Apache Kafka with DLQ and metadata connector:

See Apache Kafka with DLQ and metadata for advanced parameters including DLQ configuration, schematization settings, Iceberg table support, and message format options.

Authentication

All connector variants support SASL authentication configured through parameter contexts as described in Kafka source parameters (SASL authentication).

For other authentication methods including mTLS and AWS MSK IAM, see Configure other authentication methods for Openflow Connector for Kafka.

Run the flow

  1. Right click on the plane and click Enable all Controller Services.

  2. Right click on the plane and click Start. The connector starts data ingestion.

Language: English