从 Kafka Connector v3 迁移到 v4¶
本主题介绍如何从经典 Kafka Connector(v3 及更早版本)迁移到 Snowflake Connector for Kafka (v4)。
概述¶
Snowflake Connector for Kafka (v4) 是从头重写的,专门使用 Snowpipe Streaming 高性能架构。您必须手动创建新的连接器配置才能迁移到 v4。
重要
v4 连接器不能直接替代 v3 连接器。它使用不同的连接器类、不同的默认行为和不同的功能集。在迁移之前,请查看下面的重大变更和迁移路径。
定价变更¶
v4 连接器使用基于引入数据量 (GB) 的统一定价,即基于吞吐量的定价。这与 Snowpipe Streaming 高性能架构 的定价模型相同。要估算成本,请将您的数据引入速率乘以 Snowpipe Streaming 成本页面 上列出的每 GB 价格。
这取代了 v3 的定价模型,后者基于无服务器计算和文件通知。
兼容性验证¶
默认情况下,v4 会启用启动兼容性检查 (snowflake.streaming.validate.compatibility.with.classic=true),以防止您意外使用复制的 v3 配置运行 v4。启用后,连接器会在启动时验证您是否已显式配置了关键的迁移设置。如果缺少任何设置或存在不兼容设置,连接器将失败,并显示描述性错误消息,准确告知您需要设置什么。
验证器会检查以下内容:
将
snowflake.validation设置为client_side将
snowflake.compatibility.enable.column.identifier.normalization设置为true将
snowflake.compatibility.enable.autogenerated.table.name.sanitization设置为truesnowflake.enable.schematization显式设置为true或false``(默认值从 v3 中的 ``false更改为 v4 中的true,因此验证器要求您确认您的选择)snowflake.streaming.classic.offset.migration已显式设置snowflake.streaming.classic.offset.migration.include.connector.name已显式设置(当偏移量迁移为strict或best_effort时)
在您查看了重大变更并显式配置了这些设置之后,您可以设置 snowflake.streaming.validate.compatibility.with.classic=false 以在后续重启时跳过检查。
有关这些属性的完整说明,请参阅 :ref:` 自动格式演化、验证和兼容性属性 <label-kafkahp_migration_properties>` 和 偏移迁移属性。
迁移路径¶
迁移路径取决于您的 v3 连接器的配置方式。
在迁移之前,请确保您的 v3 连接器中已启用 snowflake.metadata.topic、snowflake.metadata.offset.and.partition 和 snowflake.metadata.createtime``(它们默认是开启的)。这可以确保在出现任何问题时,``RECORD_METADATA 包含去重所需的主题、分区和偏移量字段。
从 v3 Snowpipe 模式迁移¶
如果您的 v3 连接器使用的是经典 Snowpipe(默认 snowflake.ingestion.method=SNOWPIPE),则 v4 会使用 Kafka 使用者组偏移量进行无缝迁移。
停止 v3 连接器。
等待所有暂存的数据被引入到 Snowflake 中。 经典 Snowpipe 在加载文件之前会先暂存文件,当您停止连接器时,队列中仍存在的任何文件都将被异步加载。在此过程完成之前启动 v4 连接器可能会导致数据写入顺序错乱。
使用与 v3 **相同的连接器名称**(相同的 Kafka 使用者组)部署新的 v4 配置。将偏移量迁移配置设置为跳过 SSv1 迁移:
启动 v4 连接器。它会继承 Kafka 使用者组偏移量,并从 v3 停止的位置恢复引入。
在 ``offsets.retention.minutes``(默认 7 天)内完成切换,以避免偏移量过期。
此迁移路径不会引入重复数据或数据缺失。
从 v3 Snowpipe Streaming 模式迁移¶
如果您的 v3 连接器使用的是 Snowpipe Streaming (snowflake.ingestion.method=SNOWPIPE_STREAMING),则 v4 可以自动从 v3 Snowpipe Streaming (SSv1) 渠道迁移已提交的偏移量。这可以防止重复数据或数据缺失。
停止 v3 连接器。
使用与 v3 相同的连接器名称 部署新的 v4 配置。配置偏移迁移设置:
启动 v4 连接器。它会从现有的 SSv1 通道恢复已提交的偏移量,并从 v3 停止的位置恢复引入。
在 ``offsets.retention.minutes``(默认 7 天)内完成切换。
从 v4 降级到 v3¶
通过逆向执行迁移过程,可以从 v4 降级回 v3。然而,降级后预计会出现重复记录,因为 v3 和 v4 跟踪偏移量的方式不同。
要降级,请执行以下操作:
停止 v4 连接器。
使用 相同的连接器名称 部署 v3 配置。
启动 v3 连接器。
降级后,使用
RECORD_METADATA列对数据进行去重。以下查询使用窗口函数基于主题、分区和偏移量删除重复记录:
重要
去重要求 RECORD_METADATA 包含主题、分区和偏移量字段。请确保在迁移到 v4 之前 启用了 snowflake.metadata.topic 和 snowflake.metadata.offset.and.partition 设置。
如果在降级过程中遇到问题,请联系 Snowflake 支持部门。
重大变更¶
新连接器类¶
变更 |
v3 |
v4 |
|---|---|---|
连接器类 |
|
|
引入方法 |
Snowpipe(批处理)或 Snowpipe Streaming(可选) |
仅 Snowpipe Streaming |
Java 版本 |
Java 8 及以上 |
Java 11 及以上 |
更改的默认行为¶
配置 |
v3 默认值 |
v4 默认值 |
|---|---|---|
|
``false``(记录存储在 RECORD_CONTENT 和 RECORD_METADATA VARIANT 列中) |
``true``(记录字段映射到各个表列) |
|
客户端等效 |
``server_side``(由 Snowflake 后端执行验证) |
|
|
``false``(主题名称原样用作表名,保留大小写和特殊字符) |
|
|
``false``(列标识符保留大小写) |
已移除的配置¶
v4 不接受来自 v3 的以下配置属性:
``snowflake.ingestion.method``(v4 专门使用 Snowpipe Streaming)
buffer.flush.time、buffer.size.bytes、``buffer.count.records``(由 Snowpipe Streaming SDK 管理)``snowflake.streaming.max.client.lag``(由 SDK 管理)
snowflake.streaming.enable.single.buffersnowflake.streaming.max.memory.limit.bytes``snowflake.streaming.closeChannelsInParallel.enabled``(在 v4 中始终并行)
``snowflake.streaming.iceberg.enabled``(在 v4 中自动检测)
``snowflake.snowpipe.*``(不支持非流式 Snowpipe)
enable.streaming.client.optimization``enable.streaming.channel.offset.migration``(v3 内部渠道名称格式迁移,v4 中不需要)
snowflake.streaming.channel.name.include.connector.nameenable.streaming.channel.offset.verification``snowflake.authenticator``(仅支持密钥对认证)
``snowflake.oauth.*``(v4 不支持 OAuth)
provider
已移除的自定义转换器¶
v4 中不提供以下 Snowflake 提供的自定义转换器:
com.snowflake.kafka.connector.records.SnowflakeJsonConvertercom.snowflake.kafka.connector.records.SnowflakeAvroConvertercom.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
请改用标准的社区转换器:
org.apache.kafka.connect.json.JsonConverterio.confluent.connect.avro.AvroConverterio.confluent.connect.protobuf.ProtobufConverter
身份验证¶
v4 仅支持密钥对身份验证。如果您在 v3 中使用 OAuth,则必须在迁移前切换到密钥对认证。
迁移步骤¶
查看重大变更:查看上述重大变更,并确定它们对您当前部署的影响。
验证元数据设置:在迁移之前,确认您的 v3 连接器中已启用
snowflake.metadata.topic和 ``snowflake.metadata.offset.and.partition``(它们默认是开启的)。这可以确保在需要时能够进行去重。创建新的连接器配置:使用
SnowflakeStreamingSinkConnector类创建新的配置文件。您不能直接复制 v3 配置,因为 v4 在自动格式演化、验证和标识符处理方面有不同的默认值。完整的配置参考请参阅 Snowflake Connector for Kafka:选择使用 时默认使用的角色和仓库。安装并配置。配置兼容性和偏移迁移设置:v4 连接器在启动时验证这些设置。您必须显式设置以下内容:
snowflake.enable.schematization:选择使用 时默认使用的角色和仓库。设置为 ``true``(新的 v4 行为)或 ``false``(v3 行为)。snowflake.validation:选择使用 时默认使用的角色和仓库。设置为client_side以兼容 v3,或设置为server_side以使用 v4 默认值。snowflake.compatibility.enable.autogenerated.table.name.sanitization:选择使用 时默认使用的角色和仓库。设置为true以兼容 v3。snowflake.compatibility.enable.column.identifier.normalization:选择使用 时默认使用的角色和仓库。设置为true以兼容 v3。snowflake.streaming.classic.offset.migration:选择使用 时默认使用的角色和仓库。如果从 Snowpipe 模式迁移,则设置为skip;如果从 Snowpipe Streaming 模式迁移,则设置为best_effort/strict。
有关更多信息,请参阅 兼容性验证。
替换自定义转换器:如果您使用 Snowflake 提供的转换器,请将其替换为上面列出的社区等效转换器。
遵循引入模式的迁移路径:请参阅上文中的 从 Snowpipe 模式迁移 或 从 Snowpipe Streaming 模式迁移。
使用示例数据进行测试:在测试环境中部署新的连接器配置,并在迁移生产工作负载之前验证数据流是否正常。
逐步采用 v4 默认值:在您的迁移得到验证后,请考虑逐步采用 v4 默认值(服务端验证、区分大小写的标识符),以提高性能并与 Snowflake 规范保持一致。
推荐的迁移配置¶
从 v3 Snowpipe 模式迁移(兼容 v3)¶
以下配置在 v4 连接器上运行时重现了 v3 的行为,适用于从经典 Snowpipe 迁移的用户:
从 v3 Snowpipe Streaming 模式迁移(兼容 v3)¶
以下配置重现了 v3 行为,并从 v3 Snowpipe Streaming (SSv1) 渠道迁移偏移量: