从 Kafka Connector v3 迁移到 v4

本主题介绍如何从经典 Kafka Connector(v3 及更早版本)迁移到 Snowflake Connector for Kafka (v4)。

概述

Snowflake Connector for Kafka (v4) 是从头重写的,专门使用 Snowpipe Streaming 高性能架构。您必须手动创建新的连接器配置才能迁移到 v4。

重要

v4 连接器不能直接替代 v3 连接器。它使用不同的连接器类、不同的默认行为和不同的功能集。在迁移之前,请查看下面的重大变更和迁移路径。

定价变更

v4 连接器使用基于引入数据量 (GB) 的统一定价,即基于吞吐量的定价。这与 Snowpipe Streaming 高性能架构 的定价模型相同。要估算成本,请将您的数据引入速率乘以 Snowpipe Streaming 成本页面 上列出的每 GB 价格。

这取代了 v3 的定价模型,后者基于无服务器计算和文件通知。

兼容性验证

默认情况下,v4 会启用启动兼容性检查 (snowflake.streaming.validate.compatibility.with.classic=true),以防止您意外使用复制的 v3 配置运行 v4。启用后,连接器会在启动时验证您是否已显式配置了关键的迁移设置。如果缺少任何设置或存在不兼容设置,连接器将失败,并显示描述性错误消息,准确告知您需要设置什么。

验证器会检查以下内容:

  • snowflake.validation 设置为 client_side

  • snowflake.compatibility.enable.column.identifier.normalization 设置为 true

  • snowflake.compatibility.enable.autogenerated.table.name.sanitization 设置为 true

  • snowflake.enable.schematization 显式设置为 truefalse``(默认值从 v3 中的 ``false 更改为 v4 中的 true,因此验证器要求您确认您的选择)

  • snowflake.streaming.classic.offset.migration 已显式设置

  • snowflake.streaming.classic.offset.migration.include.connector.name 已显式设置(当偏移量迁移为 strictbest_effort 时)

在您查看了重大变更并显式配置了这些设置之后,您可以设置 snowflake.streaming.validate.compatibility.with.classic=false 以在后续重启时跳过检查。

有关这些属性的完整说明,请参阅 :ref:` 自动格式演化、验证和兼容性属性 <label-kafkahp_migration_properties>` 和 偏移迁移属性

迁移路径

迁移路径取决于您的 v3 连接器的配置方式。

在迁移之前,请确保您的 v3 连接器中已启用 snowflake.metadata.topicsnowflake.metadata.offset.and.partitionsnowflake.metadata.createtime``(它们默认是开启的)。这可以确保在出现任何问题时,``RECORD_METADATA 包含去重所需的主题、分区和偏移量字段。

从 v3 Snowpipe 模式迁移

如果您的 v3 连接器使用的是经典 Snowpipe(默认 snowflake.ingestion.method=SNOWPIPE),则 v4 会使用 Kafka 使用者组偏移量进行无缝迁移。

  1. 停止 v3 连接器。

  2. 等待所有暂存的数据被引入到 Snowflake 中。 经典 Snowpipe 在加载文件之前会先暂存文件,当您停止连接器时,队列中仍存在的任何文件都将被异步加载。在此过程完成之前启动 v4 连接器可能会导致数据写入顺序错乱。

  3. 使用与 v3 **相同的连接器名称**(相同的 Kafka 使用者组)部署新的 v4 配置。将偏移量迁移配置设置为跳过 SSv1 迁移:

    snowflake.streaming.classic.offset.migration=skip
    
  4. 启动 v4 连接器。它会继承 Kafka 使用者组偏移量,并从 v3 停止的位置恢复引入。

``offsets.retention.minutes``(默认 7 天)内完成切换,以避免偏移量过期。

此迁移路径不会引入重复数据或数据缺失。

从 v3 Snowpipe Streaming 模式迁移

如果您的 v3 连接器使用的是 Snowpipe Streaming (snowflake.ingestion.method=SNOWPIPE_STREAMING),则 v4 可以自动从 v3 Snowpipe Streaming (SSv1) 渠道迁移已提交的偏移量。这可以防止重复数据或数据缺失。

  1. 停止 v3 连接器。

  2. 使用与 v3 相同的连接器名称 部署新的 v4 配置。配置偏移迁移设置:

    # Use 'strict' to fail if SSv1 channels aren't found, or 'best_effort' to fall
    # back to Kafka consumer group offsets if channels aren't found.
    snowflake.streaming.classic.offset.migration=best_effort
    
    # Must match your v3 setting for snowflake.streaming.channel.name.include.connector.name.
    # Set to 'true' if your v3 connector included the connector name in channel names.
    snowflake.streaming.classic.offset.migration.include.connector.name=false
    
  3. 启动 v4 连接器。它会从现有的 SSv1 通道恢复已提交的偏移量,并从 v3 停止的位置恢复引入。

``offsets.retention.minutes``(默认 7 天)内完成切换。

从 v4 降级到 v3

通过逆向执行迁移过程,可以从 v4 降级回 v3。然而,降级后预计会出现重复记录,因为 v3 和 v4 跟踪偏移量的方式不同。

要降级,请执行以下操作:

  1. 停止 v4 连接器。

  2. 使用 相同的连接器名称 部署 v3 配置。

  3. 启动 v3 连接器。

  4. 降级后,使用 RECORD_METADATA 列对数据进行去重。以下查询使用窗口函数基于主题、分区和偏移量删除重复记录:

    DELETE FROM my_table
    WHERE RECORD_METADATA IS NOT NULL
      AND (RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset)
          IN (
            SELECT RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
            FROM (
              SELECT RECORD_METADATA,
                     ROW_NUMBER() OVER (
                       PARTITION BY RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
                       ORDER BY RECORD_METADATA:offset
                     ) AS rn
              FROM my_table
              WHERE RECORD_METADATA IS NOT NULL
            )
            WHERE rn > 1
          );
    

重要

去重要求 RECORD_METADATA 包含主题、分区和偏移量字段。请确保在迁移到 v4 之前 启用了 snowflake.metadata.topicsnowflake.metadata.offset.and.partition 设置。

如果在降级过程中遇到问题,请联系 Snowflake 支持部门

重大变更

新连接器类

变更

v3

v4

连接器类

com.snowflake.kafka.connector.SnowflakeSinkConnector

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

引入方法

Snowpipe(批处理)或 Snowpipe Streaming(可选)

仅 Snowpipe Streaming

Java 版本

Java 8 及以上

Java 11 及以上

更改的默认行为

配置

v3 默认值

v4 默认值

snowflake.enable.schematization

``false``(记录存储在 RECORD_CONTENT 和 RECORD_METADATA VARIANT 列中)

``true``(记录字段映射到各个表列)

snowflake.validation

客户端等效

``server_side``(由 Snowflake 后端执行验证)

snowflake.compatibility.enable.autogenerated.table.name.sanitization

true 等效(替换无效字符,名称转为大写)

``false``(主题名称原样用作表名,保留大小写和特殊字符)

snowflake.compatibility.enable.column.identifier.normalization

true 等效(列名转为大写)

``false``(列标识符保留大小写)

已移除的配置

v4 不接受来自 v3 的以下配置属性:

  • ``snowflake.ingestion.method``(v4 专门使用 Snowpipe Streaming)

  • buffer.flush.timebuffer.size.bytes``buffer.count.records``(由 Snowpipe Streaming SDK 管理)

  • ``snowflake.streaming.max.client.lag``(由 SDK 管理)

  • snowflake.streaming.enable.single.buffer

  • snowflake.streaming.max.memory.limit.bytes

  • ``snowflake.streaming.closeChannelsInParallel.enabled``(在 v4 中始终并行)

  • ``snowflake.streaming.iceberg.enabled``(在 v4 中自动检测)

  • ``snowflake.snowpipe.*``(不支持非流式 Snowpipe)

  • enable.streaming.client.optimization

  • ``enable.streaming.channel.offset.migration``(v3 内部渠道名称格式迁移,v4 中不需要)

  • snowflake.streaming.channel.name.include.connector.name

  • enable.streaming.channel.offset.verification

  • ``snowflake.authenticator``(仅支持密钥对认证)

  • ``snowflake.oauth.*``(v4 不支持 OAuth)

  • provider

已移除的自定义转换器

v4 中不提供以下 Snowflake 提供的自定义转换器:

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

请改用标准的社区转换器:

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

身份验证

v4 仅支持密钥对身份验证。如果您在 v3 中使用 OAuth,则必须在迁移前切换到密钥对认证。

迁移步骤

  1. 查看重大变更:查看上述重大变更,并确定它们对您当前部署的影响。

  2. 验证元数据设置:在迁移之前,确认您的 v3 连接器中已启用 snowflake.metadata.topic``snowflake.metadata.offset.and.partition``(它们默认是开启的)。这可以确保在需要时能够进行去重。

  3. 创建新的连接器配置:使用 SnowflakeStreamingSinkConnector 类创建新的配置文件。您不能直接复制 v3 配置,因为 v4 在自动格式演化、验证和标识符处理方面有不同的默认值。完整的配置参考请参阅 Snowflake Connector for Kafka:选择使用 时默认使用的角色和仓库。安装并配置

  4. 配置兼容性和偏移迁移设置:v4 连接器在启动时验证这些设置。您必须显式设置以下内容:

    • snowflake.enable.schematization:选择使用 时默认使用的角色和仓库。设置为 ``true``(新的 v4 行为)或 ``false``(v3 行为)。

    • snowflake.validation:选择使用 时默认使用的角色和仓库。设置为 client_side 以兼容 v3,或设置为 server_side 以使用 v4 默认值。

    • snowflake.compatibility.enable.autogenerated.table.name.sanitization:选择使用 时默认使用的角色和仓库。设置为 true 以兼容 v3。

    • snowflake.compatibility.enable.column.identifier.normalization:选择使用 时默认使用的角色和仓库。设置为 true 以兼容 v3。

    • snowflake.streaming.classic.offset.migration:选择使用 时默认使用的角色和仓库。如果从 Snowpipe 模式迁移,则设置为 skip;如果从 Snowpipe Streaming 模式迁移,则设置为 best_effort/strict

    有关更多信息,请参阅 兼容性验证

  5. 替换自定义转换器:如果您使用 Snowflake 提供的转换器,请将其替换为上面列出的社区等效转换器。

  6. 遵循引入模式的迁移路径:请参阅上文中的 从 Snowpipe 模式迁移从 Snowpipe Streaming 模式迁移

  7. 使用示例数据进行测试:在测试环境中部署新的连接器配置,并在迁移生产工作负载之前验证数据流是否正常。

  8. 逐步采用 v4 默认值:在您的迁移得到验证后,请考虑逐步采用 v4 默认值(服务端验证、区分大小写的标识符),以提高性能并与 Snowflake 规范保持一致。