Snowpipe REST API

您可以通过调用 REST 端点与管道进行交互。本主题介绍 Snowpipe REST API,它用于定义要引入的文件列表,提取加载历史记录报告。

Snowflake 还提供 Java 和 Python APIs,用于简化使用 Snowpipe REST API 的流程。

本主题内容:

数据文件引入

Snowpipe API 提供 REST 端点,用于定义要引入的文件列表。

端点:insertFiles

通知 Snowflake 要引入到表中的文件。来自此端点的成功响应意味着,Snowflake 已记录要添加到表中的文件列表。这并不一定意味着已引入文件。有关更多信息,请参阅下面的响应代码。

在大多数情况下,Snowflake 会在几分钟内将新数据插入到目标表中。

method

POST

post url

https://{account}.snowflakecomputing.cn/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}

post body

具有以下属性的 JSON 对象:

属性

必填

描述

account

Snowflake 账户的 账户标识符

pipeName

区分大小写、完全限定的管道名称。例如 myDatabase.mySchema.myPipe

requestId

用于通过系统跟踪请求的字符串。建议为每个请求提供一个随机字符串,例如 UUID。

content-type

text/plain

application/json

header fields

接受:text/plainapplication/json

授权:BEARER <jwt_token>

  • 对于 text/plain,内容是路径和文件名的列表,每行一个。不允许使用 size 参数。

  • 对于 application/json,内容是路径、文件名和文件大小的列表(可选,但建议使用以获得更好的性能)。有效负载示例如下:

    {
      "files":[
        {
          "path":"filePath/file1.csv",
          "size":100
        },
        {
          "path":"filePath/file2.csv",
          "size":100
        }
      ]
    }
    
    Copy

请注意,如果您遵循我们推荐的最佳实践,使用逻辑、粒度路径对暂存区中的数据进行分区,则有效负载中的路径值将包括暂存文件的完整路径。

备注

  • 帖子最多可包含 5000 个文件。

  • 序列化为 UTF-8 时,给定的每个文件路径长度必须 <= 1024 字节。

response body

响应代码:

  • 200 – 成功。文件已添加到要引入的文件队列中。

  • 400 – 故障。由于格式无效或超出限制而导致请求无效。

  • 404 – 故障。pipeName 无法识别。

    如果调用端点时使用的角色没有足够的权限,也可能返回此错误代码。有关更多信息,请参阅 授予访问权限

  • 429 – 故障。超出请求速率限制。

  • 500 – 故障。发生内部错误。

响应有效负载:

API 请求成功(即代码 200)后,响应有效负载包含 JSON 格式的 requestIdstatus 元素。如果发生错误,响应有效负载可能包含有关错误的详细信息。

如果管道定义中的 COPY INTO <table> 语句包含 PATTERN 复制选项,则 unmatchedPatternFiles 属性将在标头中列出提交的与正则表达式 匹配且因此跳过的所有文件。

加载历史记录报告

Snowpipe API 提供用于提取加载报告的 REST 端点。

端点:insertReport

检索通过 insertFiles 提交的文件的报告,这些文件的内容最近已引入表中。请注意,对于大型文件,这可能只是文件的一部分。

请注意此端点的以下限制:

  • 可保留 10,000 个最新事件。

  • 事件最多保留 10 分钟。

当通过 insertFiles 提交的文件中的数据已提交到表并可用于查询时,将发生事件。insertReport 端点可看作是 UNIX 命令 tail。通过重复调用此命令,可以查看管道上事件随时间推移的完整历史记录。请注意,必须经常调用该命令,以免错过事件。频率取决于文件发送到 insertFiles 的速率。

method

GET

get url

https://<account_identifier>.snowflakecomputing.cn/v1/data/pipes/<pipeName>/insertReport?requestId=<requestId>&beginMark=<beginMark>

header fields

接受:text/plain 或 application/json

授权:BEARER <jwt_token>

get body

具有以下属性的 JSON 对象:

属性

必填

描述

account_identifier

Snowflake 账户的唯一标识符。

账户标识符的首选格式如下:

organization_name-account_name

Snowflake 组织和账户的名称。有关详细信息,请参阅 格式 1(首选):您所在组织的账户名称

如果需要,还可以指定 账户定位器,以及托管该账户的 区域云平台。有关详细信息,请参阅 格式 2:区域中的账户定位器

pipeName

区分大小写、完全限定的管道名称。例如 myDatabase.mySchema.myPipe

requestId

用于通过系统跟踪请求的字符串。建议为每个请求提供一个随机字符串,例如 UUID。

beginMark

由上次调用 insertReport 返回的标记,可用于减少重复调用 insertReport 时显示的重复事件数。请注意,这是个提示,有时仍可能返回重复的事件。

response body

响应代码:

  • 200 – 成功。报告已返回。

  • 400 – 故障。由于格式无效或超出限制而导致请求无效。

  • 404 – 故障。pipeName 无法识别。

    如果调用端点时使用的角色没有足够的权限,也可能返回此错误代码。有关更多信息,请参阅 授予访问权限

  • 429 – 故障。超出请求速率限制。

  • 500 – 故障。发生内部错误。

响应有效负载:

成功响应 (200) 包含与最近添加到表中的文件有关的信息。请注意,此报告可能仅代表大型文件的一部分。

例如:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "nextBeginMark": "1_39",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mybucket/",
      "fileSize": 57,
      "timeReceived": "2017-06-21T04:47:41.453Z",
      "lastInsertTime": "2017-06-21T04:48:28.575Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}
Copy

响应字段:

字段

类型

描述

pipe

字符串

管道的完全限定名称。

completeResult

布尔

如果在提供的 beginMark 和此报告历史记录中的第一个事件之间缺少事件,则显示 false。否则,显示 true

nextBeginMark

字符串

beginMark 可用于下个请求,以避免显示重复的记录。请注意,此值是一个提示。重复记录仍可能偶尔发生。

files

数组

JSON 对象数组,每个文件对应一个对象,这是历史记录响应的一部分。

path

字符串

相对于暂存区位置的文件路径。

stageLocation

字符串

管道中定义的暂存区 ID(内部暂存区)或 S3 桶(外部暂存区)。

fileSize

长整型

文件大小,以字节为单位。

timeReceived

字符串

接收此文件进行处理的时间。格式为 UTC 时区的 ISO-8601。

lastInsertTime

字符串

上次将此文件中的数据引入表中的时间。格式为 UTC 时区的 ISO-8601。

rowsInserted

长整型

从文件插入到目标表中的行数。

rowsParsed

长整型

从文件中解析的行数。可能会跳过有错误的行。

errorsSeen

整数

文件中显示的错误数

errorLimit

整数

将文件视为有故障之前,文件中允许的错误数(基于 ON_ERROR 复制选项)。

firstError [1]

字符串

此文件中遇到的第一个错误的错误消息。

firstErrorLineNum [1]

长整型

第一个错误的行号。

firstErrorCharacterPos [1]

长整型

第一个错误的字符位置。

firstErrorColumnName [1]

字符串

发生第一个错误的列名。

systemError [1]

字符串

描述文件未处理原因的一般错误。

complete

布尔

指示是否已成功处理完文件。

status

字符串

文件的加载状态:

  • LOAD_IN_PROGRESS:部分文件已加载到表中,但加载流程尚未完成。

  • LOADED:整个文件已加载到表中。

  • LOAD_FAILED:文件加载失败。

  • PARTIALLY_LOADED:此文件中的某些行已成功加载,但其他行由于错误而未加载。已处理完此文件。

[1] 仅当文件包含错误时,才会为这些字段提供值。

端点:loadHistoryScan

提取有关其内容已添加到表中的引入文件的报告。请注意,对于大型文件,这可能只是文件的一部分。此端点与 insertReport 的不同之处在于,它显示两个时间点之间的历史记录。最多返回 10,000 个项目,但可能发出多个调用,以覆盖所需的时间范围。

重要

此端点受速率限制,避免过多的调用。为避免超过速率限制(错误代码 429),建议多依赖的是 insertReport,而不是 loadHistoryScan。调用 loadHistoryScan 时,指定包含一组数据加载的最窄时间范围。例如,可以每 8 分钟读取一次最近 10 分钟的历史记录。尝试每分钟读取过去 24 小时的历史记录会导致 429 错误,表明已达到速率限制。速率限制允许对每条历史记录进行几次读取。

为了获得更全面的视图,在没有这些限制的情况下,Snowflake 可提供一个 Information Schema 表函数 COPY_HISTORY,该函数返回管道或表的加载历史记录。

method

GET

get url

https://{account}.snowflakecomputing.cn/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<startTime>&endTimeExclusive=<endTime>&requestId=<requestId>

header fields

接受:text/plain 或 application/json

授权:BEARER <jwt_token>

get body

具有以下属性的 JSON 对象:

属性

必填

描述

account

Snowflake 账户的 账户标识符

pipeName

区分大小写、完全限定的管道名称。例如 myDatabase.mySchema.myPipe

startTimeInclusive

ISO-8601 格式的时间戳。要检索加载历史记录数据的时间范围的开始时间。

endTimeExclusive

ISO-8601 格式的时间戳。要检索加载历史记录数据的时间范围的结束时间。如果省略,则 CURRENT_TIMESTAMP() 用作范围结束。

requestId

用于通过系统跟踪请求的字符串。建议为每个请求提供一个随机字符串(例如 UUID)。

response body

响应代码:

  • 200 – 成功。返回加载历史记录扫描结果。

  • 400 – 故障。由于格式无效或超出限制而导致请求无效。

  • 404 – 故障。pipeName 无法识别。

  • 429 – 故障。超出请求速率限制。

  • 500 – 故障。发生内部错误。

响应有效负载:

成功响应 (200) 包含与最近添加到表中的文件有关的信息。 请注意,此报告可能仅代表大型文件的一部分。

例如:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "startTimeInclusive": "2017-08-25T18:42:31.081Z",
  "endTimeExclusive":"2017-08-25T22:43:45.552Z",
  "rangeStartTime":"2017-08-25T22:43:45.383Z",
  "rangeEndTime":"2017-08-25T22:43:45.383Z",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mystage/",
      "fileSize": 57,
      "timeReceived": "2017-08-25T22:43:45.383Z",
      "lastInsertTime": "2017-08-25T22:43:45.383Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}
Copy

响应字段:

字段

类型

描述

pipe

字符串

管道的完全限定名称。

completeResult

布尔

如果报告不完整(即指定时间范围内的条目数超过 10,000 条限制),则显示 false。如果显示 false,用户可以将当前 rangeEndTime 值指定为下一个请求的 startTimeInclusive 值,以便继续执行下一组条目。

startTimeInclusive

字符串

请求中提供的起始时间戳(采用 ISO-8601 格式)。

endTimeExclusive

字符串

请求中提供的结束时间戳(采用 ISO-8601 格式)。

rangeStartTime

字符串

响应中包含的文件中最早条目的时间戳(采用 ISO-8601 格式)。

rangeEndTime

字符串

响应中包含的文件中最新条目的时间戳(采用 ISO-8601 格式)。

files

数组

JSON 对象数组,每个文件对应一个对象,这是历史记录响应的一部分。在数组中,响应字段与 insertReport 响应中返回的字段相同。

语言: 中文