Snowpipe REST API¶
您可以通过调用 REST 端点与管道进行交互。本主题介绍 Snowpipe REST API,它用于定义要引入的文件列表,提取加载历史记录报告。
Snowflake 还提供 Java 和 Python APIs,用于简化使用 Snowpipe REST API 的流程。
本主题内容:
数据文件引入¶
Snowpipe API 提供 REST 端点,用于定义要引入的文件列表。
端点:insertFiles
¶
通知 Snowflake 要引入到表中的文件。来自此端点的成功响应意味着,Snowflake 已记录要添加到表中的文件列表。这并不一定意味着已引入文件。有关更多信息,请参阅下面的响应代码。
在大多数情况下,Snowflake 会在几分钟内将新数据插入到目标表中。
- method
POST
- post url
https://{account}.snowflakecomputing.cn/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}
- post body
具有以下属性的 JSON 对象:
属性
必填
描述
account
是
Snowflake 账户的 账户标识符。
pipeName
是
区分大小写、完全限定的管道名称。例如
myDatabase.mySchema.myPipe
。requestId
否
用于通过系统跟踪请求的字符串。建议为每个请求提供一个随机字符串,例如 UUID。
- content-type
text/plain
application/json
- header fields
接受:
text/plain
或application/json
授权:BEARER <jwt_token>
对于
text/plain
,内容是路径和文件名的列表,每行一个。不允许使用size
参数。对于
application/json
,内容是路径、文件名和文件大小的列表(可选,但建议使用以获得更好的性能)。有效负载示例如下:{ "files":[ { "path":"filePath/file1.csv", "size":100 }, { "path":"filePath/file2.csv", "size":100 } ] }
请注意,如果您遵循我们推荐的最佳实践,使用逻辑、粒度路径对暂存区中的数据进行分区,则有效负载中的路径值将包括暂存文件的完整路径。
备注
帖子最多可包含 5000 个文件。
序列化为 UTF-8 时,给定的每个文件路径长度必须 <= 1024 字节。
- response body
响应代码:
200 – 成功。文件已添加到要引入的文件队列中。
400 – 故障。由于格式无效或超出限制而导致请求无效。
404 – 故障。
pipeName
无法识别。如果调用端点时使用的角色没有足够的权限,也可能返回此错误代码。有关更多信息,请参阅 授予访问权限。
429 – 故障。超出请求速率限制。
500 – 故障。发生内部错误。
响应有效负载:
API 请求成功(即代码 200)后,响应有效负载包含 JSON 格式的
requestId
和status
元素。如果发生错误,响应有效负载可能包含有关错误的详细信息。如果管道定义中的 COPY INTO <table> 语句包含 PATTERN 复制选项,则
unmatchedPatternFiles
属性将在标头中列出提交的与正则表达式 不 匹配且因此跳过的所有文件。
加载历史记录报告¶
Snowpipe API 提供用于提取加载报告的 REST 端点。
端点:insertReport
¶
检索通过 insertFiles
提交的文件的报告,这些文件的内容最近已引入表中。请注意,对于大型文件,这可能只是文件的一部分。
请注意此端点的以下限制:
可保留 10,000 个最新事件。
事件最多保留 10 分钟。
当通过 insertFiles
提交的文件中的数据已提交到表并可用于查询时,将发生事件。insertReport
端点可看作是 UNIX 命令 tail。通过重复调用此命令,可以查看管道上事件随时间推移的完整历史记录。请注意,必须经常调用该命令,以免错过事件。频率取决于文件发送到 insertFiles
的速率。
- method
GET
- get url
https://<account_identifier>.snowflakecomputing.cn/v1/data/pipes/<pipeName>/insertReport?requestId=<requestId>&beginMark=<beginMark>
- header fields
接受:text/plain 或 application/json
授权:BEARER <jwt_token>
- get body
具有以下属性的 JSON 对象:
属性
必填
描述
account_identifier
是
Snowflake 账户的唯一标识符。
账户标识符的首选格式如下:
organization_name-account_name
Snowflake 组织和账户的名称。有关详细信息,请参阅 格式 1(首选):您所在组织的账户名称。
如果需要,还可以指定 账户定位器,以及托管该账户的 区域 和 云平台。有关详细信息,请参阅 格式 2:区域中的账户定位器。
pipeName
是
区分大小写、完全限定的管道名称。例如
myDatabase.mySchema.myPipe
。requestId
否
用于通过系统跟踪请求的字符串。建议为每个请求提供一个随机字符串,例如 UUID。
beginMark
是
由上次调用
insertReport
返回的标记,可用于减少重复调用insertReport
时显示的重复事件数。请注意,这是个提示,有时仍可能返回重复的事件。
- response body
响应代码:
200 – 成功。报告已返回。
400 – 故障。由于格式无效或超出限制而导致请求无效。
404 – 故障。
pipeName
无法识别。如果调用端点时使用的角色没有足够的权限,也可能返回此错误代码。有关更多信息,请参阅 授予访问权限。
429 – 故障。超出请求速率限制。
500 – 故障。发生内部错误。
响应有效负载:
成功响应 (200) 包含与最近添加到表中的文件有关的信息。请注意,此报告可能仅代表大型文件的一部分。
例如:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "nextBeginMark": "1_39", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mybucket/", "fileSize": 57, "timeReceived": "2017-06-21T04:47:41.453Z", "lastInsertTime": "2017-06-21T04:48:28.575Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }
响应字段:
字段
类型
描述
pipe
字符串
管道的完全限定名称。
completeResult
布尔
如果在提供的
beginMark
和此报告历史记录中的第一个事件之间缺少事件,则显示false
。否则,显示true
。nextBeginMark
字符串
beginMark
可用于下个请求,以避免显示重复的记录。请注意,此值是一个提示。重复记录仍可能偶尔发生。files
数组
JSON 对象数组,每个文件对应一个对象,这是历史记录响应的一部分。
path
字符串
相对于暂存区位置的文件路径。
stageLocation
字符串
管道中定义的暂存区 ID(内部暂存区)或 S3 桶(外部暂存区)。
fileSize
长整型
文件大小,以字节为单位。
timeReceived
字符串
接收此文件进行处理的时间。格式为 UTC 时区的 ISO-8601。
lastInsertTime
字符串
上次将此文件中的数据引入表中的时间。格式为 UTC 时区的 ISO-8601。
rowsInserted
长整型
从文件插入到目标表中的行数。
rowsParsed
长整型
从文件中解析的行数。可能会跳过有错误的行。
errorsSeen
整数
文件中显示的错误数
errorLimit
整数
将文件视为有故障之前,文件中允许的错误数(基于 ON_ERROR 复制选项)。
firstError
[1]字符串
此文件中遇到的第一个错误的错误消息。
firstErrorLineNum
[1]长整型
第一个错误的行号。
firstErrorCharacterPos
[1]长整型
第一个错误的字符位置。
firstErrorColumnName
[1]字符串
发生第一个错误的列名。
systemError
[1]字符串
描述文件未处理原因的一般错误。
complete
布尔
指示是否已成功处理完文件。
status
字符串
文件的加载状态:
LOAD_IN_PROGRESS:部分文件已加载到表中,但加载流程尚未完成。
LOADED:整个文件已加载到表中。
LOAD_FAILED:文件加载失败。
PARTIALLY_LOADED:此文件中的某些行已成功加载,但其他行由于错误而未加载。已处理完此文件。
[1] 仅当文件包含错误时,才会为这些字段提供值。
端点:loadHistoryScan
¶
提取有关其内容已添加到表中的引入文件的报告。请注意,对于大型文件,这可能只是文件的一部分。此端点与 insertReport
的不同之处在于,它显示两个时间点之间的历史记录。最多返回 10,000 个项目,但可能发出多个调用,以覆盖所需的时间范围。
重要
此端点受速率限制,避免过多的调用。为避免超过速率限制(错误代码 429),建议多依赖的是 insertReport
,而不是 loadHistoryScan
。调用 loadHistoryScan
时,指定包含一组数据加载的最窄时间范围。例如,可以每 8 分钟读取一次最近 10 分钟的历史记录。尝试每分钟读取过去 24 小时的历史记录会导致 429 错误,表明已达到速率限制。速率限制允许对每条历史记录进行几次读取。
为了获得更全面的视图,在没有这些限制的情况下,Snowflake 可提供一个 Information Schema 表函数 COPY_HISTORY,该函数返回管道或表的加载历史记录。
- method
GET
- get url
https://{account}.snowflakecomputing.cn/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<startTime>&endTimeExclusive=<endTime>&requestId=<requestId>
- header fields
接受:text/plain 或 application/json
授权:BEARER <jwt_token>
- get body
具有以下属性的 JSON 对象:
属性
必填
描述
account
是
Snowflake 账户的 账户标识符。
pipeName
是
区分大小写、完全限定的管道名称。例如
myDatabase.mySchema.myPipe
。startTimeInclusive
是
ISO-8601 格式的时间戳。要检索加载历史记录数据的时间范围的开始时间。
endTimeExclusive
否
ISO-8601 格式的时间戳。要检索加载历史记录数据的时间范围的结束时间。如果省略,则 CURRENT_TIMESTAMP() 用作范围结束。
requestId
否
用于通过系统跟踪请求的字符串。建议为每个请求提供一个随机字符串(例如 UUID)。
- response body
响应代码:
200 – 成功。返回加载历史记录扫描结果。
400 – 故障。由于格式无效或超出限制而导致请求无效。
404 – 故障。
pipeName
无法识别。429 – 故障。超出请求速率限制。
500 – 故障。发生内部错误。
响应有效负载:
成功响应 (200) 包含与最近添加到表中的文件有关的信息。 请注意,此报告可能仅代表大型文件的一部分。
例如:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "startTimeInclusive": "2017-08-25T18:42:31.081Z", "endTimeExclusive":"2017-08-25T22:43:45.552Z", "rangeStartTime":"2017-08-25T22:43:45.383Z", "rangeEndTime":"2017-08-25T22:43:45.383Z", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mystage/", "fileSize": 57, "timeReceived": "2017-08-25T22:43:45.383Z", "lastInsertTime": "2017-08-25T22:43:45.383Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }
响应字段:
字段
类型
描述
pipe
字符串
管道的完全限定名称。
completeResult
布尔
如果报告不完整(即指定时间范围内的条目数超过 10,000 条限制),则显示
false
。如果显示false
,用户可以将当前rangeEndTime
值指定为下一个请求的startTimeInclusive
值,以便继续执行下一组条目。startTimeInclusive
字符串
请求中提供的起始时间戳(采用 ISO-8601 格式)。
endTimeExclusive
字符串
请求中提供的结束时间戳(采用 ISO-8601 格式)。
rangeStartTime
字符串
响应中包含的文件中最早条目的时间戳(采用 ISO-8601 格式)。
rangeEndTime
字符串
响应中包含的文件中最新条目的时间戳(采用 ISO-8601 格式)。
files
数组
JSON 对象数组,每个文件对应一个对象,这是历史记录响应的一部分。在数组中,响应字段与
insertReport
响应中返回的字段相同。