Snowpipe Streaming API REST 端点

以下请求标头适用于 Snowpipe Streaming REST API 的所有端点:

标头

描述

Authorization

auth token

:code:`X-Snowflake-Authorization-Token-Type`(可选)

JWT/OAuth

备注

单个请求负载的最大允许大小为 16 MB。如果您的数据更大,则必须将其拆分为多个请求。

Get Hostname

Get Hostname 返回用于与 Snowpipe Streaming REST API 进行交互的主机名。每个账户都有唯一的主机名。

GET /v2/streaming/hostname

响应:

字段

类型

描述

主机名

字符串

账户的主机名。

交换作用域令牌

Exchange Scoped Token 返回一个安全令牌,该令牌只能用于访问与 Snowpipe Streaming API 相关的服务。这为客户提供了安全保护。

POST /oauth/token

请求:

属性

必填

组件

描述

content_type

标头

"application/x-www-form-urlencoded"

grant_type

payload

"urn:ietf:params:oauth:grant-type:jwt-bearer"

scope

payload

账户的主机名

响应:

字段

类型

描述

token

字符串

作用域令牌。

Open Channel

Open Channel 操作针对管道或表创建或打开新通道。如果该通道已存在,Snowflake 会触发通道的客户端排序器,并返回最后提交的偏移令牌。

PUT /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

请求:

属性

必填

组件

描述

databaseName

URI

数据库名称,不区分大小写

schemaName

URI

架构名称,不区分大小写

pipeName

URI

管道名称,不区分大小写

channelName

URI

您创建或重新打开的通道名称,不区分大小写

offset_token

payload

用于在打开通道时设置偏移令牌的字符串。

requestId

查询参数

用于通过系统跟踪请求的 UUID

响应:

字段

类型

描述

next_continuation_token

字符串

必须在后续的“追加行”请求中使用的 API 管理的令牌。该令牌链接了一系列调用,确保数据流按顺序连续传输,并保持会话状态以实现仅一次传递。

channel_status

对象

嵌套对象,其中包含有关通道的以下详细信息:

  • database_name(字符串):管道所在的数据库的名称

  • schema_name(字符串):管道所在的架构的名称

  • pipe_name(字符串):正在使用的特定管道的名称。

  • channel_name(字符串):流式传输通道的名称。

  • channel_status_code(字符串):指示通道当前状态的代码;例如,“ACTIVE”。

  • last_committed_offset_token(字符串):代表上次成功提交的偏移的令牌。

  • created_on_ms(长整型):创建通道时的时间戳,以毫秒为单位。

  • rows_inserted(整数):成功插入的总行数。

  • rows_parsed(整数):解析的总行数。

  • rows_error_count(整数):遇到错误的总行数。

  • last_error_offset_upper_bound(字符串):表示上次发生错误的偏移上限的令牌。

  • last_error_message(字符串):上次发生的错误的消息。

  • last_error_timestamp(长整型):上次错误的时间戳,以毫秒为单位。

  • snowflake_avg_processing_latency_ms(整数):Snowflake 的平均处理延迟,以毫秒为单位。

Append Row(s)

Append Rows 操作将一批行插入给定通道。

POST /v2/streaming/data/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}/rows

请求:

属性

必填

组件

描述

databaseName

URI

数据库名称,不区分大小写

schemaName

URI

架构名称,不区分大小写

pipeName

URI

管道,不区分大小写

channelName

URI

通道名称,不区分大小写

continuationToken

查询参数

来自 Snowflake 的延续令牌,封装了客户端和行排序器

offsetToken

查询参数

用于设置每批偏移令牌的字符串。

rows

payload

要以 NDJSON 格式引入的实际数据负载。

requestId

查询参数

用于通过系统跟踪请求的 UUID。

备注

NDJSON 负载中的 JSON 文本必须严格符合 RFC 8259 标准。每个 JSON 文本后面都必须有一个换行符 \n (0x0A)。您也可以在换行符前插入回车符 \r (0x0D)。

响应:

字段

类型

描述

next_continuation_token

字符串

来自 Snowflake 的下一个延续令牌,封装了客户端和行排序器。它应该用于插入下一批。

Drop Channel

Drop Channel 操作会在服务器端删除一个通道及其元数据。

DELETE /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

请求:

属性

必填

组件

描述

databaseName

URI

数据库名称,不区分大小写

schemaName

URI

架构名称,不区分大小写

pipeOrTableName

URI

管道或表名,不区分大小写

channelName

URI

通道名称,不区分大小写

requestId

查询参数

用于通过系统跟踪请求的 UUID

响应:

此操作返回的负载除了 HTTP 状态代码外,没有具体的成功响应。

Bulk Get Channel Status

Bulk Get Channel Status 操作返回特定客户端排序器的通道状态。

POST /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}:bulk-channel-status

请求:

属性

必填

组件

描述

databaseName

URI

数据库名称,不区分大小写

schemaName

URI

架构名称,不区分大小写

pipeName

URI

管道名称,不区分大小写

channel_names

有效负载

客户想要获取状态的字符串通道名称数组;名称区分大小写。例如 {"channel_names":["channel1", "channel2"]}

响应:

{
  "channel_statuses": {
    "channel1": {
      "channel_status_code": "String",
      "last_committed_offset_token": "String",
      "database_name": "String",
      "schema_name": "String",
      "pipe_name": "String",
      "channel_name": "String",
      "rows_inserted": "int",
      "rows_parsed": "int",
      "rows_errors": "int",
      "last_error_offset_upper_bound": "String",
      "last_error_message": "String",
      "last_error_timestamp": "timestamp_utc",
      "snowflake_avg_processing_latency_ms": "int"
    },
    "channel2": {
      "comment": "same structure as channel1"
    }
    "comment": "potentially other channels"
  }
}
Copy

备注

如果在服务中找不到请求的通道,则响应负载的 channel_statuses 对象中没有该通道的条目。

每个通道的 channel_statuses 字段描述:

字段

类型

描述

channel_status_code

字符串

表示通道的状态。

last_committed_offset_token

字符串

最新提交偏移令牌

database_name

字符串

该通道所属数据库名称。

schema_name

字符串

该通道所属架构名称。

pipe_name

字符串

该通道所属管道名称。

channel_name

字符串

通道名称。

rows_inserted

整数

插入该通道的所有行的计数。

rows_parsed

整数

已解析但不一定要插入到此通道中的所有行的计数。

rows_error

整数

在插入此通道时出现错误因此被拒绝的所有行的计数。

last_error_offset_upper_bound

字符串

插入行集中最后一个与错误相对应的最新偏移令牌的上限。最后出现错误的行集的实际偏移令牌要么是这个,要么在通道引入顺序中严格排在它之前。

last_error_message

字符串

一条人类可读的消息,对应该通道的最新错误代码,且敏感客户数据已隐去。

last_error_timestamp

timestamp_utc

上次发生错误的时间戳。

snowflake_avg_processing_latency_ms

整数

该通道的平均 e2e 处理时间。

错误响应结构

对于所有 APIs 的错误响应,您应该会看到以下 JSON 负载形状:

{
   "error_code": "",
   "message": ""
}
Copy
语言: 中文