Snowpipe REST API¶
您可以通过调用 REST 端点与管道进行交互。本主题介绍 Snowpipe REST API,它用于定义要引入的文件列表,提取加载历史记录报告。
Snowflake 还提供 Java 和 Python APIs,用于简化使用 Snowpipe REST API 的流程。
本主题内容:
数据文件引入¶
Snowpipe API 提供 REST 端点,用于定义要引入的文件列表。
端点:insertFiles
¶
通知 Snowflake 要引入到表中的文件。来自此端点的成功响应意味着,Snowflake 已记录要添加到表中的文件列表。这并不一定意味着已引入文件。有关更多信息,请参阅下面的响应代码。
在大多数情况下,Snowflake 会在几分钟内将新数据插入到目标表中。
方法: POST
POST URL:
https://{account}.snowflakecomputing.cn/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}
URL 参数:
:code:`account`(必填):Snowflake 账户的账户标识符。
pipeName`(必填):区分大小写、完全限定的管道名称。例如,:code:`myDatabase.mySchema.myPipe
。requestId`(可选):用于在系统中跟踪请求的字符串。建议为每个请求提供一个随机字符串,例如,UUID。应将其追加到 URL 中,如下所示::code:
?requestId=<your_uuid>`。
请求标头
Content-Type:
:text/plain
:对于文件路径和文件名的纯文本列表,每行一个。此格式不允许使用文件大小参数。application/json
:适用于包含带有可选文件大小信息的文件列表的 JSON 对象。
Authorization
:BEARER <jwt_token>
请求正文(适用于 application/json Content-Type)
请求正文必须是具有一个名为“files”的键的 JSON 对象。与该键关联的值是 JSON 对象数组,其中每个对象代表一个要引入的文件。
{
"files":[
{
"path":"filePath/file1.csv",
"size":100
},
{
"path":"filePath/file2.csv",
"size":100
}
]
}
“files”数组中的每个元素都是具有以下属性的 JSON 对象:
:code:`path`(必填):暂存文件的路径和文件名。如果您遵循我们推荐的最佳实践,使用逻辑、粒度路径对暂存区中的数据进行分区,则有效负载中的路径值将包括暂存文件的完整路径。
:code:`size`(可选,但建议使用以获得更好的性能):文件大小(以字节为单位)。
请求正文(适用于 text/plain Content-Type)
请求正文应是文件路径和文件名的纯文本列表,每行一个条目。
filePath/file_a.csv
another/path/file_b.json
yet/another/file_c.txt
备注
帖子最多可包含 5000 个文件。序列化为 UTF-8 时,给定的每个文件路径长度必须 <= 1024 字节。
响应正文
响应代码:
200 – 成功。文件已添加到要引入的文件队列中。
400 – 故障。由于格式无效或超出限制而导致请求无效。
404 – 故障。
pipeName
无法识别。如果调用端点时使用的角色没有足够的权限,也可能返回此错误代码。有关更多信息,请参阅 授予访问权限。
429 – 故障。超出请求速率限制。
500 – 故障。发生内部错误。
响应有效负载:
API 请求成功(即代码 200)后,响应有效负载包含 JSON 格式的
requestId
和status
元素。如果发生错误,响应有效负载可能包含有关错误的详细信息。{ "requestId": "your_request_uuid", "status": "success" }如果管道定义中的 COPY INTO <table> 语句包含 PATTERN 复制选项,则
unmatchedPatternFiles
属性将在标头中列出提交的与正则表达式 不 匹配且因此跳过的所有文件。{ "requestId": "your_request_uuid", "status": "success", "unmatchedPatternFiles": ["some_file.txt", "another_file.dat"] }
加载历史记录报告¶
Snowpipe API 提供用于提取加载报告的 REST 端点。
端点:insertReport
¶
检索通过 insertFiles
提交的文件的报告,这些文件的内容最近已引入表中。请注意,对于大型文件,这可能只是文件的一部分。
请注意此端点的以下限制:
可保留 10,000 个最新事件。
事件最多保留 10 分钟。
当通过 insertFiles
提交的文件中的数据已提交到表并可用于查询时,将发生事件。insertReport
端点可看作是 UNIX 命令 tail。通过重复调用此命令,可以查看管道上事件随时间推移的完整历史记录。请注意,必须经常调用该命令,以免错过事件。频率取决于文件发送到 insertFiles
的速率。
方法: GET
GET URL:
https://<account_identifier>.snowflakecomputing.cn/v1/data/pipes/<pipeName>/insertReport?requestId=<requestId>&beginMark=<beginMark>
URL 参数:
account_identifier`(必填):您的唯一 Snowflake 账户标识符。首选格式是 :samp:`{organization_name}-{account_name}
。有关其他格式(带有区域和云平台的账户定位器),请参阅 格式 1(首选):您所在组织的账户名称。pipeName`(必填):Snowpipe 的完全限定名称,区分大小写。例如,:code:`myDatabase.mySchema.myPipe
。requestId`(可选):您可以提供一个字符串,用于在 Snowflake 系统中跟踪此特定请求。强烈建议使用像 UUID 这样的随机字符串,以便于更轻松地进行调试和监控。将其追加到 URL 中,如下所示::code:
?requestId=<your_uuid>`。beginMark`(可选):在先前 :code:`insertReport
响应的nextBeginMark
字段中返回的标记值。添加此标记可能减少返回的重复事件的数量,从而有助于优化后续调用。注意:虽然beginMark
是为了提示避免重复,但偶尔仍可能发生重复事件。如果未指定beginMark
,则报告将显示最近 10 分钟的引入历史记录。将其追加到 URL 中,如下所示:?beginMark=<previous_nextBeginMark>
。
请求标头:
Accept:指定所需的响应格式。可接受的值为
text/plain
或application/json
。Authorization:您的 Snowflake 身份验证令牌。使用格式 BEARER <jwt_token>。
请求正文:
此端点不接受 GET 请求的请求正文。URL 和标头中提供了所需的参数。
响应正文:
响应代码:
200 – 成功。报告已返回。
400 – 故障。由于格式无效或超出限制而导致请求无效。
404 – 故障。
pipeName
无法识别。如果调用端点时使用的角色没有足够的权限,也可能返回此错误代码。有关更多信息,请参阅 授予访问权限。
429 – 故障。超出请求速率限制。
500 – 故障。发生内部错误。
响应有效负载:
成功响应 (200) 包含与最近添加到表中的文件有关的信息。请注意,此报告可能仅代表大型文件的一部分。
例如:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "nextBeginMark": "1_39", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mybucket/", "fileSize": 57, "timeReceived": "2017-06-21T04:47:41.453Z", "lastInsertTime": "2017-06-21T04:48:28.575Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }响应字段:
字段
类型
描述
pipe
字符串
管道的完全限定名称。
completeResult
布尔
如果在提供的
beginMark
和此报告历史记录中的第一个事件之间缺少事件,则显示false
。否则,显示true
。
nextBeginMark
字符串
beginMark
可用于下个请求,以避免显示重复的记录。请注意,此值是一个提示。重复记录仍可能偶尔发生。
files
数组
JSON 对象数组,每个文件对应一个对象,这是历史记录响应的一部分。
path
字符串
相对于暂存区位置的文件路径。
stageLocation
字符串
管道中定义的暂存区 ID(内部暂存区)或 S3 桶(外部暂存区)。
fileSize
长整型
文件大小,以字节为单位。
timeReceived
字符串
接收此文件进行处理的时间。格式为 UTC 时区的 ISO-8601。
lastInsertTime
字符串
上次将此文件中的数据引入表中的时间。格式为 UTC 时区的 ISO-8601。
rowsInserted
长整型
从文件插入到目标表中的行数。
rowsParsed
长整型
从文件中解析的行数。可能会跳过有错误的行。
errorsSeen
整数
文件中显示的错误数
errorLimit
整数
将文件视为有故障之前,文件中允许的错误数(基于 ON_ERROR 复制选项)。
firstError
[1]字符串
此文件中遇到的第一个错误的错误消息。
firstErrorLineNum
[1]长整型
第一个错误的行号。
firstErrorCharacterPos
[1]长整型
第一个错误的字符位置。
firstErrorColumnName
[1]字符串
发生第一个错误的列名。
systemError
[1]字符串
描述文件未处理原因的一般错误。
complete
布尔
指示是否已成功处理完文件。
status
字符串
文件的加载状态:
LOAD_IN_PROGRESS:部分文件已加载到表中,但加载流程尚未完成。
LOADED:整个文件已加载到表中。
LOAD_FAILED:文件加载失败。
PARTIALLY_LOADED:此文件中的某些行已成功加载,但其他行由于错误而未加载。已处理完此文件。
[1] 仅当文件包含错误时,才会为这些字段提供值。
端点:loadHistoryScan
¶
提取有关其内容已添加到表中的引入文件的报告。请注意,对于大型文件,这可能只是文件的一部分。此端点与 insertReport
的不同之处在于,它显示两个时间点之间的历史记录。最多返回 10,000 个项目,但可能发出多个调用,以覆盖所需的时间范围。
重要
此端点受速率限制,避免过多的调用。为避免超过速率限制(错误代码 429),建议多依赖的是 insertReport
,而不是 loadHistoryScan
。调用 loadHistoryScan
时,指定包含一组数据加载的最窄时间范围。例如,可以每 8 分钟读取一次最近 10 分钟的历史记录。尝试每分钟读取过去 24 小时的历史记录会导致 429 错误,表明已达到速率限制。速率限制允许对每条历史记录进行几次读取。
为了获得更全面的视图,在没有这些限制的情况下,Snowflake 可提供一个 Information Schema 表函数 COPY_HISTORY,该函数返回管道或表的加载历史记录。
方法: GET
GET URL:
https://{account}.snowflakecomputing.cn/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<startTime>&endTimeExclusive=<endTime>&requestId=<requestId>
URL 参数:
:code:`account`(必填):您的唯一 Snowflake 账户标识符。
pipeName`(必填):Snowpipe 的完全限定名称,区分大小写。示例::code:`myDatabase.mySchema.myPipe
。:code:`startTimeInclusive`(必填):检索加载历史数据的时间范围的开始,指定为 ISO-8601 格式的时间戳(例如,2023-10-26T10:00:00Z)。该时间戳表示查询的闭区间下界。
:code:`endTimeExclusive`(可选):检索加载历史数据的时间范围的结束,指定为 ISO-8601 格式的时间戳(例如,2023-10-26T10:15:00Z)。该时间戳表示查询的开区间上界。如果省略此参数,则将使用当前服务器时间戳 (CURRENT_TIMESTAMP()) 作为时间范围的结束。
requestId`(可选):您可以提供一个字符串,用于在 Snowflake 系统中跟踪此特定请求。建议使用像 UUID 这样的随机字符串,以便于更轻松地进行调试和监控。将其追加到 URL 中,如下所示::code:
?requestId=<your_uuid>`。
请求标头:
Accept
:指定所需的响应格式。可接受的值为text/plain
或application/json
。Authorization
:您的 Snowflake 身份验证令牌。使用格式BEARER <jwt_token>
。
请求正文:
此端点不接受 GET
请求的请求正文。URL 和标头中提供了所有必需的参数。
响应正文:
响应代码:
200 – 成功。返回加载历史记录扫描结果。
400 – 故障。由于格式无效或超出限制而导致请求无效。
404 – 故障。
pipeName
无法识别。429 – 故障。超出请求速率限制。
500 – 故障。发生内部错误。
响应有效负载:
成功响应 (200) 包含与最近添加到表中的文件有关的信息。 请注意,此报告可能仅代表大型文件的一部分。
例如:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "startTimeInclusive": "2017-08-25T18:42:31.081Z", "endTimeExclusive":"2017-08-25T22:43:45.552Z", "rangeStartTime":"2017-08-25T22:43:45.383Z", "rangeEndTime":"2017-08-25T22:43:45.383Z", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mystage/", "fileSize": 57, "timeReceived": "2017-08-25T22:43:45.383Z", "lastInsertTime": "2017-08-25T22:43:45.383Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }响应字段:
字段
类型
描述
pipe
字符串
管道的完全限定名称。
completeResult
布尔
如果报告不完整(即指定时间范围内的条目数超过 10,000 条限制),则显示
false
。如果显示false
,用户可以将当前rangeEndTime
值指定为下一个请求的startTimeInclusive
值,以便继续执行下一组条目。
startTimeInclusive
字符串
请求中提供的起始时间戳(采用 ISO-8601 格式)。
endTimeExclusive
字符串
请求中提供的结束时间戳(采用 ISO-8601 格式)。
rangeStartTime
字符串
响应中包含的文件中最早条目的时间戳(采用 ISO-8601 格式)。
rangeEndTime
字符串
响应中包含的文件中最新条目的时间戳(采用 ISO-8601 格式)。
files
数组
JSON 对象数组,每个文件对应一个对象,这是历史记录响应的一部分。在数组中,响应字段与
insertReport
响应中返回的字段相同。