Snowflake High Performance connector for Kafka: Configure Kafka¶
This topic describes the steps to install and configure the Snowflake High Performance connector for Kafka.
Installing the Kafka connector¶
The Kafka connector is provided as a JAR (Java archive) file.
Snowflake provides two versions of the connector:
A version for the Confluent implementation of Kafka Connect.
A version for the open source software (OSS) Apache Kafka package (https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/).
Both versions of the connector are available in the Snowflake Private Preview and must be obtained from Snowflake. Reach out to your Snowflake account team to obtain the connector JAR file.
If you are not sure which version to use, see Choosing a connector version.
Configuring the Kafka connector¶
The configuration of the connector is vendor specific. Some implementations, such as Amazon MSK Connect, have a UI for configuring the connector and accept the configuration in a JSON as well as a properties file format.
This section is a general reference for the parameter names and values for the connector. Keep in mind that different cloud vendors may have slightly different configuration requirements.
Important
The Kafka Connect framework broadcasts the configuration settings for the Kafka connector from the master node to worker nodes. The configuration settings include sensitive information (specifically, the Snowflake username and private key). Make sure to secure the communication channel between Kafka Connect nodes. For instructions, see the documentation for your Apache Kafka software.
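For example, a Kafka Connect worker can encrypt both its traffic to the Kafka brokers and its REST endpoint with TLS. The following is a minimal sketch only; the host name, port, and keystore/truststore paths and passwords are placeholders, and the settings that apply to your deployment are described in the Apache Kafka documentation.

# Encrypt worker-to-broker traffic (placeholder paths and passwords)
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks
ssl.truststore.password=<truststore_password>

# Serve the Kafka Connect REST API over HTTPS (placeholder host and keystore)
listeners=https://connect-worker.example.com:8443
listeners.https.ssl.keystore.location=/etc/kafka/secrets/connect.keystore.jks
listeners.https.ssl.keystore.password=<keystore_password>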
Each configuration file specifies the topics and corresponding tables for one database and one schema in that database. Note that a connector can ingest messages from any number of topics, but the corresponding tables must all be located in a single database and schema.
For descriptions of the configuration fields, see Connector configuration properties.
Important
Because the configuration file typically contains security-related information, such as the private key, set read/write privileges appropriately on the file to limit access.
In addition, consider storing the configuration file in a secure external location or a key management service.
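One option is Kafka Connect's externalized secrets mechanism, which keeps the private key out of the connector configuration entirely. The sketch below assumes a secrets properties file at a placeholder path (/etc/kafka/secrets/snowflake.properties) that is readable only by the Connect worker.

# In the Kafka Connect worker configuration: register a file-based config provider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# In the connector configuration: resolve the key from the secrets file at runtime
snowflake.private.key=${file:/etc/kafka/secrets/snowflake.properties:snowflake.private.key}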
Example configuration JSON file
{
  "name": "XYZCompanySensorData",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
    "tasks.max": "1",
    "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
    "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.cn:443",
    "snowflake.warehouse.name": "WH",
    "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
    "snowflake.schema.name": "MY_SCHEMA",
    "snowflake.database.name": "MY_DATABASE",
    "snowflake.role.name": "MY_ROLE",
    "snowflake.user.name": "MY_USER",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "errors.log.enable": "true",
    "topics": "topic1,topic2",
    "value.converter.schemas.enable": "false",
    "errors.tolerance": "all"
  }
}
Example configuration properties file
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.cn:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=all
Connector configuration properties¶
Required properties¶
name
Application name. This must be unique across all Kafka connectors used by the customer. This name must be a valid Snowflake unquoted identifier. For information about valid identifiers, see Identifier requirements.

connector.class
com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics
Comma-separated list of topics. By default, Snowflake assumes that the table name is the same as the topic name. If the table name is not the same as the topic name, then use the optional topic2table.map parameter (below) to specify the mapping from topic name to table name. This table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier requirements.

Note

Either topics or topics.regex is required; not both.

topics.regex
This is a regular expression ("regex") that specifies the topics that contain the messages to load into Snowflake tables. The connector loads data from any topic name that matches the regex. The regex must follow the rules for Java regular expressions (i.e. be compatible with java.util.regex.Pattern). The configuration file should contain either topics or topics.regex, not both.

snowflake.url.name
The URL for accessing your Snowflake account. This URL must include your account identifier. Note that the protocol (https://) and port number are optional.

snowflake.user.name
User login name for the Snowflake account.

snowflake.role.name
The name of the role that the connector will use to insert data into the table.

snowflake.private.key
The private key to authenticate the user. Include only the key, not the header or footer. If the key is split across multiple lines, remove the line breaks. You can provide an unencrypted key, or you can provide an encrypted key along with the snowflake.private.key.passphrase parameter so that Snowflake can decrypt it. Provide the passphrase if and only if the snowflake.private.key parameter value is encrypted; it decrypts private keys that were encrypted according to the instructions in Key-pair authentication and key-pair rotation.

Note

Also see snowflake.private.key.passphrase in Optional properties.

snowflake.database.name
The name of the database that contains the table to insert rows into.

snowflake.schema.name
The name of the schema that contains the table to insert rows into.

header.converter
Required only if the records are formatted in Avro and include a header. The value is "org.apache.kafka.connect.storage.StringConverter".

key.converter
This is the Kafka record's key converter (e.g. "org.apache.kafka.connect.storage.StringConverter"). This is not used by the Kafka connector, but is required by the Kafka Connect Platform. See Kafka connector limitations for current limitations.

value.converter
The connector supports standard Kafka community converters. Choose the appropriate converter based on your data format:

For JSON records: "org.apache.kafka.connect.json.JsonConverter"

For Avro records with Schema Registry: "io.confluent.connect.avro.AvroConverter"

See Kafka connector limitations for current limitations.

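For example, a connector that reads Avro records registered in a Schema Registry service might combine the converter settings as follows; the registry URL is a placeholder, and value.converter.schema.registry.url is described under Optional properties.

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://schema-registry.example.com:8081
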
Optional properties¶
snowflake.private.key.passphrase
If the value of this parameter is not empty, the connector uses this phrase to try to decrypt the private key.

tasks.max
Number of tasks, usually the same as the number of CPU cores across the worker nodes in the Kafka Connect cluster. To achieve the best performance, Snowflake recommends setting the number of tasks equal to the total number of Kafka partitions, but not exceeding the number of CPU cores. A high number of tasks may result in increased memory consumption and frequent rebalances.

snowflake.topic2table.map
This optional parameter lets a user specify which topics should be mapped to which tables. Each topic and its table name should be separated by a colon (see example below). This table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier requirements. The topic configuration allows use of regular expressions to define topics, just as the use of topics.regex does. The regular expressions cannot be ambiguous: any matched topic must match only a single target table.

Example:

topics="topic1,topic2,topic5,topic6"
snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"

could be written as:

topics.regex="topic[0-9]"
snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"

value.converter.schema.registry.url
If the format is Avro and you are using a Schema Registry Service, this should be the URL of the Schema Registry Service. Otherwise this field should be empty.

value.converter.break.on.schema.registry.error
If loading Avro data from the Schema Registry Service, this property determines if the Kafka connector should stop consuming records if it encounters an error while fetching the schema id. The default value is false. Set the value to true to enable this behavior.

jvm.proxy.host
To enable the Snowflake Kafka Connector to access Snowflake through a proxy server, set this parameter to specify the host of that proxy server.

jvm.proxy.port
To enable the Snowflake Kafka Connector to access Snowflake through a proxy server, set this parameter to specify the port of that proxy server.

snowflake.streaming.max.client.lag
Specifies how often the Snowflake Ingest Java SDK flushes the data to Snowflake, in seconds.

- Values: minimum 1 second, maximum 600 seconds
- Default: 1 second

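For example, to buffer larger batches at the cost of slightly higher ingest latency, the flush interval can be raised; the value shown is purely illustrative.

snowflake.streaming.max.client.lag=30
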
jvm.proxy.username
Username that authenticates with the proxy server.

jvm.proxy.password
Password for the username that authenticates with the proxy server.

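For example, routing the connector's traffic to Snowflake through an authenticated proxy might look like the following; the host, port, and credentials are placeholders.

jvm.proxy.host=proxy.example.com
jvm.proxy.port=3128
jvm.proxy.username=proxy_user
jvm.proxy.password=<proxy_password>
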
snowflake.jdbc.map
Additional JDBC properties (see JDBC Driver connection parameter reference). These additional properties are not validated, and must not override nor be used instead of required properties such as jvm.proxy.xxx, snowflake.user.name, snowflake.private.key, snowflake.schema.name, and so on.

Example:

"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

Specifying either of the following combinations results in ambiguous behavior, and the behavior is determined by the JDBC Driver:

- The tracing property along with the JDBC_TRACE environment variable
- The database property along with snowflake.database.name

value.converter.basic.auth.credentials.source
If you are using the Avro data format and require secure access to the Kafka schema registry, set this parameter to the string "USER_INFO", and set the value.converter.basic.auth.user.info parameter described below. Otherwise, omit this parameter.

value.converter.basic.auth.user.info
If you are using the Avro data format and require secure access to the Kafka schema registry, set this parameter to the string "<user_ID>:<password>", and set the value.converter.basic.auth.credentials.source parameter described above. Otherwise, omit this parameter.

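For example, when the schema registry requires basic authentication, the two properties are set together; the credentials shown are placeholders.

value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=<registry_user>:<registry_password>
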
snowflake.metadata.createtime
If the value is set to FALSE, the CreateTime property value is omitted from the metadata in the RECORD_METADATA column. The default value is TRUE.

snowflake.metadata.topic
If the value is set to FALSE, the topic property value is omitted from the metadata in the RECORD_METADATA column. The default value is TRUE.

snowflake.metadata.offset.and.partition
If the value is set to FALSE, the Offset and Partition property values are omitted from the metadata in the RECORD_METADATA column. The default value is TRUE.

snowflake.metadata.all
If the value is set to FALSE, the metadata in the RECORD_METADATA column is completely empty. The default value is TRUE.

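For example, to keep the offset and partition in RECORD_METADATA but omit the record creation time and topic name, you might set the following; the lowercase values match the style of the earlier configuration examples.

snowflake.metadata.createtime=false
snowflake.metadata.topic=false
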
transforms
Specify to skip tombstone records encountered by the Kafka connector and not load them into the target table. A tombstone record is defined as a record where the entire value field is null.

Set the property value to "tombstoneHandlerExample".

Note

Use this property with the Kafka community converters (i.e. the value.converter property value) only (e.g. org.apache.kafka.connect.json.JsonConverter or io.confluent.connect.avro.AvroConverter). To manage tombstone record handling with the Snowflake converters, use the behavior.on.null.values property instead.

transforms.tombstoneHandlerExample.type
Required when setting the transforms property.

Set the property value to "io.confluent.connect.transforms.TombstoneHandler".

behavior.on.null.values
Specify how the Kafka connector should handle tombstone records. A tombstone record is defined as a record where the entire value field is null. For Snowpipe, this property is supported by Kafka connector version 1.5.5 and later. For Snowpipe Streaming, this property is supported by Kafka connector version 2.1.0 and later.

This property supports the following values:

DEFAULT
When the Kafka connector encounters a tombstone record, it inserts an empty JSON string in the content column.

IGNORE
The Kafka connector skips tombstone records and does not insert rows for these records.

The default value is DEFAULT.

Note

Tombstone record ingestion varies by ingestion method:

- For Snowpipe, the Kafka connector uses Snowflake converters only. To manage tombstone record handling with the Kafka community converters, use the transforms and transforms.tombstoneHandlerExample.type properties instead.
- For Snowpipe Streaming, the Kafka connector uses community converters only.

Records sent to Kafka brokers must not be NULL, because these records are dropped by the Kafka connector, resulting in missing offsets. The missing offsets break the Kafka connector in specific use cases. It is recommended that you use tombstone records instead of NULL records.
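As an illustration, skipping tombstone records with a Kafka community converter combines the properties described above roughly as follows:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
transforms=tombstoneHandlerExample
transforms.tombstoneHandlerExample.type=io.confluent.connect.transforms.TombstoneHandler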