通道和精确一次传递

本主题介绍 Snowpipe Streaming 如何通过具有排序保证的通道引入数据,以及偏移令牌如何实现精确一次传递。

流式引入基础知识

Snowpipe Streaming 围绕几个核心流引入原则构建:

  • 持续引入:数据在生成时流入 Snowflake,而不是分批收集并定期加载。应用程序通过长期连接连续提交行,Snowflake 会自动提交数据。

  • 精确一次传递:即使在客户端出现故障或网络中断的情况下,每条记录也只会引入一次。Snowpipe Streaming 通过偏移令牌跟踪实现这一点,该跟踪允许客户端从上次提交的位置恢复,而无需复制数据。

  • 有序引入:行按其在通道中提交的顺序提交。这保留了源系统中的事件序列,这对于时间序列数据、CDC 管道和审计跟踪据至关重要。

  • 低延迟:数据在引入后最短 5 秒内即可用于查询。这样可以实现近乎实时的分析,而不会出现传统批处理加载的延迟。

  • 无服务器:Snowflake 管理所有用于引入的计算资源。资源根据吞吐量自动扩展,无需客户端配置或管理基础设施。

数据如何流动

客户端应用程序使用 Snowpipe SDK(Java 或 Python)或 REST API 连接到 Snowflake。客户端针对管道打开一个或多个通道,然后通过这些通道提交行。Snowflake 缓冲数据并将其提交到目标表,使其在几秒钟内可供查询。

端到端流:

  1. 客户端应用程序 使用 SDK (appendRows) 或 REST API(Append Rows 端点)提交行。

  2. 通道 按顺序接收行,并将每个批次与偏移令牌相关联,以进行进度跟踪。

  3. 管道 在服务器端处理数据:验证架构,应用任何配置的转换或预聚类,然后提交到目标表。

  4. 目标表 接收提交的数据,这些数据将立即变为可查询。

Snowpipe Streaming 客户端、通道和表映射

通道

通道是与 Snowflake 的逻辑、指定流处理连接,用于将数据加载到表中。通道提供两个保证:

  • 有序引入:行的排序及其相应的偏移令牌保留在通道内。

  • 精确一次传递:偏移令牌使客户端能够跟踪提交的进度,并在恢复时从上次提交的位置重放。

排序会在通道内保留,但不会在指向同一表的通道之间保留。

通道是针对管道打开的。客户端 SDK 可以为多个管道打开多个通道;但 SDK 无法跨账户打开通道。在客户端主动插入数据时,通道应长期存在,并且由于保留了偏移令牌信息,应该在客户端进程重新启动时重复使用。

当不再需要通道和相关的偏移元数据时,您可以使用 DropChannelRequest API 永久删除通道。您可以通过两种方式删除通道:

  • 关闭时删除通道。在删除通道之前,通道内的数据会自动刷新。

  • 盲目删除通道。我们不建议使用这种方法,因为它会丢弃任何待处理的数据。

您可以运行 SHOW CHANNELS 命令,列出您有访问权限的通道。有关更多信息,请参阅 SHOW CHANNELS

备注

不活动的通道及其偏移令牌会在闲置 30 天后自动删除。

偏移令牌和精确一次传递

小技巧

精确一次在 Snowpipe Streaming 中的工作原理:应用程序提交带有偏移令牌(例如 Kafka 分区偏移)的行。提交数据时,Snowflake 会保留令牌。恢复时,应用程序会调用 getLatestCommittedOffsetToken 找到它停止的位置,然后从该位置重放。不会引入重复数据,也不会丢失任何数据。

偏移令牌 是客户端在行提交请求中包含的字符串,用于跟踪每个通道的引入进度。具体使用的方法是 SDK 中的 appendRowappendRows,以及 REST API 中的 Append Rows 端点。

在通道创建时,此令牌将初始化为 NULL,并会在具有所提供的偏移令牌的行提交到 Snowflake 时更新。客户端可以定期调用 getLatestCommittedOffsetToken 获取通道的最新提交的偏移令牌,并使用它来推断引入进度。

在客户端重新打开一个通道时,将会返回最新的持久偏移令牌。客户端可使用令牌重置其在数据源中的位置,以避免发送两次相同的数据。在通道重新打开事件发生时,Snowflake 中缓存的任何未提交数据都将被丢弃,以避免提交这些数据。

您可以使用最新提交的偏移令牌来执行以下操作:

  • 跟踪引入进度

  • 将特定偏移与最新提交的偏移令牌进行比较,确认特定偏移是否已提交

  • 推进源偏移,并清除已提交的数据

  • 启用重复数据删除并确保数据精确一次传递

示例:Kafka Connector 崩溃恢复

Kafka Connector 从以下主题读取偏移令牌,例如 <partition>:<offset>。考虑以下情景:

  1. Kafka Connector 联机并在 Kafka 主题 T 中打开了对应于 Partition 1 的通道,通道名称为 T:P1

  2. 该连接器开始从 Kafka 分区读取记录。

  3. 该连接器调用 API,发出一条 appendRows 方法请求,使用与记录关联的偏移作为偏移令牌。

    例如,偏移令牌可以是 10,代表 Kafka 分区中的第十条记录。

  4. 连接器定期发出 getLatestCommittedOffsetToken 方法请求,确定引入进度。

如果 Kafka Connector 崩溃,以下过程将恢复从正确偏移位置读取记录:

  1. Kafka Connector 重新联机,并使用与之前相同的名称重新打开通道。

  2. 连接器调用 getLatestCommittedOffsetToken 获取分区的最新提交偏移。

    例如,假设最新的持久偏移令牌是 20

  3. 连接器使用 Kafka 读取 APIs,重置与偏移加 1(本例中为 21)相对应的游标。

  4. 连接器恢复读取记录。读取游标在重新定位成功后不会检索到重复数据。

示例:带崩溃恢复的日志文件引入

应用程序从目录读取日志,并使用 Snowpipe Streaming SDK 将这些日志导出到 Snowflake。应用程序执行以下操作:

  1. 列出日志目录中的文件。

    假设日志记录框架生成可按字典顺序排序的日志文件,并且在此排序中,新的日志文件位于末尾。

  2. 逐行读取日志文件并调用 API,发出 appendRows 方法请求,在请求中使用与日志文件名、行数或字节位置相对应的偏移令牌。

    例如,偏移令牌可能是 messages_1.log:20,其中的 messages_1.log 是日志文件的名称,20 是行号。

如果应用程序崩溃或需要重新启动,则会调用 getLatestCommittedOffsetToken 检索与上次导出的日志文件和行相对应的偏移令牌。根据之前的例子,这可能是 messages_1.log:20。随后应用程序会打开 messages_1.log 并寻找 21 行,以防止同一日志行被引入两次。

备注

偏移令牌信息可能会丢失。偏移令牌链接到一个通道对象,如果 30 天内未使用该通道执行新的引入,该通道将自动清除。为了防止偏移令牌丢失,请考虑维护单独的偏移,并按需重置通道的偏移令牌。

offsetTokencontinuationToken 的角色

offsetTokencontinuationToken 都用于确保数据精确传输一次,但它们的用途不同,由不同的子系统管理。主要区别在于谁控制令牌的价值及其使用范围。

  • :code:`continuationToken`(仅供直接使用 REST API 的用户使用):

    该令牌由 Snowflake 管理,对于维持单个连续流式传输会话的状态至关重要。当客户端使用 Append Rows API 发送数据时,Snowflake 返回 continuationToken。客户端必须在下一个 AppendRows 请求中传回此令牌,以确保 Snowflake 以正确的顺序连续接收数据。Snowflake 使用令牌来检测和防止 SDK 重试时出现重复数据或缺少数据。

  • offsetToken:选择使用 时默认使用的角色和仓库。

    该令牌是用户定义的标识符,允许从外部源确切传送一次。Snowflake 会存储此值,但不会将其用于自己的内部操作或防止重新引入。外部系统(如 Kafka Connector)有责任从 Snowflake 读取 offsetToken 并使用它来跟踪自己的引入进度,避免在需要重放外部流时发送重复的数据。