经典 SDK 和高性能 SDK 的比较

本节总结了经典版和高性能版 SDKs 之间的主要区别。

客户端和通道管理

  • OpenClient:高性能 SDK 要求您指定 DBSCHEMAPIPE。在经典 SDK 中,您只需要指定一个客户端 NAME

  • OpenChannel:高性能 SDK 仅需通道名称即可简化此操作。经典 SDK 要求您指定 DBSCHEMATABLEERROR_OPTION。新的 SDK 还会返回一个包含通道实体和状态的 OpenChannelResult,从而无需再进行单独的 RPC 调用来获取最后提交的偏移令牌。

  • 支持 offsetToken:新的 openChannel 方法现在有一个可选的 offsetToken 参数,允许您在特定位置打开通道。openChannel(String channelName, (optional) String offsetToken)

数据引入

  • 已重命名 InsertRows:在高性能 SDK 中,InsertRows 方法现更名为 AppendRows

  • 已移除 AppendResultappendRowappendRows 方法不再返回 AppendResult。他们的签名已变更为 void appendRow(Map<String, Object> row, String offsetToken)void appendRows(Iterable<Map<String, Object>> row, String startOffsetToken, String endOffsetToken)

新的异步和实用方法

  • GetChannelStatus:这是 Channel 对象上可用的新 API。

  • waitForFlush:新 waitForFlush 方法已添加到客户端和通道对象中。

    • 客户端 void close(boolean waitForFlush, Duration timeoutDuration)

    • 通道和客户端:void waitForFlush((optional) Duration timeoutDuration)

  • waitForCommit:新方法 CompletableFuture<Boolean> waitForCommit(Predicate<String> tokenChecker, Duration timeoutDuration) 可等待提交确认。

  • initiateFlush:这个新方法 void initiateFlush() 可对通道或客户端异步执行刷新操作,适用于无需等待超时或达到容量限制即可刷新数据的场景。

其他变更

  • GetLatestCommittedOffsetTokens:此 API 已得到改进。在高性能 SDK 中,它现在可以提取客户端未打开的通道的偏移令牌,并允许部分失败。

  • 已移除 isValid:已从高性能 SDK 中移除 isValid 方法。

以下表格展示了从经典 SDK 到高性能 SDK 的 API 变更内容:

SnowflakeStreamingIngestClientFactory 和 SnowflakeStreamingIngestClientFactory.Builder

经典

高性能

备注

builder(String name)

builder(String clientName, String dbName, String schemaName, String pipeName)

经典版本中的 name = 高性能版本中的 clientName

不适用

setExecutorService(ExecutorService executorService)

一种新方法。允许您指定 SDK 后台任务将使用的 ExecutorService

SnowflakeStreamingIngestClient

经典

高性能

备注

String getName()

String getClientName()

仅限 API 名称更改;返回相同的信息。

不适用

String getDBName()

新 API。

不适用

String getPipeName()

新 API。

不适用

String getSchemaName()

新 API。

SnowflakeStreamingIngestChannel openChannel(OpenChannelRequest request)

OpenChannelResult openChannel(String channelName, (optional) String offsetToken)

不同的请求实参和返回值。

Map<String,String> getLatestCommittedOffsetTokens (List<SnowflakeStreamingIngestChannel> channels)

Map<String, String> getLatestCommittedOffsetTokens (List<String> channelNames)

不同的请求实参。高性能 SDK 使 API 能够获取由其他客户端打开、且可能不属于该客户端的通道状态。

不适用

ChannelStatusBatch getChannelStatus(List<String> channelNames)

新 API。

Void dropChannel(DropChannelRequest request)

Void dropChannel(String channelName)

不同的请求实参。

Void setRefreshToken(String refreshToken)

不适用

已移除。

不适用

CompletableFuture<Void> close(boolean waitForFlush, Duration timeoutDuration)

新增的客户端 close 方法,可对关闭流程进行更精细的控制。waitForFlush:布尔参数,用于指示客户端是否应等待所有通道刷新后再关闭。timeoutDurationDuration 指定客户端在强制关闭之前应等待刷新完成的时间。

不适用

CompletableFuture<Void> waitForFlush((optional) Duration timeoutDuration)

等待刷新完成的新方法。timeoutDuration:指定客户端在超时前应等待的时间。

不适用

void initiateFlush()

新增的客户端方法,可异步触发刷新并立即返回。

SnowflakeStreamingIngestChannel

经典

高性能

备注

getLatestCommittedOffsetToken

getLatestCommittedOffsetToken

此 API 已得到改进。在高性能 SDK 中,它现在可以提取客户端未打开的通道的偏移令牌,并允许部分失败。

isValid

不适用

已移除。

不适用

String getDBName()

新 API。

不适用

String getSchemaName()

新 API。

不适用

String getPipeName()

新 API。

不适用

String getFullyQualifiedPipeName()

新 API。

InsertValidationResponse insertRow(Map<String, Object> row, String offsetToken)

void appendRow(Map<String, Object> row, @Nullable String offsetToken)

API 名称已更改。响应类型已更改,因为客户端上不再进行验证。

InsertValidationResponse insertRow(Iterable<Map<String, Object>> row, @Nullable String startOffsetToken, @Nullable String endOffsetToken)

void appendRows(Iterable<Map<String, Object>> row, String startOffsetToken, String endOffsetToken)

API 名称已更改。响应类型已更改,因为客户端上不再进行验证。

InsertValidationResponse insertRow(Iterable<Map<String, Object>> row, String offsetToken)

不适用

已移除。

String getTableName()

不适用

已移除。

String getFullyQualifiedTableName()

不适用

已移除。

不适用

String getPipeName()

新 API。

不适用

String getFullyQualifiedPipeName()

新 API。

String getName()

String getChannelName()

API 名称已更改。

String getFullyQualifiedName()

String getFullyQualifiedChannelName()

API 名称已更改。

Map<String, ColumnProperties> getTableSchema()

不适用

已移除。

不适用

ChannelStatus getChannelStatus()

新 API。

CompletableFuture<Void> close()

Void close()

返回类型已更改,但行为相同。

CompletableFuture<Void> close(boolean drop)

Void close(boolean waitForFlush, Duration timeoutDuration)

API 名称已更改,但行为相同。

Boolean isValid()

不适用

已移除。

不适用

CompletableFuture<Void> waitForFlush((optional)Duration timeoutDuration)

等待刷新完成的新方法。timeoutDuration:指定通道在超时前应等待的时间。

不适用

void waitForCommit(Predicate<String> tokenChecker, Duration timeoutDuration)

新增方法,可异步触发并等待此特定通道中所有缓冲数据刷新至 Snowflake 服务器。此方法可确保所有待处理数据成功写入且刷新操作完成后,再继续执行后续流程。

不适用

void initiateFlush()

新增的通道方法,可异步触发刷新。

语言: 中文