Snowpipe Streaming API REST 端点¶
备注
我们建议您优先选择 Snowpipe Streaming SDK 而非 REST API,以获得更优的性能和更顺畅的入门体验。
Snowpipe Streaming REST API 专为轻量级工作负载而设计,提供了一种灵活的方式,无需使用 Snowpipe Streaming SDK 即可与外部应用程序集成。
以下示意图直观展示了数据从客户端到 Snowflake 服务器的流动过程,并详细说明了流程中每个关键的 API 端点。
请求标头¶
以下请求标头适用于 Snowpipe Streaming REST API 的所有端点:
标头 |
描述 |
|---|---|
|
身份验证令牌 |
:code:`X-Snowflake-Authorization-Token-Type`(可选) |
JWT/OAuth |
备注
单个请求负载的最大允许大小为 16 MB。如果您的数据更大,则必须将其拆分为多个请求。
Get Hostname¶
Get Hostname 返回用于与 Snowpipe Streaming REST API 进行交互的主机名。每个账户都有唯一的主机名。
GET /v2/streaming/hostname
响应:
{
"hostname": "string"
}
响应字段说明:
字段 |
类型 |
描述 |
|---|---|---|
主机名 |
字符串 |
账户的主机名。 |
交换作用域令牌¶
Exchange Scoped Token 返回一个安全令牌,该令牌只能用于访问与 Snowpipe Streaming API 相关的服务。这为客户提供了安全保护。
POST /oauth/token
请求:
属性 |
必填 |
组件 |
描述 |
|---|---|---|---|
content_type |
是 |
标头 |
"application/x-www-form-urlencoded" |
grant_type |
是 |
有效负载 |
|
scope |
是 |
有效负载 |
账户的主机名。 |
响应:
{
"token": "string"
}
响应字段说明:
字段 |
类型 |
描述 |
|---|---|---|
令牌 |
字符串 |
作用域令牌。 |
Open Channel¶
Open Channel 操作针对管道或表创建或打开新通道。如果该通道已存在,Snowflake 会触发通道的客户端排序器,并返回最后提交的偏移令牌。
PUT /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}
请求:
属性 |
必填 |
组件 |
描述 |
|---|---|---|---|
databaseName |
是 |
URI |
数据库名称,不区分大小写。 |
schemaName |
是 |
URI |
架构名称,不区分大小写。 |
pipeName |
是 |
URI |
管道名称,不区分大小写。 |
channelName |
是 |
URI |
您创建或重新打开的通道名称,不区分大小写。 |
offset_token |
否 |
有效负载 |
用于在打开通道时设置偏移令牌的字符串。 |
requestId |
否 |
查询参数 |
用于在系统中追踪请求的通用唯一标识符 (UUID)。 |
响应:
{
"next_continuation_token": "string",
"channel_status": {
"database_name": "string",
"schema_name": "string",
"pipe_name": "string",
"channel_name": "string",
"channel_status_code": "string",
"last_committed_offset_token": "string",
"created_on_ms": "long",
"rows_inserted": "int",
"rows_parsed": "int",
"rows_error_count": "int",
"last_error_offset_upper_bound": "string",
"last_error_message": "string",
"last_error_timestamp": "timestamp_utc",
"snowflake_avg_processing_latency_ms": "int"
}
}
响应字段说明:
字段 |
类型 |
描述 |
|---|---|---|
next_continuation_token |
字符串 |
必须在后续的“追加行”请求中使用的 API 管理的令牌。该令牌链接了一系列调用,确保数据流按顺序连续传输,并保持会话状态以实现仅一次传递。 |
channel_status |
对象 |
嵌套对象,其中包含有关通道的以下详细信息:
|
Append Row(s)¶
Append Rows 操作将一批行插入给定通道。
POST /v2/streaming/data/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}/rows
请求:
属性 |
必填 |
组件 |
描述 |
|---|---|---|---|
databaseName |
是 |
URI |
数据库名称,不区分大小写。 |
schemaName |
是 |
URI |
架构名称,不区分大小写。 |
pipeName |
是 |
URI |
管道,不区分大小写。 |
channelName |
是 |
URI |
通道名称,不区分大小写。 |
continuationToken |
是 |
查询参数 |
来自 Snowflake 的延续令牌,封装了客户端和行排序器。 |
offsetToken |
否 |
查询参数 |
用于设置每批偏移令牌的字符串。 |
rows |
是 |
有效负载 |
要以 NDJSON 格式引入的实际数据负载。此属性允许的最大大小为 4 MB。 |
requestId |
否 |
查询参数 |
用于通过系统跟踪请求的 UUID。 |
备注
NDJSON 负载中的 JSON 文本必须严格符合 RFC 8259 标准。每个 JSON 文本后面都必须有一个换行符 \n (0x0A)。您也可以在换行符前插入回车符 \r (0x0D)。
响应:
{
"next_continuation_token": "string"
}
响应字段说明:
字段 |
类型 |
描述 |
|---|---|---|
next_continuation_token |
字符串 |
来自 Snowflake 的下一个延续令牌,封装了客户端和行排序器。它应该用于插入下一批。 |
Drop Channel¶
Drop Channel 操作会在服务器端删除一个通道及其元数据。
DELETE /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}
请求:
属性 |
必填 |
组件 |
描述 |
|---|---|---|---|
databaseName |
是 |
URI |
数据库名称,不区分大小写 |
schemaName |
是 |
URI |
架构名称,不区分大小写 |
pipeOrTableName |
是 |
URI |
管道或表名,不区分大小写 |
channelName |
是 |
URI |
通道名称,不区分大小写 |
requestId |
否 |
查询参数 |
用于通过系统跟踪请求的 UUID |
响应:
此操作返回的负载除了 HTTP 状态代码外,没有具体的成功响应。
Bulk Get Channel Status¶
Bulk Get Channel Status 操作返回特定客户端排序器的通道状态。
POST /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}:bulk-channel-status
请求:
属性 |
必填 |
组件 |
描述 |
|---|---|---|---|
databaseName |
是 |
URI |
数据库名称,不区分大小写 |
schemaName |
是 |
URI |
架构名称,不区分大小写 |
pipeName |
是 |
URI |
管道名称,不区分大小写 |
channel_names |
是 |
有效负载 |
客户想要获取状态的字符串通道名称数组;名称区分大小写。例如 |
响应:
{
"channel_statuses": {
"channel1": {
"channel_status_code": "String",
"last_committed_offset_token": "String",
"database_name": "String",
"schema_name": "String",
"pipe_name": "String",
"channel_name": "String",
"rows_inserted": "int",
"rows_parsed": "int",
"rows_errors": "int",
"last_error_offset_upper_bound": "String",
"last_error_message": "String",
"last_error_timestamp": "timestamp_utc",
"snowflake_avg_processing_latency_ms": "int"
},
"channel2": {
"comment": "same structure as channel1"
}
"comment": "potentially other channels"
}
}
备注
如果在服务中找不到请求的通道,则响应负载的 channel_statuses 对象中没有该通道的条目。
每个通道的 channel_statuses 字段描述:
字段 |
类型 |
描述 |
|---|---|---|
channel_status_code |
字符串 |
表示通道的状态。 |
last_committed_offset_token |
字符串 |
最新提交偏移令牌。 |
database_name |
字符串 |
该通道所属数据库名称。 |
schema_name |
字符串 |
该通道所属架构名称。 |
pipe_name |
字符串 |
该通道所属管道名称。 |
channel_name |
字符串 |
通道名称。 |
rows_inserted |
整数 |
插入该通道的所有行的计数。 |
rows_parsed |
整数 |
已解析但不一定要插入到此通道中的所有行的计数。 |
rows_error |
整数 |
计算所有在插入该通道时出错并因此被拒绝的行数。 |
last_error_offset_upper_bound |
字符串 |
引入错误的上限。错误将位于此已提交的偏移令牌处或之前。 |
last_error_message |
字符串 |
一条人类可读的消息,对应该通道的最新错误代码,且敏感客户数据已隐去。 |
last_error_timestamp |
timestamp_utc |
上次发生错误时的时间戳。 |
snowflake_avg_processing_latency_ms |
整数 |
此通道的平均端到端处理时间。 |
错误响应结构¶
Snowpipe Streaming REST APIs 会为错误响应返回 JSON 有效负载。该结构为自动化错误处理和人工分析提供了可操作的信息。
响应有效负载具有以下结构:
{
"code": "...",
"message": "..."
}
响应字段¶
字段 |
类型 |
描述 |
|---|---|---|
代码 |
字符串 |
稳定的编程错误代码。此值可用于自动错误处理和日志记录。例如,应用程序的逻辑可以检查特定代码以触发预定义的操作。 |
消息 |
字符串 |
描述错误的人类可读消息。此消息可能会发生变化,不应用于自动解析。 |
示例¶
以下示例显示了您可能收到的错误响应:
{
"code": "STALE_CONTINUATION_TOKEN_SEQUENCER",
"message": "Channel sequencer in the continuation token is stale. Please reopen the channel"
}
此示例显示了尝试将延续令牌与过时的通道排序器一起使用的响应。代码为错误提供清晰的机器可读标识符,而消息则为用户提供有用的描述性文本。