适用于采用高性能架构的 Snowpark Streaming 的最佳实践

本指南概述了使用具有高性能架构的 Snowpipe Streaming 设计和实现稳健数据引入管道的关键最佳实践。遵循这些最佳实践,可以确保管道具有持久性、可靠性,并能够高效处理错误。

有策略地管理通道

为了提高性能和长期稳定性,应采用以下通道管理策略:

  • 使用长期存在的通道:为减少开销,在引入任务期间只需打开一次通道并保持其活跃状态。避免重复开关通道。

  • 使用确定性通道名称:采用一致、可预测的命名规范(例如 source-env-region-client-id),简化故障排除并促进自动恢复。

  • 通过多个通道横向扩展:要提高吞吐量,请打开多个通道。这些通道可以指向单个目标管道或多个管道,具体取决于服务限制和吞吐量要求。

  • 监控通道状态:定期使用 getChannelStatus 方法来监控引入通道运行状况。

    • 跟踪 last_committed_offset_token 以确认数据是否成功引入,以及管道是否正常运行。

    • 监控 row_error_count 以便及早发现错误记录或其他引入问题。

持续验证架构

确保传入的数据符合预期表架构,以防引入失败并保持数据完整性:

  • 客户端验证:在客户端实现架构验证,可提供即时反馈并减少服务器端错误。尽管完整的逐行验证可提供最大的安全性,但性能更好的方法可能涉及选择性验证;例如,在批次边界或通过采样行验证。

  • 服务器端验证:高性能架构可以将架构验证卸载到服务器。如果在将数据引入到目标管道和表时发生架构不匹配,错误及其数量会通过 getChannelStatus 报告。

添加客户端元数据列

要实现强大的错误检测和恢复,必须将引入元数据作为行有效负载的一部分。这需要提前规划数据结构和 PIPE 定义。

在引入之前,将以下列添加到行有效负载中:

  • CHANNEL_ID (例如,紧凑型 INTEGER。)

  • STREAM_OFFSETBIGINT,每个通道单调递增,例如 Kafka 分区偏移。)

这些列共同唯一标识每个通道的记录,使您能够跟踪数据的来源。

可选:如果多个管道将数据引入到同一个目标表中,请添加 PIPE_ID 列。这样可以轻松追踪行对应的引入管道。您可以在单独的查找表中存储描述性管道名称,并将其映射为紧凑整数,以降低存储成本。

使用元数据偏移检测并恢复错误

将通道监控与元数据列结合使用,以便及时发现并修复问题:

  • 监控状态:定期检查 getChannelStatusrow_error_count 增加通常表明可能存在问题。

  • 检测缺失记录:如果检测到错误,请使用 SQL 查询,通过检查 STREAM_OFFSET 的序列间隙来识别缺失或顺序异常的记录。

SELECT
  PIPE_ID,
  CHANNEL_ID,
  STREAM_OFFSET,
  LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) AS previous_offset,
  (LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Copy

通过 MATCH_BY_COLUMN_NAME 优化引入性能并降低成本

配置管道,以映射源数据中的必要列,而不是将所有数据引入到单个 VARIANT 列中。为此,请使用 MATCH_BY_COLUMN_NAME = CASE_SENSITIVE 或者在管道定义中应用转换。这种最佳实践不仅可以减少引入成本,还可以提高流式传输数据管道的整体性能。

这种最佳实践具有以下重要优势:

  • 使用 MATCH_BY_COLUMN_NAME = CASE_SENSITIVE 时,您只需为引入目标表的数据值付费。相比之下,将数据引入到单个 VARIANT 列会按所有 JSON 字节(包括键和值)计费。对于具有冗长或大量 JSON 键的数据,这可能会导致引入成本显著且不必要地增加。

  • Snowflake 的处理引擎在计算上更加高效。与先将整个 JSON 对象解析到 VARIANT,再提取所需列的方法不同,该方法直接提取必要的值。

语言: 中文