How the Snowflake High Performance connector for Kafka works

This topic describes various aspects of the connector: how it works with tables and pipes, and how to configure it.

How the connector works with tables and pipes

The High Performance Snowflake Connector for Kafka requires you to create destination tables manually. The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, if you have a Kafka topic with messages structured like this:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

You can create a table with columns matching the JSON keys and rely on the default pipe named {tableName}-STREAMING, which automatically maps the record content’s first-level keys to table columns matching by name (case-insensitive).

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  ispaid BOOLEAN
);

If you choose to create your own pipe, you can define the data transformation logic in the pipe’s COPY INTO statement. You can rename columns as required and cast the data types as needed. For example:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::STRING,
    $1:customer_name,
    $1:order_total::STRING,
    $1:isPaid::STRING
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

or

CREATE TABLE ORDERS (
  topic VARCHAR,
  partition VARCHAR,
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:RECORD_METADATA.topic::STRING AS topic,
    $1:RECORD_METADATA.partition::STRING AS partition,
    $1['order_id']::STRING AS order_id,
    $1['customer_name']::STRING AS customer_name,
    CONCAT($1['order_total']::STRING, ' USD') AS order_total,
    $1['isPaid']::STRING AS ispaid
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

When you define your own pipe, your destination table columns do not need to match the JSON keys. You can rename the columns as required and cast the data types as needed.

Topic names, table names, and pipe names

Depending on the configuration settings, the connector uses different names for the destination table. The destination table name is always determined by the topic name, either derived directly or through an explicit mapping.

How the connector maps topic names to the destination table

The Kafka connector provides two modes for mapping Kafka topic names to Snowflake table names:

  • Static mapping: The connector derives destination table names using only the Kafka topic name.

  • Explicit topic-to-table mapping: You specify custom mappings between topics and tables using the snowflake.topic2table.map configuration parameter.

Static mapping

If you do not configure the snowflake.topic2table.map parameter, the connector always derives the table name from the topic name.

Table name generation:

The connector derives the destination table name from the topic name using the following rules:

  1. If the topic name is a valid Snowflake identifier (starts with a letter or underscore, and contains only letters, digits, underscores, or dollar signs), the connector uses the topic name as the table name (converted to uppercase).

  2. If the topic name contains invalid characters, the connector:

    • Replaces invalid characters with underscores

    • Appends an underscore followed by a hash code to ensure uniqueness

    • For example, the topic my-topic.data becomes MY_TOPIC_DATA_<hash>

Pipe name determination:

The connector determines which pipe to use based on the following logic:

  1. The connector checks if a pipe exists with the same name as the destination table name.

  2. If a user-created pipe with that name exists, the connector uses that pipe (user-defined pipe mode).

  3. If not, the connector uses the default pipe named {tableName}-STREAMING (see the example below).
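
For example, you can verify which pipe the connector will pick up by listing the pipes that share the table’s name. A quick sketch, assuming the destination table is named ORDERS (the default pipe name contains a hyphen, so it would appear as "ORDERS-STREAMING"):

SHOW PIPES LIKE 'ORDERS%';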

Note

Snowflake recommends choosing topic names that follow the rules for Snowflake identifier names to ensure predictable table names.
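
If you cannot control the topic names, you can instead bypass static mapping and assign a predictable table name through an explicit mapping (described later in this topic). A minimal sketch, reusing the my-topic.data example above:

snowflake.topic2table.map=my-topic.data:MY_TOPIC_DATA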

Understanding RECORD_METADATA

The connector populates the RECORD_METADATA structure with metadata about the Kafka record. This metadata is sent through the Snowpipe Streaming data source to Snowflake, where it becomes available in pipe transformations through the $1:RECORD_METADATA accessor. The RECORD_METADATA structure is available in both user-defined pipe and default pipe modes. Its content can be saved to a column of type VARIANT, or individual fields can be extracted and saved to separate columns.

Example pipe with transformations and metadata:

CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total,
    $1:RECORD_METADATA.topic AS source_topic,
    $1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
    $1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

In this example:

  • The pipe extracts specific fields from the Kafka message (order_id, customer_name, order_total)

  • It also captures metadata fields (topic, offset, and ingestion timestamp)

  • The values can be cast and/or transformed as needed

How metadata fields are populated

The connector automatically populates metadata fields based on the Kafka record properties and connector configuration. You can control which metadata fields are included using these configuration parameters:

  • snowflake.metadata.topic (default: true) - Includes the topic name

  • snowflake.metadata.offset.and.partition (default: true) - Includes offset and partition

  • snowflake.metadata.createtime (default: true) - Includes the Kafka record timestamp

  • snowflake.metadata.all (default: true) - Includes all available metadata

When snowflake.metadata.all=true (the default), all metadata fields are populated. Setting individual metadata flags to false excludes those specific fields from the RECORD_METADATA structure.

Note

The SnowflakeConnectorPushTime field is always available and represents the time when the connector pushed the record into the ingestion buffer. This is useful for calculating end-to-end ingestion latency.
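
For example, to exclude the offset, partition, and Kafka timestamp while keeping the topic name, you could disable the corresponding flags in the connector configuration. A minimal sketch using only the parameters documented above (combine these with your other connector settings):

# snowflake.metadata.topic defaults to true, so the topic name is still included
snowflake.metadata.offset.and.partition=false
snowflake.metadata.createtime=false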

The RECORD_METADATA structure contains the following information by default:

  • topic (String) - The name of the Kafka topic that the record came from.

  • partition (String) - The number of the partition within the topic. (Note that this is the Kafka partition, not the Snowflake micro-partition.)

  • offset (Number) - The offset in that partition.

  • CreateTime / LogAppendTime (Number) - The timestamp associated with the message in the Kafka topic. The value is milliseconds since midnight January 1, 1970, UTC. For more information, see https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html

  • SnowflakeConnectorPushTime (Number) - A timestamp when a record was pushed into an Ingest SDK buffer. The value is the number of milliseconds since midnight January 1, 1970, UTC. For more information, see Estimating ingestion latency.

  • key (String) - If the message is a Kafka KeyedMessage, this is the key for that message. In order for the connector to store the key in the RECORD_METADATA, the key.converter parameter in the Kafka configuration properties must be set to “org.apache.kafka.connect.storage.StringConverter”; otherwise, the connector ignores keys.

  • headers (Object) - A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers.

The amount of metadata recorded in the RECORD_METADATA column is configurable using optional Kafka configuration properties.

The field names and values are case-sensitive.
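
For example, a user-defined pipe can extract the key and an individual header from RECORD_METADATA (note the lowercase key and headers field names). A minimal sketch, assuming key.converter is set to org.apache.kafka.connect.storage.StringConverter, a hypothetical header named source_system, and a destination table ORDERS_WITH_METADATA with matching columns:

CREATE PIPE ORDERS_WITH_METADATA AS
COPY INTO ORDERS_WITH_METADATA
FROM (
  SELECT
    $1:order_id::NUMBER AS order_id,
    -- Kafka message key (requires the StringConverter key converter)
    $1:RECORD_METADATA.key::STRING AS message_key,
    -- A single record header, accessed by its case-sensitive name
    $1:RECORD_METADATA.headers.source_system::STRING AS source_system
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);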

How Kafka records are converted before ingestion

Before each row is handed over to Snowpipe Streaming, the connector converts the Kafka Connect record value into a Map<String, Object> whose keys must match your target column names (or can be transformed inside a user-defined pipe). Primitive strings, byte arrays, or numbers must be wrapped (for example by using the HoistField SMT) so that the connector receives a structured object. The converter applies the following rules:

  • Null values are treated as tombstones. They are skipped when behavior.on.null.values=IGNORE; otherwise, they are ingested as empty JSON objects.

  • Numeric and boolean fields are passed through as-is. Decimal values whose precision is greater than 38 are serialized as strings to stay within Snowflake’s NUMBER limits.

  • byte[] and ByteBuffer payloads are converted to Base64-encoded strings, so store them in VARIANT or VARCHAR columns (see the decoding sketch at the end of this section).

  • Arrays remain arrays, and nested objects remain nested maps. Declare VARIANT columns when you rely on the default pipe to land nested data as-is.

  • Maps with non-string keys are emitted as arrays of [key, value] pairs because Snowflake column names must be text.

  • Record headers and keys are copied into RECORD_METADATA whenever the relevant metadata flags are enabled.

If you need the entire message body preserved as a single column, wrap it into a new top-level field using SMTs. See Legacy RECORD_CONTENT column for the transformation pattern.
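
For example, because binary payload fields arrive as Base64-encoded strings, a user-defined pipe can decode them back into a BINARY column. A minimal sketch, assuming a hypothetical record field named payload_bytes and a destination table RAW_EVENTS with a BINARY column of the same name plus a topic column:

CREATE PIPE RAW_EVENTS AS
COPY INTO RAW_EVENTS
FROM (
  SELECT
    -- Decode the Base64 string produced by the converter back into BINARY
    TO_BINARY($1:payload_bytes::STRING, 'BASE64') AS payload_bytes,
    $1:RECORD_METADATA.topic::STRING AS topic
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);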

User-defined pipe mode vs default pipe mode

The connector supports two modes for managing data ingestion:

User-defined pipe mode

In this mode, you have full control over data transformation and column mapping.

When to use this mode:

  • You need custom column names that differ from JSON field names

  • You need to apply data transformations (type casting, masking, filtering)

  • You want full control over how data is mapped to columns

Default pipe mode

In this mode, the connector uses a default pipe named {tableName}-STREAMING and maps Kafka record fields to table columns matching by name (case-insensitive).

When to use this mode:

  • The field names in your Kafka record content match your desired column names

  • You don’t need custom data transformations

  • You want simplified configuration

Mapping Kafka record keys to table columns with default pipe mode

When using default pipe mode, the connector uses the default pipe named {tableName}-STREAMING and maps the content’s first-level keys directly to table columns using case-insensitive matching.

Using default pipe mode - example

Example 1:

Consider the following Kafka record content payload:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  "@&$#* includes special characters": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}

You create a table with columns matching the JSON keys (case-insensitive, including special characters):

CREATE TABLE PERSON_DATA (
  record_metadata VARIANT,
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  "has cat" BOOLEAN,
  "!@&$#* includes special characters" BOOLEAN,
  skills VARIANT,
  family VARIANT
);

Matching behavior:

  • "city" (kafka) → city or CITY or City (column) - case insensitive

  • "has cat" (kafka) → "has cat" (column) - must be quoted due to space

  • "!@&$#* includes special characters" (kafka) → "!@&$#* includes special characters" (column) - special characters preserved

  • Nested objects like skills and family map to VARIANT columns automatically

Using user-defined pipe mode - examples

This example shows how to configure and use user-defined pipes with custom data transformations.

Example 1:

Create a table with your desired schema:

CREATE TABLE ORDERS (
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  order_date TIMESTAMP_NTZ,
  source_topic VARCHAR
);

Create a pipe that transforms the incoming Kafka records to match your table schema:

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total::NUMBER,
    $1:order_date::TIMESTAMP_NTZ,
    $1:RECORD_METADATA.topic
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

Note that the pipe name (ORDERS) matches the table name (ORDERS). The pipe definition extracts fields from the JSON payload using $1:field_name syntax and maps them to the table columns.

Note

You can access nested JSON fields and fields with special characters using bracket notation, such as $1['field name'] or $1['has cat'].

Configure topic to table mapping:

snowflake.topic2table.map=kafka-orders-topic:ORDERS

This configuration maps the Kafka topic kafka-orders-topic to the pre-existing table and pipe named ORDERS.

Example 2:

When you need to access keys in the content that do not have conventional names, use the following syntax:

  • Simple fields: $1:field_name

  • Fields with spaces or special characters: $1['field name'] or $1['has cat']

  • Fields with Unicode characters: $1[' @&$#* has Łułósżź']

  • Nested fields: $1:parent.child or $1:parent['child field']

Consider this JSON payload from Kafka:

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  " @&$#* has Łułósżź": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}

You create a destination table with your chosen column names:

CREATE TABLE PERSON_DATA (
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  has_cat BOOLEAN,
  weird_field_name BOOLEAN,
  skills VARIANT,
  family VARIANT
);

Then create a pipe with the same name that defines the mapping:

CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
  SELECT
    $1:city,
    $1:age,
    $1:married,
    $1['has cat'] AS has_cat,
    $1[' @&$#* has Łułósżź'] AS weird_field_name,
    $1:skills,
    $1:family
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

Key points:

  • You control column names (e.g., renaming "has cat" to has_cat)

  • You can cast data types as needed (e.g., $1:age::NUMBER)

  • You can include or exclude fields as desired

  • You can add metadata fields (e.g., $1:RECORD_METADATA.topic)

  • VARIANT columns automatically handle nested JSON structures

Example 3: With interactive tables

Interactive tables are a special type of Snowflake table optimized for low-latency, high-concurrency queries. You can find out more about interactive tables in the interactive tables documentation.

Note

Interactive tables are currently a preview feature available only to selected accounts.

  1. Create an interactive table:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (
      SELECT
        $1:M_NAME::VARCHAR,
        $1:M_VALUE::NUMBER,
        $1:RECORD_METADATA.topic::VARCHAR,
        $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
      FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
    );
    
  2. Configure topic to table mapping:

    snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
    

Important considerations:

  • Interactive tables have specific limitations and query restrictions. Review the interactive tables documentation before using them with the connector.

  • For interactive tables, any required transformations must be handled in the table definition.

  • Interactive warehouses are required to query interactive tables efficiently.

Explicit topic-to-table mapping

When you configure the snowflake.topic2table.map parameter, the connector operates in explicit mapping mode. This mode allows you to:

  • Map multiple Kafka topics to a single Snowflake table

  • Use custom table names that differ from topic names

  • Apply regex patterns to match multiple topics

Configuration format:

The snowflake.topic2table.map parameter accepts a comma-separated list of topic-to-table mappings in the format:

topic1:table1,topic2:table2,topic3:table3

Example configurations:

Direct topic mapping

snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE

Regex pattern matching

snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE

This configuration maps all topics ending with _cat (such as orange_cat, calico_cat) to the CAT_TABLE table, and all topics ending with _dog to the DOG_TABLE table.

Many topics to one table

snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table

This configuration maps both topic1 and topic2 to shared_table, while topic3 maps to other_table.

Important

  • Regex patterns in the mapping cannot overlap. Each topic must match at most one pattern.

  • Table names in the mapping must be valid Snowflake identifiers with at least 2 characters, starting with a letter or underscore.

  • You can map multiple topics to a single table.

Legacy RECORD_CONTENT column

In older versions of the connector, when the schematization feature was disabled, the connector created a destination table with two columns: RECORD_CONTENT and RECORD_METADATA. The RECORD_CONTENT column stored the entire Kafka message content as a VARIANT value. The RECORD_METADATA column is still supported, but the connector no longer creates the RECORD_CONTENT column. The same functionality can be achieved using SMT transformations (see the examples later in this section). The RECORD_CONTENT key is also no longer available in pipe transformations. For example, the following pipe definition does not work by default:

Note

This pipe definition will not work without additional SMT transformations.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

If you need entire Kafka message content saved to a single column, or you need a handle to the entire Kafka message content in a PIPE transformation, you can use the following SMT transformation that wraps the entire Kafka message content into your desired custom field:

transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name

This transformation will wrap the entire Kafka message content into a custom field named your_top_level_field_name. You can then access the entire Kafka message content using the $1:your_top_level_field_name accessor in your PIPE transformation.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

Alternatively, if you want to save both the metadata and the entire message content to a single table using the default pipe, do not create a custom pipe; instead, create only a table with two columns: RECORD_METADATA and your_top_level_field_name.

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  your_top_level_field_name VARIANT
);

To read more about the HoistField$Value transformation, see the Kafka documentation (https://kafka.apache.org/39/documentation.html#connect_transforms).

Warning

Saving the entire Kafka message content and metadata to a table can negatively impact your ingestion cost, pipeline speed, and latency. If you need the best possible performance, consider saving only the data you need, provided it is accessible from the top level of the Kafka record content, or use SMT transformations to extract data from deeply nested fields into top-level fields.
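
If the fields you need are nested, one option is Kafka Connect’s built-in Flatten transformation, which promotes nested fields to top-level fields. A sketch (the delimiter is configurable, and the resulting field names, such as family_son for the earlier example payload, depend on your message structure):

transforms=flattenValue
transforms.flattenValue.type=org.apache.kafka.connect.transforms.Flatten$Value
# Use an underscore so the flattened names are easy to match to column names
transforms.flattenValue.delimiter=_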

Handling streaming channel errors and dead-letter queues

In version 4.0.0-rc4, the connector inspects the Snowpipe Streaming channel status before committing offsets. If Snowflake reports rejected rows (rowsErrorCount > 0), the connector now raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues cannot go unnoticed. To allow ingestion to continue while you triage bad rows, set errors.tolerance=all:

errors.tolerance=all
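
With errors.tolerance=all, you can also set the standard Kafka Connect dead-letter queue properties so that records that fail conversion or transformation are routed to a separate topic for later inspection. A sketch (the DLQ topic name is hypothetical, and which failures reach the DLQ depends on where in the pipeline they occur):

errors.deadletterqueue.topic.name=kafka_connector_dlq
errors.deadletterqueue.topic.replication.factor=1
# Preserve error context (topic, partition, offset, exception) as record headers
errors.deadletterqueue.context.headers.enable=true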

Schema evolution

Important

Schema evolution is not supported in the High Performance Snowflake Connector for Kafka. You must manually manage schema changes to your destination tables.

The connector does not automatically detect schema changes or evolve table schemas based on incoming Kafka records. When you need to add columns, modify data types, or make other schema changes, you must:

  1. Pause the connector to stop data ingestion

  2. Manually alter the table schema using ALTER TABLE or recreate the table

  3. Update your pipe definition if you are using user-defined pipes and the transformation logic needs to change (steps 2 and 3 are illustrated in the example after this list)

  4. Restart the connector to resume data ingestion
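
For example, a minimal sketch of steps 2 and 3, assuming a destination table ORDERS with columns order_id, customer_name, and order_total, a user-defined pipe named ORDERS, and a new optional order_status field in the payload (all names are illustrative):

-- Step 2: add a column for the new field
ALTER TABLE ORDERS ADD COLUMN order_status VARCHAR;

-- Step 3: recreate the pipe so the new column is populated
CREATE OR REPLACE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total::NUMBER,
    $1:order_status::STRING
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);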

Note

Schema evolution support will be added in future releases.

Fault tolerance

Limitations of fault tolerance with the connector

Kafka Topics can be configured with a limit on storage space or retention time.

  • If the system is offline for more than the retention time, then expired records will not be loaded. Similarly, if Kafka’s storage space limit is exceeded, some messages will not be delivered.

  • If messages in the Kafka topic are deleted, these changes will not be reflected in the Snowflake table.

Next steps

Set up tasks.
