适用于采用高性能架构的 Snowpark Streaming 的最佳实践¶
本指南概述了使用具有高性能架构的 Snowpipe Streaming 设计和实现稳健数据引入管道的关键最佳实践。遵循这些最佳实践,可以确保管道具有持久性、可靠性,并能够高效处理错误。
有策略地管理通道¶
为了提高性能和长期稳定性,应采用以下通道管理策略:
使用长期存在的通道:为减少开销,在引入任务期间只需打开一次通道并保持其活跃状态。避免重复开关通道。
使用确定性通道名称:采用一致、可预测的命名规范(例如
source-env-region-client-id),简化故障排除并促进自动恢复。通过多个通道横向扩展:要提高吞吐量,请打开多个通道。这些通道可以指向单个目标管道或多个管道,具体取决于服务限制和吞吐量要求。
监控通道状态:定期使用
getChannelStatus方法来监控引入通道运行状况。跟踪
last_committed_offset_token以确认数据是否成功引入,以及管道是否正常运行。监控
row_error_count以便及早发现错误记录或其他引入问题。
持续验证架构¶
确保传入的数据符合预期表架构,以防引入失败并保持数据完整性:
客户端验证:在客户端实现架构验证,可提供即时反馈并减少服务器端错误。尽管完整的逐行验证可提供最大的安全性,但性能更好的方法可能涉及选择性验证;例如,在批次边界或通过采样行验证。
服务器端验证:高性能架构可以将架构验证卸载到服务器。如果在将数据引入到目标管道和表时发生架构不匹配,错误及其数量会通过
getChannelStatus报告。
添加客户端元数据列¶
要实现强大的错误检测和恢复,必须将引入元数据作为行有效负载的一部分。这需要提前规划数据结构和 PIPE 定义。
在引入之前,将以下列添加到行有效负载中:
CHANNEL_ID(例如,紧凑型 INTEGER。)STREAM_OFFSET(BIGINT,每个通道单调递增,例如 Kafka 分区偏移。)
这些列共同唯一标识每个通道的记录,使您能够跟踪数据的来源。
可选:如果多个管道将数据引入到同一个目标表中,请添加 PIPE_ID 列。这样可以轻松追踪行对应的引入管道。您可以在单独的查找表中存储描述性管道名称,并将其映射为紧凑整数,以降低存储成本。
使用元数据偏移检测并恢复错误¶
将通道监控与元数据列结合使用,以便及时发现并修复问题:
监控状态:定期检查
getChannelStatus。row_error_count增加通常表明可能存在问题。检测缺失记录:如果检测到错误,请使用 SQL 查询,通过检查
STREAM_OFFSET的序列间隙来识别缺失或顺序异常的记录。
SELECT
PIPE_ID,
CHANNEL_ID,
STREAM_OFFSET,
LAG(STREAM_OFFSET) OVER (
PARTITION BY PIPE_ID, CHANNEL_ID
ORDER BY STREAM_OFFSET
) AS previous_offset,
(LAG(STREAM_OFFSET) OVER (
PARTITION BY PIPE_ID, CHANNEL_ID
ORDER BY STREAM_OFFSET
) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
通过 MATCH_BY_COLUMN_NAME 优化引入性能并降低成本¶
配置管道,以映射源数据中的必要列,而不是将所有数据引入到单个 VARIANT 列中。为此,请使用 MATCH_BY_COLUMN_NAME = CASE_SENSITIVE 或者在管道定义中应用转换。这种最佳实践不仅可以减少引入成本,还可以提高流式传输数据管道的整体性能。
这种最佳实践具有以下重要优势:
使用
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE时,您只需为引入目标表的数据值付费。相比之下,将数据引入到单个 VARIANT 列会按所有 JSON 字节(包括键和值)计费。对于具有冗长或大量 JSON 键的数据,这可能会导致引入成本显著且不必要地增加。Snowflake 的处理引擎在计算上更加高效。与先将整个 JSON 对象解析到 VARIANT,再提取所需列的方法不同,该方法直接提取必要的值。