Snowpipe Streaming API REST 端点

备注

我们建议您优先选择 Snowpipe Streaming SDK 而非 REST API,以获得更优的性能和更顺畅的入门体验。

Snowpipe Streaming REST API 专为轻量级工作负载而设计,提供了一种灵活的方式,无需使用 Snowpipe Streaming SDK 即可与外部应用程序集成。

以下示意图直观展示了数据从客户端到 Snowflake 服务器的流动过程,并详细说明了流程中每个关键的 API 端点。

Snowpipe Streaming REST API 概述

请求标头

以下请求标头适用于 Snowpipe Streaming REST API 的所有端点:

标头

描述

Authorization

身份验证令牌

:code:`X-Snowflake-Authorization-Token-Type`(可选)

JWT/OAuth

备注

单个请求负载的最大允许大小为 16 MB。如果您的数据更大,则必须将其拆分为多个请求。

Get Hostname

Get Hostname 返回用于与 Snowpipe Streaming REST API 进行交互的主机名。每个账户都有唯一的主机名。

GET /v2/streaming/hostname

响应:

{
  "hostname": "string"
}
Copy

响应字段说明:

字段

类型

描述

主机名

字符串

账户的主机名。

交换作用域令牌

Exchange Scoped Token 返回一个安全令牌,该令牌只能用于访问与 Snowpipe Streaming API 相关的服务。这为客户提供了安全保护。

POST /oauth/token

请求:

属性

必填

组件

描述

content_type

标头

"application/x-www-form-urlencoded"

grant_type

有效负载

"urn:ietf:params:oauth:grant-type:jwt-bearer"

scope

有效负载

账户的主机名。

响应:

{
  "token": "string"
}
Copy

响应字段说明:

字段

类型

描述

令牌

字符串

作用域令牌。

Open Channel

Open Channel 操作针对管道或表创建或打开新通道。如果该通道已存在,Snowflake 会触发通道的客户端排序器,并返回最后提交的偏移令牌。

PUT /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

请求:

属性

必填

组件

描述

databaseName

URI

数据库名称,不区分大小写。

schemaName

URI

架构名称,不区分大小写。

pipeName

URI

管道名称,不区分大小写。

channelName

URI

您创建或重新打开的通道名称,不区分大小写。

offset_token

有效负载

用于在打开通道时设置偏移令牌的字符串。

requestId

查询参数

用于在系统中追踪请求的通用唯一标识符 (UUID)。

响应:

{
  "next_continuation_token": "string",
  "channel_status": {
    "database_name": "string",
    "schema_name": "string",
    "pipe_name": "string",
    "channel_name": "string",
    "channel_status_code": "string",
    "last_committed_offset_token": "string",
    "created_on_ms": "long",
    "rows_inserted": "int",
    "rows_parsed": "int",
    "rows_error_count": "int",
    "last_error_offset_upper_bound": "string",
    "last_error_message": "string",
    "last_error_timestamp": "timestamp_utc",
    "snowflake_avg_processing_latency_ms": "int"
  }
}
Copy

响应字段说明:

字段

类型

描述

next_continuation_token

字符串

必须在后续的“追加行”请求中使用的 API 管理的令牌。该令牌链接了一系列调用,确保数据流按顺序连续传输,并保持会话状态以实现仅一次传递。

channel_status

对象

嵌套对象,其中包含有关通道的以下详细信息:

  • database_name(字符串):管道所在的数据库的名称。

  • schema_name(字符串):管道所在的架构的名称。

  • pipe_name(字符串):正在使用的特定管道的名称。

  • channel_name(字符串):流式传输通道的名称。

  • channel_status_code(字符串):指示通道当前状态的代码;例如,“ACTIVE”。

  • last_committed_offset_token(字符串):代表上次成功提交的偏移的令牌。

  • created_on_ms(长整型):创建通道时的时间戳,以毫秒为单位。

  • rows_inserted(整数):成功插入的总行数。

  • rows_parsed(整数):解析的总行数。

  • rows_error_count(整数):遇到错误的总行数。

  • last_error_offset_upper_bound(字符串):表示上次发生错误的偏移上限的令牌。

  • last_error_message(字符串):上次发生的错误的消息。

  • last_error_timestamp(长整型):上次错误的时间戳,以毫秒为单位。

  • snowflake_avg_processing_latency_ms(整数):Snowflake 的平均处理延迟,以毫秒为单位。

Append Row(s)

Append Rows 操作将一批行插入给定通道。

POST /v2/streaming/data/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}/rows

请求:

属性

必填

组件

描述

databaseName

URI

数据库名称,不区分大小写。

schemaName

URI

架构名称,不区分大小写。

pipeName

URI

管道,不区分大小写。

channelName

URI

通道名称,不区分大小写。

continuationToken

查询参数

来自 Snowflake 的延续令牌,封装了客户端和行排序器。

offsetToken

查询参数

用于设置每批偏移令牌的字符串。

rows

有效负载

要以 NDJSON 格式引入的实际数据负载。此属性允许的最大大小为 4 MB。

requestId

查询参数

用于通过系统跟踪请求的 UUID。

备注

NDJSON 负载中的 JSON 文本必须严格符合 RFC 8259 标准。每个 JSON 文本后面都必须有一个换行符 \n (0x0A)。您也可以在换行符前插入回车符 \r (0x0D)。

响应:

{
  "next_continuation_token": "string"
}
Copy

响应字段说明:

字段

类型

描述

next_continuation_token

字符串

来自 Snowflake 的下一个延续令牌,封装了客户端和行排序器。它应该用于插入下一批。

Drop Channel

Drop Channel 操作会在服务器端删除一个通道及其元数据。

DELETE /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

请求:

属性

必填

组件

描述

databaseName

URI

数据库名称,不区分大小写

schemaName

URI

架构名称,不区分大小写

pipeOrTableName

URI

管道或表名,不区分大小写

channelName

URI

通道名称,不区分大小写

requestId

查询参数

用于通过系统跟踪请求的 UUID

响应:

此操作返回的负载除了 HTTP 状态代码外,没有具体的成功响应。

Bulk Get Channel Status

Bulk Get Channel Status 操作返回特定客户端排序器的通道状态。

POST /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}:bulk-channel-status

请求:

属性

必填

组件

描述

databaseName

URI

数据库名称,不区分大小写

schemaName

URI

架构名称,不区分大小写

pipeName

URI

管道名称,不区分大小写

channel_names

有效负载

客户想要获取状态的字符串通道名称数组;名称区分大小写。例如 {"channel_names":["channel1", "channel2"]}

响应:

{
  "channel_statuses": {
    "channel1": {
      "channel_status_code": "String",
      "last_committed_offset_token": "String",
      "database_name": "String",
      "schema_name": "String",
      "pipe_name": "String",
      "channel_name": "String",
      "rows_inserted": "int",
      "rows_parsed": "int",
      "rows_errors": "int",
      "last_error_offset_upper_bound": "String",
      "last_error_message": "String",
      "last_error_timestamp": "timestamp_utc",
      "snowflake_avg_processing_latency_ms": "int"
    },
    "channel2": {
      "comment": "same structure as channel1"
    }
    "comment": "potentially other channels"
  }
}
Copy

备注

如果在服务中找不到请求的通道,则响应负载的 channel_statuses 对象中没有该通道的条目。

每个通道的 channel_statuses 字段描述:

字段

类型

描述

channel_status_code

字符串

表示通道的状态。

last_committed_offset_token

字符串

最新提交偏移令牌。

database_name

字符串

该通道所属数据库名称。

schema_name

字符串

该通道所属架构名称。

pipe_name

字符串

该通道所属管道名称。

channel_name

字符串

通道名称。

rows_inserted

整数

插入该通道的所有行的计数。

rows_parsed

整数

已解析但不一定要插入到此通道中的所有行的计数。

rows_error

整数

计算所有在插入该通道时出错并因此被拒绝的行数。

last_error_offset_upper_bound

字符串

引入错误的上限。错误将位于此已提交的偏移令牌处或之前。

last_error_message

字符串

一条人类可读的消息,对应该通道的最新错误代码,且敏感客户数据已隐去。

last_error_timestamp

timestamp_utc

上次发生错误时的时间戳。

snowflake_avg_processing_latency_ms

整数

此通道的平均端到端处理时间。

错误响应结构

Snowpipe Streaming REST APIs 会为错误响应返回 JSON 有效负载。该结构为自动化错误处理和人工分析提供了可操作的信息。

响应有效负载具有以下结构:

{
  "code": "...",
  "message": "..."
}
Copy

响应字段

字段

类型

描述

代码

字符串

稳定的编程错误代码。此值可用于自动错误处理和日志记录。例如,应用程序的逻辑可以检查特定代码以触发预定义的操作。

消息

字符串

描述错误的人类可读消息。此消息可能会发生变化,不应用于自动解析。

示例

以下示例显示了您可能收到的错误响应:

{
  "code": "STALE_CONTINUATION_TOKEN_SEQUENCER",
  "message": "Channel sequencer in the continuation token is stale. Please reopen the channel"
}
Copy

此示例显示了尝试将延续令牌与过时的通道排序器一起使用的响应。代码为错误提供清晰的机器可读标识符,而消息则为用户提供有用的描述性文本。

语言: 中文