Snowpipe Streaming 迁移指南¶
本指南介绍如何从经典 Snowpipe Java SDK 迁移到高性能 Snowpipe Streaming SDK. 此处讨论的架构变更和 API 更新也适用于到 Python SDK 的迁移,因为高性能架构适用于两种语言。尽管本文档中的代码示例使用的是 Java,但核心迁移原则在不同语言之间保持一致。
关键架构变更¶
下表总结了高性能 Snowpipe Streaming SDK 中最重要的架构变更。有关 SDKs 的详细比较信息,请参阅 经典 SDK 和高性能 SDK 的比较。
领域 |
经典 (snowflake-ingest-java) |
高性能 (snowpipe-streaming SDK) |
|---|---|---|
入口点 |
数据直接引入到表中。 |
数据通过 PIPE 对象引入,支持转换和架构强制执行。 |
SDK / Core |
仅限 Java SDK。 |
多语言(Java 和 Python)SDK,具有共享的 Rust 核心。 |
API 名称 |
|
|
错误处理 |
执行客户端验证。 |
提供了具有更丰富错误反馈的服务器端验证。 |
背压处理 |
使线程进入休眠状态,导致阻塞或无响应。 |
返回错误,允许调用方实施退避/重试策略。 |
客户端到表的映射 |
单个客户端对象可以打开任意表的通道。 |
现在,单个客户端对象仅绑定到一个管道对象。 |
计费 |
基于计算和客户端数量。 |
固定,按照引入的 GB 数。 |
架构/转换 |
在客户端管理。 |
通过 PIPE 定义在服务器端管理。 |
迁移过程¶
要将应用程序迁移到高性能 SDK,完成以下高级步骤:
对于每个目标表,创建一个 PIPE。
CREATE PIPE my_pipe AS COPY INTO my_table FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING')) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE [CLUSTER_AT_INGEST_TIME = TRUE];
停止从所有经典客户端引入。
对于经典客户端中的每个通道,确认最后提交的偏移。要检索这些偏移,请使用经典 SDK 中的
getLatestCommittedOffsetTokens()方法。验证这些偏移是否与客户端记录一致。更新应用程序代码。
将项目依赖项切换到高性能 SDK(Java 或 Python)。
更新您的 API 调用,详细步骤见以下 API 和配置变更 章节。
使用 Snowflake 最后提交的偏移每个表/PIPE 初始化一个客户端。
新客户端配置完毕并稳定后,恢复引入。
API 和配置变更¶
迁移期间必须对您的 API 调用和配置设置进行以下变更:
客户端初始化¶
经典:
builder(name)高性能:
builder(name, db, schema, pipeName)
通道¶
经典:
openChannel(OpenChannelRequest)高性能:
openChannel(channelName, offsetToken)同时返回通道和状态
引入方法¶
经典:
insertRow/insertRows(...)高性能:
appendRow/appendRows(...)
偏移跟踪¶
经典 SDK 的
getLatestCommittedOffsetTokens(channels)方法提供的可见性有限,并且缺乏错误上下文。高性能 SDK 仍然支持
getLatestCommittedOffsetTokens(...),但为了可靠进行监控,我们建议您使用getChannelStatuses(...)。此方法执行以下任务:确认偏移按预期推进。
返回每个通道的错误计数和详细错误信息。
支持对数据管道进行主动监控和故障排除。