Migrate from Kafka connector v3 to v4

This topic describes how to migrate from the classic Kafka connector (v3 and earlier) to the Snowflake Connector for Kafka (v4).

Overview

The Snowflake Connector for Kafka (v4) is a ground-up rewrite that exclusively uses the Snowpipe Streaming high-performance architecture. To migrate to v4, you must manually create a new connector configuration.

Important

The v4 connector can’t be used as a drop-in replacement for v3. It uses a different connector class, different default behaviors, and a different feature set. Review the breaking changes and migration paths below before migrating.

Pricing changes

The v4 connector uses flat, throughput-based pricing: you are billed per GB of data ingested. This is the same pricing model as the Snowpipe Streaming high-performance architecture. To estimate costs, multiply your data ingestion rate by the per-GB price listed on the Snowpipe Streaming cost page.

This replaces the v3 pricing model, which was based on serverless compute and file notifications.

Compatibility validation

By default, v4 enables a startup compatibility check (snowflake.streaming.validate.compatibility.with.classic=true) that prevents you from accidentally running v4 with a copied v3 configuration. When enabled, the connector validates at startup that you’ve explicitly configured the key migration settings. If any are missing or incompatible, the connector fails with a descriptive error message telling you exactly what to set.

The validator checks the following:

  • snowflake.validation is set to client_side

  • snowflake.compatibility.enable.column.identifier.normalization is set to true

  • snowflake.compatibility.enable.autogenerated.table.name.sanitization is set to true

  • snowflake.enable.schematization is explicitly set to true or false (the default changed from false in v3 to true in v4, so the validator requires you to confirm your choice)

  • snowflake.streaming.classic.offset.migration is explicitly set

  • snowflake.streaming.classic.offset.migration.include.connector.name is explicitly set (when offset migration is strict or best_effort)

After you’ve reviewed the breaking changes and explicitly configured these settings, you can set snowflake.streaming.validate.compatibility.with.classic=false to skip the check on subsequent restarts.

For full descriptions of these properties, see Schematization, validation, and compatibility properties and Offset migration properties.
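Taken together, a configuration fragment that satisfies the startup check while preserving v3 behavior might look like the following sketch. The offset migration values depend on your v3 ingestion mode (see Migration paths), and the schematization choice is yours to confirm:

```properties
# Sketch: v3-compatible settings that satisfy the v4 startup check.
# Adjust the offset migration values to match your migration path.
snowflake.validation=client_side
snowflake.compatibility.enable.column.identifier.normalization=true
snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
snowflake.enable.schematization=false
snowflake.streaming.classic.offset.migration=best_effort
snowflake.streaming.classic.offset.migration.include.connector.name=false

# Once the settings above are validated, the startup check can be
# skipped on later restarts:
# snowflake.streaming.validate.compatibility.with.classic=false
```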

Migration paths

The migration path depends on how your v3 connector was configured.

Before migrating, ensure that snowflake.metadata.topic, snowflake.metadata.offset.and.partition, and snowflake.metadata.createtime are enabled in your v3 connector (they’re on by default). This ensures that RECORD_METADATA contains the topic, partition, and offset fields needed for deduplication if any issues arise.
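In v3 property terms, the settings to confirm look like this (all three are enabled by default, so this sketch only makes the defaults explicit):

```properties
# v3 connector: metadata fields needed for later deduplication.
# All three default to true; set them explicitly to be safe.
snowflake.metadata.topic=true
snowflake.metadata.offset.and.partition=true
snowflake.metadata.createtime=true
```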

Migrating from v3 Snowpipe mode

If your v3 connector was using classic Snowpipe (the default snowflake.ingestion.method=SNOWPIPE), v4 migrates seamlessly using Kafka consumer group offsets.

  1. Stop the v3 connector.

  2. Wait for all staged data to be ingested into Snowflake. Classic Snowpipe stages files before loading them, and any files still in the queue when you stop the connector will be loaded asynchronously. Starting the v4 connector before this completes may cause data to land out of order.

  3. Deploy the new v4 configuration using the same connector name as v3 (same Kafka consumer group). Set the offset migration config to skip SSv1 migration:

    snowflake.streaming.classic.offset.migration=skip
    
  4. Start the v4 connector. It inherits the Kafka consumer group offsets and resumes ingestion where v3 left off.

Complete the switchover within offsets.retention.minutes (default 7 days) to avoid offset expiration.

Provided all staged v3 files finished loading before you started the v4 connector, this migration path doesn’t introduce duplicates or gaps.
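As an illustration, a minimal v4 configuration for this path might look like the following sketch. The connector name, topic, and connection values are placeholders; the connection and key-pair properties follow the standard connector configuration reference:

```properties
# Sketch: v4 sink connector config for migration from v3 Snowpipe mode.
# "mykafkaconnector" must match the v3 connector name so the same Kafka
# consumer group (and its committed offsets) is reused.
name=mykafkaconnector
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
topics=mytopic
snowflake.url.name=myaccount.snowflakecomputing.com:443
snowflake.user.name=kafka_user
snowflake.private.key=<private-key-content>
snowflake.database.name=mydb
snowflake.schema.name=myschema

# Snowpipe mode has no SSv1 channels, so skip channel-based offset
# migration; offsets come from the Kafka consumer group instead.
snowflake.streaming.classic.offset.migration=skip
```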

Migrating from v3 Snowpipe Streaming mode

If your v3 connector was using Snowpipe Streaming (snowflake.ingestion.method=SNOWPIPE_STREAMING), v4 can automatically migrate the committed offsets from the v3 Snowpipe Streaming (SSv1) channels, preventing both duplicates and gaps.

  1. Stop the v3 connector.

  2. Deploy the new v4 configuration using the same connector name as v3. Configure the offset migration settings:

    # Use 'strict' to fail if SSv1 channels aren't found, or 'best_effort' to fall
    # back to Kafka consumer group offsets if channels aren't found.
    snowflake.streaming.classic.offset.migration=best_effort
    
    # Must match your v3 setting for snowflake.streaming.channel.name.include.connector.name.
    # Set to 'true' if your v3 connector included the connector name in channel names.
    snowflake.streaming.classic.offset.migration.include.connector.name=false
    
  3. Start the v4 connector. It recovers the committed offsets from the existing SSv1 channels and resumes ingestion where v3 left off.

Complete the switchover within offsets.retention.minutes (default 7 days).

Downgrading from v4 to v3

Downgrading from v4 back to v3 is possible by reversing the migration process. However, duplicate records are expected after a downgrade because v3 and v4 track offsets differently.

To downgrade:

  1. Stop the v4 connector.

  2. Deploy your v3 configuration using the same connector name.

  3. Start the v3 connector.

  4. After the downgrade, deduplicate your data using the RECORD_METADATA column. Duplicate rows are full copies of each other, so they can’t be targeted directly with DELETE. Instead, rewrite the table, keeping one row per topic, partition, and offset combination:

    INSERT OVERWRITE INTO my_table
      SELECT *
      FROM my_table
      QUALIFY ROW_NUMBER() OVER (
                PARTITION BY RECORD_METADATA:topic,
                             RECORD_METADATA:partition,
                             RECORD_METADATA:offset
                ORDER BY RECORD_METADATA:offset
              ) = 1;
    
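Before deduplicating, you can check whether duplicates exist at all. A sketch, assuming the same example table name my_table:

```sql
-- List (topic, partition, offset) combinations that appear more than
-- once. An empty result means no deduplication is needed.
SELECT RECORD_METADATA:topic::string  AS kafka_topic,
       RECORD_METADATA:partition::int AS kafka_partition,
       RECORD_METADATA:offset::int    AS kafka_offset,
       COUNT(*)                       AS copies
FROM my_table
WHERE RECORD_METADATA IS NOT NULL
GROUP BY 1, 2, 3
HAVING COUNT(*) > 1;
```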

Important

Deduplication requires that RECORD_METADATA contains topic, partition, and offset fields. Ensure the snowflake.metadata.topic and snowflake.metadata.offset.and.partition settings are enabled before migrating to v4.

If you encounter issues during the downgrade, contact Snowflake Support.

Breaking changes

New connector class

  • Connector class: v3 uses com.snowflake.kafka.connector.SnowflakeSinkConnector; v4 uses com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector.

  • Ingestion method: v3 supports Snowpipe (batch) or, optionally, Snowpipe Streaming; v4 supports Snowpipe Streaming only.

  • Java version: v3 requires Java 8+; v4 requires Java 11+.

Changed default behaviors

  • snowflake.enable.schematization: v3 default is false (records stored in RECORD_CONTENT and RECORD_METADATA VARIANT columns); v4 default is true (record fields mapped to individual table columns).

  • snowflake.validation: v3 behaves like client-side validation; v4 default is server_side (validation performed by the Snowflake backend).

  • snowflake.compatibility.enable.autogenerated.table.name.sanitization: v3 behaves like true (invalid characters replaced, names uppercased); v4 default is false (topic names used as-is for table names, preserving case and special characters).

  • snowflake.compatibility.enable.column.identifier.normalization: v3 behaves like true (column names uppercased); v4 default is false (column identifiers preserve case).

Removed configurations

The following configuration properties from v3 aren’t accepted in v4:

  • snowflake.ingestion.method (v4 exclusively uses Snowpipe Streaming)

  • buffer.flush.time, buffer.size.bytes, buffer.count.records (managed by the Snowpipe Streaming SDK)

  • snowflake.streaming.max.client.lag (managed by the SDK)

  • snowflake.streaming.enable.single.buffer

  • snowflake.streaming.max.memory.limit.bytes

  • snowflake.streaming.closeChannelsInParallel.enabled (always parallel in v4)

  • snowflake.streaming.iceberg.enabled (auto-detected in v4)

  • snowflake.snowpipe.* (Snowpipe non-streaming not supported)

  • enable.streaming.client.optimization

  • enable.streaming.channel.offset.migration (v3 internal channel name format migration, not needed in v4)

  • snowflake.streaming.channel.name.include.connector.name

  • enable.streaming.channel.offset.verification

  • snowflake.authenticator (only key-pair authentication supported)

  • snowflake.oauth.* (OAuth not supported in v4)

  • provider

Removed custom converters

The following Snowflake-provided custom converters aren’t available in v4:

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

Use standard community converters instead:

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter
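For example, if a v3 configuration used SnowflakeJsonConverter for record values, the v4 equivalent is the standard Kafka Connect JSON converter. A sketch; schemas.enable=false assumes plain JSON payloads without embedded Connect schemas, and the key converter shown is a common choice rather than a requirement:

```properties
# Replace com.snowflake.kafka.connector.records.SnowflakeJsonConverter
# with the standard Kafka Connect JSON converter.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Keys are often plain strings; adjust to match your data.
key.converter=org.apache.kafka.connect.storage.StringConverter
```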

Authentication

v4 supports key-pair authentication only. If you use OAuth with v3, you must switch to key-pair authentication before migrating.
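If you still need to set up key-pair authentication, a minimal OpenSSL sketch follows. It generates an unencrypted PKCS#8 key for simplicity (encrypted keys are also supported); the resulting public key must then be assigned to the connector’s Snowflake user with ALTER USER ... SET RSA_PUBLIC_KEY, and the private key content goes into the snowflake.private.key property.

```shell
# Generate an unencrypted PKCS#8 private key for the connector.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -nocrypt -out rsa_key.p8

# Derive the matching public key to register on the Snowflake user.
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```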

Migration steps

  1. Review breaking changes: Review the breaking changes above and determine how they affect your current deployment.

  2. Verify metadata settings: Before migrating, confirm that snowflake.metadata.topic and snowflake.metadata.offset.and.partition are enabled in your v3 connector (they’re on by default). This ensures deduplication is possible if needed.

  3. Create a new connector configuration: Create a new configuration file using the SnowflakeStreamingSinkConnector class. You can’t copy your v3 configuration directly because v4 has different defaults for schematization, validation, and identifier handling. See Snowflake Connector for Kafka: Install and configure for the full configuration reference.

  4. Configure compatibility and offset migration settings: The v4 connector validates these settings at startup. You must explicitly set the following:

    • snowflake.enable.schematization: Set to true (new v4 behavior) or false (v3 behavior).

    • snowflake.validation: Set to client_side for v3 compatibility or server_side for v4 defaults.

    • snowflake.compatibility.enable.autogenerated.table.name.sanitization: Set to true for v3 compatibility.

    • snowflake.compatibility.enable.column.identifier.normalization: Set to true for v3 compatibility.

    • snowflake.streaming.classic.offset.migration: Set to skip if migrating from Snowpipe mode, or best_effort / strict if migrating from Snowpipe Streaming mode.

    For more information, see Compatibility validation.

  5. Replace custom converters: If you use Snowflake-provided converters, replace them with the community equivalents listed above.

  6. Follow the migration path for your ingestion mode: See Migrating from Snowpipe mode or Migrating from Snowpipe Streaming mode above.

  7. Test with sample data: Deploy the new connector configuration against a test environment and verify that data flows correctly before migrating production workloads.

  8. Adopt v4 defaults incrementally: Once your migration is validated, consider incrementally adopting v4 defaults (server-side validation, case-sensitive identifiers) for improved performance and alignment with Snowflake conventions.
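Step 8 can be done one property at a time. A sketch, assuming the migration initially used the v3-compatible values; the identifier settings are left commented out because changing them affects table and column names for newly ingested data, so downstream consumers should be verified first:

```properties
# Stage 1: adopt server-side validation (the v4 default).
snowflake.validation=server_side

# Stage 2 (later, after confirming downstream consumers handle
# case-sensitive identifiers): adopt the v4 identifier defaults.
# snowflake.compatibility.enable.column.identifier.normalization=false
# snowflake.compatibility.enable.autogenerated.table.name.sanitization=false
```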