How the Snowflake High Performance connector for Kafka works¶
This topic describes how the connector works, how it interacts with tables and pipes, and how to configure it.
How the connector works with tables and pipes¶
The High Performance Snowflake Connector for Kafka requires you to create destination tables manually. The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, suppose you have a Kafka topic whose messages are structured like the following:
{
"order_id": 12345,
"customer_name": "John",
"order_total": 100.00,
"isPaid": true
}
You can create a table with columns matching the JSON keys and rely on the default pipe named {tableName}-STREAMING, which automatically maps the record content’s first-level keys to table columns by name (case-insensitive).
CREATE TABLE ORDERS (
record_metadata VARIANT,
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
ispaid BOOLEAN
);
If you choose to create your own pipe, you can define the data transformation logic in the pipe’s COPY INTO statement. You can rename columns as required and cast the data types as needed. For example:
CREATE TABLE ORDERS (
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::STRING,
$1:customer_name,
$1:order_total::STRING,
$1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
or
CREATE TABLE ORDERS (
topic VARCHAR,
partition VARCHAR,
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_METADATA.topic::STRING AS topic,
$1:RECORD_METADATA.partition::STRING AS partition,
$1['order_id']::STRING AS order_id,
$1['customer_name']::STRING as customer_name,
CONCAT($1['order_total']::STRING, ' USD') AS order_total,
$1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
When you define your own pipe, your destination table columns do not need to match the JSON keys. You can rename the columns to your desired names and cast the data types as needed.
Topic names, table names, and pipe names¶
Depending on the configuration settings, the connector uses different names for the destination table. The destination table name is always determined by the topic name, either derived directly from it or through an explicit topic-to-table mapping.
How the connector maps topic names to the destination table¶
The Kafka connector provides two modes for mapping Kafka topic names to Snowflake table names:
Static mapping: The connector derives destination table names from the Kafka topic names alone.
Explicit topic-to-table mapping: You specify custom mappings between topics and tables using the snowflake.topic2table.map configuration parameter.
Static mapping¶
If you do not configure the snowflake.topic2table.map parameter, the connector always derives the table names from the topic name.
Table name generation:
The connector derives the destination table name from the topic name using the following rules:
If the topic name is a valid Snowflake identifier (starts with a letter or underscore, and contains only letters, digits, underscores, or dollar signs), the connector uses the topic name as the table name (converted to uppercase).
If the topic name contains invalid characters, the connector:
Replaces invalid characters with underscores
Appends an underscore followed by a hash code to ensure uniqueness
For example, the topic my-topic.data becomes MY_TOPIC_DATA_<hash>.
Pipe name determination:
The connector determines which pipe to use based on the following logic:
The connector checks if a pipe exists with the same name as the destination table name.
If a user-created pipe with that name exists, the connector uses that pipe (user-defined pipe mode).
If not, the connector uses the default pipe named {tableName}-STREAMING (see the check sketched below).
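To confirm which pipe the connector will pick up, you can check whether a user-created pipe with the derived table name already exists. A minimal sketch, assuming the derived table name is ORDERS:
-- List any user-created pipe named exactly ORDERS.
-- If one exists, the connector uses it; otherwise it falls back to the default "ORDERS-STREAMING" pipe.
SHOW PIPES LIKE 'ORDERS';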
Note
Snowflake recommends choosing topic names that follow the rules for Snowflake identifier names to ensure predictable table names.
Understanding RECORD_METADATA¶
The connector populates the RECORD_METADATA structure with metadata about the Kafka record. This metadata is sent through the Snowpipe Streaming data source to Snowflake, where it becomes available in pipe transformations using the $1:RECORD_METADATA accessor. The RECORD_METADATA structure is available in both user-defined pipe and default pipe modes. Its content can be saved to a column of type VARIANT, or individual fields can be extracted and saved to separate columns.
Example pipe with transformations and metadata:
CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total,
$1:RECORD_METADATA.topic AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
$1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
In this example:
The pipe extracts specific fields from the Kafka message (order_id, customer_name, order_total)
It also captures metadata fields (topic, offset, and ingestion timestamp)
The values can be cast and/or transformed as needed
How metadata fields are populated¶
The connector automatically populates metadata fields based on the Kafka record properties and connector configuration. You can control which metadata fields are included using these configuration parameters:
snowflake.metadata.topic (default: true) - Includes the topic name
snowflake.metadata.offset.and.partition (default: true) - Includes the offset and partition
snowflake.metadata.createtime (default: true) - Includes the Kafka record timestamp
snowflake.metadata.all (default: true) - Includes all available metadata
When snowflake.metadata.all=true (the default), all metadata fields are populated. Setting individual metadata flags to false excludes those specific fields from the RECORD_METADATA structure.
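For example, a minimal sketch of connector properties that keeps the default metadata but excludes the Kafka record timestamp, assuming the individual flags take effect as described above (the rest of the connector configuration is omitted):
# Keep topic, partition, and offset metadata, but exclude the Kafka record timestamp.
snowflake.metadata.all=true
snowflake.metadata.createtime=false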
Note
The SnowflakeConnectorPushTime field is always available and represents the time when the connector pushed the record into the ingestion buffer. This is useful for calculating end-to-end ingestion latency.
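As an illustration, a hedged query sketch that estimates the time from Kafka record creation to the connector push, assuming RECORD_METADATA was landed in a VARIANT column named record_metadata (as in the ORDERS table shown earlier) and that the topic uses CreateTime timestamps:
-- Average milliseconds from the Kafka record timestamp (CreateTime)
-- to the moment the connector pushed the record into the ingestion buffer.
SELECT
    AVG(record_metadata:SnowflakeConnectorPushTime::NUMBER
        - record_metadata:CreateTime::NUMBER) AS avg_create_to_push_ms
FROM ORDERS;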
The RECORD_METADATA structure contains the following information by default:
| Field | Data Type | Description |
|---|---|---|
| topic | String | The name of the Kafka topic that the record came from. |
| partition | String | The number of the partition within the topic. (Note that this is the Kafka partition, not the Snowflake micro-partition.) |
| offset | Number | The offset in that partition. |
| CreateTime / LogAppendTime | Number | The timestamp associated with the message in the Kafka topic, in milliseconds since midnight January 1, 1970, UTC. For more information, see https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html |
| SnowflakeConnectorPushTime | Number | The timestamp when the record was pushed into an Ingest SDK buffer, in milliseconds since midnight January 1, 1970, UTC. For more information, see Estimating ingestion latency. |
| key | String | If the message is a Kafka KeyedMessage, this is the key for that message. For the connector to store the key in RECORD_METADATA, the key.converter parameter in the Kafka configuration properties must be set to "org.apache.kafka.connect.storage.StringConverter"; otherwise, the connector ignores keys. |
| headers | Object | A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers. |
The amount of metadata recorded in the RECORD_METADATA column is configurable using optional Kafka configuration properties.
The field names and values are case-sensitive.
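For example, a hedged pipe sketch that lands the Kafka key and a header into dedicated columns; the destination table MESSAGE_AUDIT and the header name trace_id are hypothetical:
CREATE PIPE MESSAGE_AUDIT AS
COPY INTO MESSAGE_AUDIT
FROM (
    SELECT
        $1:RECORD_METADATA.key::STRING AS kafka_key,              -- requires key.converter set to StringConverter
        $1:RECORD_METADATA.headers.trace_id::STRING AS trace_id,  -- hypothetical header name
        $1:RECORD_METADATA.topic::STRING AS source_topic
    FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);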
How Kafka records are converted before ingestion¶
Before each row is handed over to Snowpipe Streaming, the connector converts the Kafka Connect record value into a Map<String, Object> whose keys must match your target column names (or can be transformed inside a user-defined pipe). Primitive strings, byte arrays, or numbers must be wrapped (for example by using the HoistField SMT) so that the connector receives a structured object. The converter applies the following rules:
Null values are treated as tombstones. They are skipped when behavior.on.null.values=IGNORE or ingested as empty JSON objects otherwise.
Numeric and boolean fields are passed through as-is. Decimal values whose precision is greater than 38 are serialized as strings to stay within Snowflake’s NUMBER limits.
byte[] and ByteBuffer payloads are Base64-encoded strings, so store them in VARIANT or VARCHAR columns.
Arrays remain arrays, and nested objects remain nested maps. Declare VARIANT columns when you rely on the default pipe to land nested data as-is.
Maps with non-string keys are emitted as arrays of [key, value] pairs because Snowflake column names must be text.
Record headers and keys are copied into RECORD_METADATA whenever the relevant metadata flags are enabled.
If you need the entire message body preserved as a single column, wrap it into a new top-level field using SMTs. See Legacy RECORD_CONTENT column for the transformation pattern.
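To illustrate these rules, a hedged sketch of a destination table for a hypothetical topic whose records carry a byte-array field named payload_bytes and a nested object named attributes; the Base64-encoded bytes land in a VARCHAR column and the nested object in a VARIANT column:
CREATE TABLE RAW_EVENTS (
    record_metadata VARIANT,
    payload_bytes VARCHAR,   -- byte[]/ByteBuffer values arrive as Base64-encoded strings
    attributes VARIANT       -- nested objects land as nested maps
);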
User-defined pipe mode vs default pipe mode¶
The connector supports two modes for managing data ingestion:
User-defined pipe mode¶
In this mode, you have full control over data transformation and column mapping.
When to use this mode:
You need custom column names that differ from JSON field names
You need to apply data transformations (type casting, masking, filtering)
You want full control over how data is mapped to columns
Default pipe mode¶
In this mode, the connector uses the default pipe named {tableName}-STREAMING and maps Kafka record fields to table columns by name (case-insensitive).
When to use this mode:
Your Kafka record key names match your desired column names
You don’t need custom data transformations
You want simplified configuration
Mapping Kafka record keys to table columns with default pipe mode¶
When using default pipe mode, the connector uses the default pipe named {tableName}-STREAMING and maps the record content’s first-level keys directly to table columns using case-insensitive matching.
Using default pipe mode - example¶
Example 1:¶
Consider the following Kafka record content payload:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
"@&$#* includes special characters": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
You create a table with columns matching the JSON keys (case-insensitive, including special characters):
CREATE TABLE PERSON_DATA (
record_metadata VARIANT,
city VARCHAR,
age NUMBER,
married BOOLEAN,
"has cat" BOOLEAN,
"!@&$#* includes special characters" BOOLEAN,
skills VARIANT,
family VARIANT
);
Matching behavior:
"city"(kafka) →cityorCITYorCity(column) - case insensitive"has cat"(kafka) →"has cat"(column) - must be quoted due to space"!@&$#* includes special characters"(kafka) →"!@&$#* includes special characters"(column) - special characters preservedNested objects like
skillsandfamilymap to VARIANT columns automatically
Using user-defined pipe mode - examples¶
This example shows how to configure and use user-defined pipes with custom data transformations.
Example 1:¶
Create a table with your desired schema:
CREATE TABLE ORDERS (
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
order_date TIMESTAMP_NTZ,
source_topic VARCHAR
);
Create a pipe that transforms the incoming Kafka records to match your table schema:
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total::NUMBER,
$1:order_date::TIMESTAMP_NTZ,
$1:RECORD_METADATA.topic
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Note that the pipe name (ORDERS) matches the table name (ORDERS). The pipe definition extracts fields from the JSON payload using $1:field_name syntax and maps them to the table columns.
Note
You can access nested JSON fields and fields with special characters using bracket notation, such as $1['field name'] or $1['has cat'].
Configure topic to table mapping:
snowflake.topic2table.map=kafka-orders-topic:ORDERS
This configuration maps the Kafka topic kafka-orders-topic to the pre-existing table and pipe named ORDERS.
Example 2:¶
When you need to access keys in the content that do not have conventional names, use the following syntax:
Simple fields: $1:field_name
Fields with spaces or special characters: $1['field name'] or $1['has cat']
Fields with unicode characters: $1[' @&$#* has Łułósżź']
Nested fields: $1:parent.child or $1:parent['child field']
Consider this JSON payload from Kafka:
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
" @&$#* has Łułósżź": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
You create a destination table with your chosen column names:
CREATE TABLE PERSON_DATA (
city VARCHAR,
age NUMBER,
married BOOLEAN,
has_cat BOOLEAN,
weird_field_name BOOLEAN,
skills VARIANT,
family VARIANT
);
Then create a pipe with the same name that defines the mapping:
CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
SELECT
$1:city,
$1:age,
$1:married,
$1['has cat'] AS has_cat,
$1[' @&$#* has Łułósżź'] AS weird_field_name,
$1:skills,
$1:family
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Key points:
You control column names (e.g., renaming "has cat" to has_cat)
You can cast data types as needed (e.g., $1:age::NUMBER)
You can include or exclude fields as desired
You can add metadata fields (e.g., $1:RECORD_METADATA.topic)
VARIANT columns automatically handle nested JSON structures
Example 3: With interactive tables¶
Interactive tables are a special type of Snowflake table optimized for low-latency, high-concurrency queries. You can find out more about interactive tables in the interactive tables documentation.
Note
Interactive tables are currently a preview feature available only to selected accounts.
Create an interactive table:
CREATE INTERACTIVE TABLE REALTIME_METRICS (
    metric_name VARCHAR,
    metric_value NUMBER,
    source_topic VARCHAR,
    timestamp TIMESTAMP_NTZ
) AS (
    SELECT
        $1:M_NAME::VARCHAR,
        $1:M_VALUE::NUMBER,
        $1:RECORD_METADATA.topic::VARCHAR,
        $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
    FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Configure topic to table mapping:
snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
Important considerations:
Interactive tables have specific limitations and query restrictions. Review the interactive tables documentation before using them with the connector.
For interactive tables, any required transformations must be handled in the table definition.
Interactive warehouses are required to query interactive tables efficiently.
Explicit topic-to-table mapping¶
When you configure the snowflake.topic2table.map parameter, the connector operates in explicit mapping mode. This mode allows you to:
Map multiple Kafka topics to a single Snowflake table
Use custom table names that differ from topic names
Apply regex patterns to match multiple topics
Configuration format:
The snowflake.topic2table.map parameter accepts a comma-separated list of topic-to-table mappings in the format:
topic1:table1,topic2:table2,topic3:table3
Example configurations:
Direct topic mapping
snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Regex pattern matching
snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
This configuration maps all topics ending with _cat (such as orange_cat, calico_cat) to the CAT_TABLE table, and all topics ending with _dog to the DOG_TABLE table.
Many topics to one table
snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
This configuration maps both topic1 and topic2 to shared_table, while topic3 maps to other_table.
Important
Regex patterns in the mapping cannot overlap. Each topic must match at most one pattern.
Table names in the mapping must be valid Snowflake identifiers with at least 2 characters, starting with a letter or underscore.
You can map multiple topics to a single table.
Legacy RECORD_CONTENT column¶
In older versions of the connector, when the schematization feature was disabled, the connector created a destination table with two columns: RECORD_CONTENT and RECORD_METADATA. The RECORD_CONTENT column, of type VARIANT, contained the entire Kafka message content. The RECORD_METADATA column is still supported, but the connector no longer creates the RECORD_CONTENT column. The same functionality can be achieved using SMT transformations (see the examples later in this section). The RECORD_CONTENT key is also no longer available in PIPE transformations. For example, the following PIPE definition will not work by default:
Note
This pipe definition will not work without additional SMT transformations.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
If you need the entire Kafka message content saved to a single column, or you need a handle to the entire Kafka message content in a PIPE transformation, you can use the following SMT transformation, which wraps the entire Kafka message content into your desired custom field:
transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
This transformation will wrap the entire Kafka message content into a custom field named your_top_level_field_name. You can then access the entire Kafka message content using the $1:your_top_level_field_name accessor in your PIPE transformation.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Alternatively, if you want to save both the entire metadata and content to a single table using the default pipe, do not create a custom pipe; instead, create only a table with two columns: RECORD_METADATA and your_top_level_field_name.
CREATE TABLE ORDERS (
record_metadata VARIANT,
your_top_level_field_name VARIANT
);
To read more about the HoistField$Value transformation, see the Kafka documentation (https://kafka.apache.org/39/documentation.html#connect_transforms).
Warning
Saving the entire Kafka message content and metadata to a table can negatively impact your ingestion cost, pipeline speed, and latency. If you need the best possible performance, consider saving only the data you need if it is accessible from the top level of the Kafka record content, or use SMT transformations to extract the data from deeply nested fields to top-level fields.
Handling streaming channel errors and dead-letter queues¶
In version 4.0.0-rc4, the connector inspects the Snowpipe Streaming channel status before committing offsets. If Snowflake reports rejected rows (rowsErrorCount > 0), the connector now raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues cannot go unnoticed. To allow ingestion to continue while you triage bad rows, set:
errors.tolerance=all
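If you also want problem records routed to a dead-letter queue topic, the standard Kafka Connect sink DLQ properties can be combined with errors.tolerance=all. A minimal sketch (the topic name kafka-connector-dlq is a placeholder, and this relies on Kafka Connect’s framework-level DLQ handling for conversion and transform failures rather than on connector-specific behavior):
errors.tolerance=all
# Kafka Connect framework DLQ settings (placeholder topic name)
errors.deadletterqueue.topic.name=kafka-connector-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true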
Schema evolution¶
Important
Schema evolution is not supported in the High Performance Snowflake Connector for Kafka. You must manually manage schema changes to your destination tables.
The connector does not automatically detect schema changes or evolve table schemas based on incoming Kafka records. When you need to add columns, modify data types, or make other schema changes, you must:
Pause the connector to stop data ingestion
Manually alter the table schema using ALTER TABLE or recreate the table
Update your pipe definition if you are using user-defined pipes and the transformation logic needs to change (see the sketch after these steps)
Restart the connector to resume data ingestion
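For example, a hedged sketch of these steps for the ORDERS table and pipe from the earlier user-defined pipe example, assuming a hypothetical new field named discount_code is added to the Kafka messages:
-- 1. Pause the connector, then add the new column to the destination table.
ALTER TABLE ORDERS ADD COLUMN discount_code VARCHAR;

-- 2. Recreate the pipe so the new field is mapped (discount_code is hypothetical).
CREATE OR REPLACE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
    SELECT
        $1:order_id::NUMBER,
        $1:customer_name,
        $1:order_total::NUMBER,
        $1:order_date::TIMESTAMP_NTZ,
        $1:RECORD_METADATA.topic,
        $1:discount_code::VARCHAR
    FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

-- 3. Restart the connector to resume data ingestion.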
Note
Schema evolution support will be added in future releases.
Fault tolerance¶
Limitations of fault tolerance with the connector¶
Kafka Topics can be configured with a limit on storage space or retention time.
If the system is offline for more than the retention time, then expired records will not be loaded. Similarly, if Kafka’s storage space limit is exceeded, some messages will not be delivered.
If messages in the Kafka topic are deleted, these changes will not be reflected in the Snowflake table.