Snowflake High Performance connector for Kafka: Configure Kafka

This topic describes the steps to install and configure Kafka for Snowflake High Performance connector for Kafka.

Installing the Kafka connector

The Kafka connector is provided as a JAR (Java archive) file.

Snowflake provides two versions of the connector:

  • A version for the Confluent implementation of Kafka Connect.

  • A version for the open source software (OSS) Apache Kafka package (https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/).

Both versions of the connector are available in the Snowflake Private Preview and must be obtained from Snowflake. Reach out to your Snowflake account team to obtain the connector JAR file.

If you are not sure which version to use, see Choosing a connector version.

Configuring the Kafka connector

Connector configuration is vendor specific. Some implementations, such as Amazon MSK Connect, provide a UI for configuring the connector and accept the configuration in either JSON or properties file format.

This section is a general reference for the parameter names and values for the connector. Keep in mind that different cloud vendors may have slightly different configuration requirements.

Important

The Kafka Connect framework broadcasts the configuration settings for the Kafka connector from the master node to worker nodes. The configuration settings include sensitive information (specifically, the Snowflake username and private key). Make sure to secure the communication channel between Kafka Connect nodes. For instructions, see the documentation for your Apache Kafka software.

Each configuration file specifies the topics and corresponding tables for one database and one schema in that database. Note that a connector can ingest messages from any number of topics, but the corresponding tables must all be located in a single database and schema.

For descriptions of the configuration fields, see Connector configuration properties.

Important

Because the configuration file typically contains security related information, such as the private key, set read/write privileges appropriately on the file to limit access.

In addition, consider storing the configuration file in a secure external location or a key management service.

Example configuration JSON file

{
  "name":"XYZCompanySensorData",
  "config":{
      "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
      "tasks.max": "1",
      "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
      "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.cn:443",
      "snowflake.warehouse.name": "WH",
      "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
      "snowflake.schema.name": "MY_SCHEMA",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.role.name": "MY_ROLE",
      "snowflake.user.name": "MY_USER",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.log.enable": "true",
      "topics": "topic1,topic2",
      "value.converter.schemas.enable": "false",
      "errors.tolerance": "all"
      }
}

Example configuration properties file

connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.cn:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=all

Connector configuration properties

Required properties

name

Application name. This must be unique across all Kafka connectors used by the customer. This name must be a valid Snowflake unquoted identifier. For information about valid identifiers, see Identifier requirements.

connector.class

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics

Comma-separated list of topics. By default, Snowflake assumes that the table name is the same as the topic name. If the table name is not the same as the topic name, then use the optional snowflake.topic2table.map parameter (below) to specify the mapping from topic name to table name. This table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier requirements.

Note

Either topics or topics.regex is required; not both.

topics.regex

This is a regular expression (“regex”) that specifies the topics that contain the messages to load into Snowflake tables. The connector loads data from any topic name that matches the regex. The regex must follow the rules for Java regular expressions (i.e. be compatible with java.util.regex.Pattern). The configuration file should contain either topics or topics.regex, not both.
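
For example, assuming hypothetical topic names that all begin with sensor_, a single regular expression can match them:

topics.regex=sensor_.*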

snowflake.url.name

The URL for accessing your Snowflake account. This URL must include your account identifier. Note that the protocol (https://) and port number are optional.
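
For example, the following values, using a hypothetical account identifier, are all accepted:

snowflake.url.name=myorganization-myaccount.snowflakecomputing.cn
snowflake.url.name=myorganization-myaccount.snowflakecomputing.cn:443
snowflake.url.name=https://myorganization-myaccount.snowflakecomputing.cn:443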

snowflake.user.name

User login name for the Snowflake account.

snowflake.role.name

The name of the role that the connector will use to insert data into the table.

snowflake.private.key

The private key to authenticate the user. Include only the key, not the header or footer. If the key is split across multiple lines, remove the line breaks. You can provide an unencrypted key, or you can provide an encrypted key along with the snowflake.private.key.passphrase parameter so that Snowflake can decrypt it. Use the snowflake.private.key.passphrase parameter if and only if the snowflake.private.key parameter value is encrypted. This decrypts private keys that were encrypted according to the instructions in Key-pair authentication and key-pair rotation.

Note

Also see snowflake.private.key.passphrase in Optional properties.
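
A minimal sketch of these parameters in a properties file, using placeholder values rather than a real key:

# Paste the key body as a single line, with the header, footer, and line breaks removed (placeholder shown)
snowflake.private.key=MIIEvQIBADANBgkqh...placeholder...Xyz=
# Provide the passphrase only if the key above is encrypted
snowflake.private.key.passphrase=my_passphrase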

snowflake.database.name

The name of the database that contains the table to insert rows into.

snowflake.schema.name

The name of the schema that contains the table to insert rows into.

header.converter

Required only if the records are formatted in Avro and include a header. The value is "org.apache.kafka.connect.storage.StringConverter".

key.converter

This is the Kafka record’s key converter (e.g. "org.apache.kafka.connect.storage.StringConverter"). This is not used by the Kafka connector, but is required by the Kafka Connect Platform.

See Kafka connector limitations for current limitations.

value.converter

The connector supports standard Kafka community converters. Choose the appropriate converter based on your data format:

  • For JSON records: "org.apache.kafka.connect.json.JsonConverter"

  • For Avro records with Schema Registry: "io.confluent.connect.avro.AvroConverter"

See Kafka connector limitations for current limitations.
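
For example, a minimal converter configuration for Avro records, assuming a hypothetical Schema Registry URL:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://schema-registry.example.com:8081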

Optional properties

snowflake.private.key.passphrase

If the value of this parameter is not empty, the connector uses this phrase to try to decrypt the private key.

tasks.max

Number of tasks, usually the same as the number of CPU cores across the worker nodes in the Kafka Connect cluster. To achieve best performance, Snowflake recommends setting the number of tasks equal to the total number of Kafka partitions, but not exceeding the number of CPU cores. A high number of tasks may result in increased memory consumption and frequent rebalances.
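
For example, assuming the ingested topics have 8 partitions in total and the Kafka Connect workers provide at least 8 CPU cores:

tasks.max=8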

snowflake.topic2table.map

This optional parameter lets a user specify which topics should be mapped to which tables. Each topic and its table name should be separated by a colon (see example below). This table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier requirements. The topic configuration allows the use of regular expressions to define topics, just as topics.regex does. The regular expressions must not be ambiguous: any matched topic must map to exactly one target table.

Example:

topics="topic1,topic2,topic5,topic6"
snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"

could be written as:

topics.regex="topic[0-9]"
snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
Copy
value.converter.schema.registry.url

If the format is Avro and you are using a Schema Registry Service, this should be the URL of the Schema Registry Service. Otherwise this field should be empty.

value.converter.break.on.schema.registry.error

If loading Avro data from the Schema Registry Service, this property determines if the Kafka connector should stop consuming records if it encounters an error while fetching the schema id. The default value is false. Set the value to true to enable this behavior.

jvm.proxy.host

To enable the Snowflake Kafka Connector to access Snowflake through a proxy server, set this parameter to specify the host of that proxy server.

jvm.proxy.port

To enable the Snowflake Kafka Connector to access Snowflake through a proxy server, set this parameter to specify the port of that proxy server.

snowflake.streaming.max.client.lag

Specifies how often the Snowflake Ingest Java SDK flushes data to Snowflake, in seconds.

Values:
  • Minimum: 1 second

  • Maximum: 600 seconds

Default: 1 second
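
For example, to flush less frequently and build larger batches (a hypothetical value within the allowed range):

snowflake.streaming.max.client.lag=30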

jvm.proxy.username

Username that authenticates with the proxy server.

jvm.proxy.password

Password for the username that authenticates with the proxy server.
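
Putting the four proxy parameters together, a minimal sketch with a hypothetical proxy host and credentials:

jvm.proxy.host=proxy.example.com
jvm.proxy.port=3128
jvm.proxy.username=proxy_user
jvm.proxy.password=proxy_password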

snowflake.jdbc.map

Example: "snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

Additional JDBC properties (see JDBC Driver connection parameter reference) are not validated. These additional properties are not validated, and must not override nor be used instead of required properties such as: jvm.proxy.xxx, snowflake.user.name, snowflake.private.key, snowflake.schema.name etc.

Specifying either of the following combinations results in ambiguous behavior, which is determined by the JDBC Driver:

  • The tracing property along with the JDBC_TRACE environment variable

  • The database property along with snowflake.database.name

value.converter.basic.auth.credentials.source

If you are using the Avro data format and require secure access to the Kafka schema registry, set this parameter to the string “USER_INFO”, and set the value.converter.basic.auth.user.info parameter described below. Otherwise, omit this parameter.

value.converter.basic.auth.user.info

If you are using the Avro data format and require secure access to the Kafka schema registry, set this parameter to the string “<user_ID>:<password>”, and set the value.converter.basic.auth.credentials.source parameter described above. Otherwise, omit this parameter.
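
A minimal sketch of the two parameters, assuming hypothetical registry credentials:

value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=registry_user:registry_password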

snowflake.metadata.createtime

If value is set to FALSE, the CreateTime property value is omitted from the metadata in the RECORD_METADATA column. The default value is TRUE.

snowflake.metadata.topic

If value is set to FALSE, the topic property value is omitted from the metadata in the RECORD_METADATA column. The default value is TRUE.

snowflake.metadata.offset.and.partition

If value is set to FALSE, the Offset and Partition property values are omitted from the metadata in the RECORD_METADATA column. The default value is TRUE.

snowflake.metadata.all

If value is set to FALSE, the metadata in the RECORD_METADATA column is completely empty. The default value is TRUE.
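
For example, to keep topic, offset, and partition metadata in RECORD_METADATA but omit the record creation time:

snowflake.metadata.createtime=FALSE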

transforms

Specify this property to have the Kafka connector skip tombstone records instead of loading them into the target table. A tombstone record is defined as a record where the entire value field is null.

Set the property value to "tombstoneHandlerExample".

Note

Use this property with the Kafka community converters (i.e. value.converter property value) only (e.g. org.apache.kafka.connect.json.JsonConverter or io.confluent.connect.avro.AvroConverter). To manage tombstone record handling with the Snowflake converters, use the behavior.on.null.values property instead.

transforms.tombstoneHandlerExample.type

Required when setting the transforms property.

Set the property value to "io.confluent.connect.transforms.TombstoneHandler"
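
Together, the two properties look like the following in a properties file:

transforms=tombstoneHandlerExample
transforms.tombstoneHandlerExample.type=io.confluent.connect.transforms.TombstoneHandler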

behavior.on.null.values

Specify how the Kafka connector should handle tombstone records. A tombstone record is defined as a record where the entire value field is null. For Snowpipe, this property is supported by the Kafka connector version 1.5.5 and later. For Snowpipe Streaming, this property is supported by the Kafka connector version 2.1.0 and later.

This property supports the following values:

DEFAULT

When the Kafka connector encounters a tombstone record, it inserts an empty JSON string in the content column.

IGNORE

The Kafka connector skips tombstone records and does not insert rows for these records.

The default value is DEFAULT.
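
For example, to skip tombstone records entirely:

behavior.on.null.values=IGNORE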

Note

Tombstone records ingestion varies by the ingestion methods:

  • For Snowpipe, the Kafka connector uses Snowflake converters only. To manage tombstone record handling with the Kafka community converters, use the transforms and transforms.tombstoneHandlerExample.type properties instead.

  • For Snowpipe Streaming, the Kafka connector uses community converters only.

Records sent to Kafka brokers must not be NULL, because the Kafka connector drops these records, which results in missing offsets. The missing offsets break the Kafka connector in specific use cases. It is recommended that you use tombstone records instead of NULL records.

Next steps

Test the connector.
