Snowpipe REST API

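You can interact with a pipe by calling REST endpoints. This topic describes the Snowpipe REST API, which you use to define the list of files to ingest and to fetch load history reports.

Snowflake also provides Java and Python APIs that simplify working with the Snowpipe REST API.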

Data file ingestion

The Snowpipe API provides a REST endpoint for defining the list of files to ingest.

Endpoint: insertFiles

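Informs Snowflake about the files to be ingested into a table. A successful response from this endpoint means that Snowflake has recorded the list of files to add to the table. It does not necessarily mean the files have been ingested. For more information, see the response codes below.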

In most cases, Snowflake inserts new data into the target table within a few minutes.

Method: POST

POST URL:

https://{account}.snowflakecomputing.cn/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}

URL parameters:

  • account (Required): Account identifier for your Snowflake account.
  • pipeName (Required): Case-sensitive, fully qualified pipe name. For example, myDatabase.mySchema.myPipe.
  • requestId (Optional): String used to track requests through the system. We recommend providing a random string with each request, for example, a UUID. This should be appended to the URL like this: ?requestId=<your_uuid>.
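The URL pattern and parameters above can be assembled as in the following minimal Python sketch. It uses only the standard library, not Snowflake's own Python API. The function name `insert_files_url` is hypothetical, and the `.snowflakecomputing.cn` host is copied from the URL template above.

```python
import uuid
from typing import Optional
from urllib.parse import quote


def insert_files_url(account: str, pipe_name: str, request_id: Optional[str] = None) -> str:
    """Build an insertFiles URL, generating a random UUID requestId if none is given."""
    if request_id is None:
        request_id = str(uuid.uuid4())
    # quote() leaves letters, digits, '.', '-' and '_' untouched, so a case-sensitive,
    # fully qualified name such as myDatabase.mySchema.myPipe passes through unchanged.
    return (
        f"https://{account}.snowflakecomputing.cn/v1/data/pipes/"
        f"{quote(pipe_name)}/insertFiles?requestId={quote(request_id)}"
    )
```

Generating a fresh UUID per call follows the recommendation to send a random string with each request, which makes individual calls easy to trace.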

Request headers

  • Content-Type:

    • text/plain: For a plain text list of file paths and filenames, one per line. The size parameter is not allowed in this format.
    • application/json: For a JSON object containing a list of files with optional size information.
  • Authorization: BEARER <jwt_token>
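This hypothetical helper shows how the two headers above might be attached to a POST request using Python's standard `urllib.request`. It assumes the JWT has already been generated elsewhere, for example through key-pair authentication, which is not shown here. The request is only prepared and is not sent.

```python
import urllib.request

ALLOWED_CONTENT_TYPES = ("text/plain", "application/json")


def build_insert_files_request(
    url: str, jwt_token: str, body: bytes, content_type: str
) -> urllib.request.Request:
    """Prepare (but do not send) a POST request for the insertFiles endpoint."""
    if content_type not in ALLOWED_CONTENT_TYPES:
        raise ValueError(f"unsupported Content-Type: {content_type}")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": content_type,
            # jwt_token is assumed to be obtained elsewhere (e.g. key-pair auth)
            "Authorization": f"BEARER {jwt_token}",
        },
    )
```

To send the request, pass it to `urllib.request.urlopen`. urllib stores header names through `str.capitalize()`, so when you read them back, use `get_header("Content-type")`.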

Request body (for the application/json Content-Type)

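The request body must be a JSON object with a single key named "files". The value associated with this key is an array of JSON objects, each of which represents one file to ingest.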

{
  "files":[
    {
      "path":"filePath/file1.csv",
      "size":100
    },
    {
      "path":"filePath/file2.csv",
      "size":100
    }
  ]
}

Each element of the "files" array is a JSON object with the following properties:

  • path (Required): The path and filename of the staged file. If you follow our recommended best practices by partitioning your data in the stage using logical, granular paths, the path values in the payload include the complete paths to the staged files.
  • size (Optional, but recommended for better performance): The size of the file in bytes.
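A minimal Python sketch of producing this JSON body from (path, size) pairs follows. The function name `json_insert_body` is hypothetical. The size is written only when it is known, because the property is optional.

```python
import json
from typing import Optional


def json_insert_body(files: list[tuple[str, Optional[int]]]) -> bytes:
    """Serialize (path, size_in_bytes) pairs into the {"files": [...]} payload."""
    entries = []
    for path, size in files:
        entry: dict = {"path": path}
        if size is not None:  # size is optional but recommended for performance
            entry["size"] = size
        entries.append(entry)
    return json.dumps({"files": entries}).encode("utf-8")
```

For example, `json_insert_body([("filePath/file1.csv", 100), ("filePath/file2.csv", 100)])` produces the payload shown above, minus the whitespace.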

Request body (for the text/plain Content-Type)

The request body should be a plain-text list of file paths and filenames, with one entry per line.

filePath/file_a.csv
another/path/file_b.json
yet/another/file_c.txt
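The plain-text variant can be sketched the same way. The function `text_insert_body` is hypothetical. Rejecting paths that contain line breaks is an added safety check, not a documented rule, because a line break would split one entry into two.

```python
def text_insert_body(paths: list[str]) -> bytes:
    """Join staged file paths into a text/plain body, one path per line."""
    for path in paths:
        # A path containing a line break would be read as two separate entries.
        if "\n" in path or "\r" in path:
            raise ValueError(f"path contains a line break: {path!r}")
    return "\n".join(paths).encode("utf-8")
```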

Note

A single request can contain at most 5,000 files. Each file path must be at most 1,024 bytes long when serialized as UTF-8.
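One way to respect both limits before sending anything is sketched below. The hypothetical `batch_paths` function checks each path's UTF-8 byte length, since the limit is in bytes rather than characters. It then splits the list into chunks of at most 5,000 files, one chunk per insertFiles call.

```python
MAX_FILES_PER_REQUEST = 5000
MAX_PATH_BYTES = 1024


def batch_paths(paths: list[str]) -> list[list[str]]:
    """Validate path lengths and split paths into request-sized batches."""
    for path in paths:
        size = len(path.encode("utf-8"))  # the limit is in UTF-8 bytes, not characters
        if size > MAX_PATH_BYTES:
            raise ValueError(f"path is {size} bytes, limit is {MAX_PATH_BYTES}: {path[:40]}...")
    return [
        paths[i : i + MAX_FILES_PER_REQUEST]
        for i in range(0, len(paths), MAX_FILES_PER_REQUEST)
    ]
```

Non-ASCII characters count for more than one byte. For example, a path made of 513 "é" characters is 1,026 bytes and is rejected, even though it has only 513 characters.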

Response body

Response codes:

  • 200 — Success. Files added to the queue of files to ingest.

  • 400 — Failure. Invalid request due to an invalid format, or limit exceeded.

  • 404 — Failure. pipeName not recognized.

    This error code can also be returned if the role used when calling the endpoint does not have sufficient privileges. For more information, see Grant access privileges.

  • 429 — Failure. Request rate limit exceeded.

  • 500 — Failure. Internal error occurred.
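The page lists the response codes but does not prescribe a retry strategy. The sketch below assumes one common approach: retry 429 and 500 with capped exponential backoff, and treat 400 and 404 as errors that need a fix to the request, pipe name, or role privileges. The function names and delay values are hypothetical. Whether resubmitting the same files after a 500 is safe in your setup is also an assumption to verify.

```python
RETRYABLE_STATUSES = {429, 500}  # assumption: transient failures worth retrying


def should_retry(status: int) -> bool:
    """400 and 404 point to the request, pipe name, or role privileges; retrying won't help."""
    return status in RETRYABLE_STATUSES


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 60.0) -> list[float]:
    """Exponential backoff schedule in seconds: base, 2*base, 4*base, ... capped at cap."""
    return [min(cap, base * 2**n) for n in range(attempts)]
```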

Response payload:

For a successful API request (that is, HTTP status 200), the response payload contains the requestId and status elements in JSON format. If an error occurs, the response payload may contain details about the error.

{
  "requestId": "your_request_uuid",
  "status": "success"
}

If the COPY INTO <table> statement in the pipe definition includes the PATTERN copy option, the unmatchedPatternFiles attribute lists any files submitted in the request that did not match the regular expression and were therefore skipped.

{
  "requestId": "your_request_uuid",
  "status": "success",
  "unmatchedPatternFiles": ["some_file.txt", "another_file.dat"]
}
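A minimal sketch of reading a 200 response follows, including the optional unmatchedPatternFiles attribute. The function name `parse_insert_response` is hypothetical. Treating a missing attribute as "no files skipped" is an assumption based on the attribute appearing only when PATTERN is used.

```python
import json


def parse_insert_response(raw: bytes) -> tuple[str, list[str]]:
    """Return (requestId, unmatchedPatternFiles) from a 200 insertFiles response."""
    payload = json.loads(raw)
    if payload.get("status") != "success":
        raise RuntimeError(f"unexpected insertFiles payload: {payload}")
    # The attribute only appears when the pipe's COPY statement uses PATTERN,
    # so an absent key is treated as "no files skipped".
    return payload["requestId"], payload.get("unmatchedPatternFiles", [])
```

Files returned in the second element were never queued, so they will not show up in later load reports.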

Load history reports

The Snowpipe API provides REST endpoints for fetching load reports.

Endpoint: insertReport

Retrieves a report of files submitted via insertFiles whose contents were recently ingested into a table. Note that for large files, this may only be part of the file.

Note the following limitations of this endpoint:

  • The 10,000 most recent events are retained.
  • Events are retained for a maximum of 10 minutes.

An event occurs when data from a file submitted via insertFiles has been committed to the table and is available to queries. The insertReport endpoint is analogous to the UNIX tail command: by calling it repeatedly, you can see the full history of events on a pipe over time. Call the endpoint often enough that no events are missed. How often depends on the rate at which files are sent to insertFiles.
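The tail-like usage described above can be sketched as one polling step that carries the marker forward and drops duplicates. In this sketch, `fetch` is a hypothetical callable that performs the GET and returns the parsed JSON report. The dedup key (path, lastInsertTime, status) is an assumption, chosen because beginMark is only a hint.

```python
from typing import Callable, Optional


def poll_insert_report(
    fetch: Callable[[Optional[str]], dict],
    begin_mark: Optional[str],
    seen: set,
) -> tuple[list[dict], Optional[str], bool]:
    """One 'tail' step: fetch events after begin_mark and drop ones already seen.

    Returns (new_events, next_begin_mark, complete_result). complete_result is
    False when events were missed, which suggests polling more often.
    """
    report = fetch(begin_mark)
    new_events = []
    for event in report.get("files", []):
        # Assumed dedup key; beginMark is only a hint, so duplicates can reappear.
        key = (event["path"], event.get("lastInsertTime"), event.get("status"))
        if key not in seen:
            seen.add(key)
            new_events.append(event)
    return new_events, report.get("nextBeginMark", begin_mark), report.get("completeResult", True)
```

In practice, call this in a loop with `time.sleep(...)` between calls, passing the returned marker back in. Choose an interval short enough that neither 10 minutes nor 10,000 events pass between calls.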

Method: GET

GET URL:

https://<account_identifier>.snowflakecomputing.cn/v1/data/pipes/<pipeName>/insertReport?requestId=<requestId>&beginMark=<beginMark>

URL parameters:

  • account_identifier (Required): Your unique Snowflake account identifier. The preferred format is organization_name-account_name. For alternative formats (account locator with region and cloud platform), see Format 1 (preferred): Account name in your organization.
  • pipeName (Required): The case-sensitive, fully qualified name of the Snowpipe. For example, myDatabase.mySchema.myPipe.
  • requestId (Optional): A string you can provide to track this specific request through Snowflake’s system. Using a random string like a UUID is highly recommended for easier debugging and monitoring. Append this to the URL like so: ?requestId=<your_uuid>.
  • beginMark (Optional): A marker value returned in the nextBeginMark field of a previous insertReport response. Including this marker helps optimize subsequent calls by reducing the number of duplicate events returned. Because beginMark is only a hint, events might still occasionally be repeated. If beginMark is not specified, the report shows the ingestion history from the last 10 minutes. Append it to the URL as beginMark=<previous_nextBeginMark>, preceded by ? or & depending on its position in the query string.
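This hypothetical helper builds the insertReport URL with both optional query parameters. It uses `urlencode` so that the separators (? then &) come out right whichever parameters are present. The host follows the URL template above.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def insert_report_url(
    account: str,
    pipe_name: str,
    request_id: Optional[str] = None,
    begin_mark: Optional[str] = None,
) -> str:
    """Build an insertReport URL; both query parameters are optional."""
    params = {}
    if request_id is not None:
        params["requestId"] = request_id
    if begin_mark is not None:  # nextBeginMark from the previous response
        params["beginMark"] = begin_mark
    base = f"https://{account}.snowflakecomputing.cn/v1/data/pipes/{quote(pipe_name)}/insertReport"
    return f"{base}?{urlencode(params)}" if params else base
```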

Request headers:

  • Accept: Specifies the desired response format. Accepted values are text/plain or application/json.
  • Authorization: Your Snowflake authentication token. Use the format BEARER <jwt_token>.

Request body:

This endpoint does not accept a request body for GET requests. All required parameters are provided in the URL and headers.

Response body:

Response codes:

  • 200 — Success. Report returned.

  • 400 — Failure. Invalid request due to an invalid format, or limit exceeded.

  • 404 — Failure. pipeName not recognized.

    This error code can also be returned if the role used when calling the endpoint does not have sufficient privileges. For more information, see Grant access privileges.

  • 429 — Failure. Request rate limit exceeded.

  • 500 — Failure. Internal error occurred.

Response payload:

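A successful response (200) contains information about files that were recently added to the table. Note that for large files, this report may cover only part of the file.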

For example:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "nextBeginMark": "1_39",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3china://mybucket/",
      "fileSize": 57,
      "timeReceived": "2017-06-21T04:47:41.453Z",
      "lastInsertTime": "2017-06-21T04:48:28.575Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}

Response fields:

  • pipe (String): The fully qualified name of the pipe.
  • completeResult (Boolean): false if an event was missed between the supplied beginMark and the first event in this report history; otherwise, true.
  • nextBeginMark (String): The beginMark to use on the next request to avoid seeing duplicate records. Note that this value is a hint; duplicates can still occasionally occur.
  • files (Array): An array of JSON objects, one for each file that is part of the history response. Each object contains the following fields:

    • path (String): The file path relative to the stage location.
    • stageLocation (String): Either the stage ID (internal stage) or the S3 bucket (external stage) defined in the pipe.
    • fileSize (Long): The file size, in bytes.
    • timeReceived (String): The time at which the file was received for processing, in ISO-8601 format in the UTC time zone.
    • lastInsertTime (String): The time at which data from the file was last inserted into the table, in ISO-8601 format in the UTC time zone.
    • rowsInserted (Long): The number of rows inserted into the target table from the file.
    • rowsParsed (Long): The number of rows parsed from the file. Rows with errors may be skipped.
    • errorsSeen (Integer): The number of errors seen in the file.
    • errorLimit (Integer): The number of errors allowed in the file before it is considered failed (based on the ON_ERROR copy option).
    • firstError [1] (String): The error message for the first error encountered in the file.
    • firstErrorLineNum [1] (Long): The line number of the first error.
    • firstErrorCharacterPos [1] (Long): The character position of the first error.
    • firstErrorColumnName [1] (String): The name of the column where the first error occurred.
    • systemError [1] (String): A general error describing why the file was not processed.
    • complete (Boolean): Whether the file was completely processed successfully.
    • status (String): The load status for the file:

      • LOAD_IN_PROGRESS: Part of the file has been loaded into the table, but the load process has not completed yet.
      • LOADED: The entire file has been loaded into the table.
      • LOAD_FAILED: The file load failed.
      • PARTIALLY_LOADED: Some rows from the file were loaded successfully, but others were not loaded due to errors. Processing of the file is complete.

[1] Values are supplied for these fields only when a file contains errors.
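A report parsed from JSON can be summarized with a short sketch like the following. The function `summarize_report` is hypothetical. It counts files per status and collects the error text for LOAD_FAILED and PARTIALLY_LOADED files, falling back to systemError when firstError is absent, since both fields appear only for files with errors.

```python
from collections import Counter

PROBLEM_STATUSES = {"LOAD_FAILED", "PARTIALLY_LOADED"}


def summarize_report(report: dict) -> dict:
    """Count files per status and collect error messages for problem files."""
    files = report.get("files", [])
    counts = Counter(f["status"] for f in files)
    problems = [
        # firstError / systemError are only present when the file had errors
        (f["path"], f.get("firstError") or f.get("systemError"))
        for f in files
        if f["status"] in PROBLEM_STATUSES
    ]
    return {
        "completeResult": report.get("completeResult", True),
        "counts": dict(counts),
        "problems": problems,
    }
```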

Endpoint: loadHistoryScan

Fetches a report about ingested files whose contents have been added to the table. Note that for large files, this may only be part of the file. This endpoint differs from insertReport in that it returns the history between two points in time. Each call returns at most 10,000 items, but you can issue multiple calls to cover the desired time range.

Important

This endpoint is rate limited to avoid excessive calls. To avoid exceeding the rate limit (error code 429), we recommend relying more heavily on insertReport than on loadHistoryScan. When calling loadHistoryScan, specify the narrowest time range that includes a set of data loads. For example, reading the last 10 minutes of history every 8 minutes works well, whereas reading the last 24 hours of history every minute results in 429 errors indicating that the rate limit has been reached. The rate limits are designed to allow each history record to be read a handful of times.
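The "last 10 minutes every 8 minutes" pattern can be sketched as a window calculation. The helper names `iso_utc` and `scan_window` are hypothetical. The millisecond-precision "Z" format mirrors the timestamps in the sample response below.

```python
from datetime import datetime, timedelta, timezone


def iso_utc(t: datetime) -> str:
    """Format an aware datetime as ISO-8601 UTC with millisecond precision."""
    t = t.astimezone(timezone.utc)
    return t.strftime("%Y-%m-%dT%H:%M:%S.") + f"{t.microsecond // 1000:03d}Z"


def scan_window(now: datetime, lookback: timedelta = timedelta(minutes=10)) -> tuple[str, str]:
    """Return (startTimeInclusive, endTimeExclusive) covering the last `lookback`."""
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    return iso_utc(now - lookback), iso_utc(now)
```

If you call `scan_window(datetime.now(timezone.utc))` every 8 minutes, consecutive windows overlap by about 2 minutes. Nothing is missed if a call runs late, but the overlapping records come back twice and should be deduplicated.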

For a more comprehensive view, without these limits, Snowflake provides an Information Schema table function, COPY_HISTORY, that returns the load history of a pipe or table.

Method: GET

GET URL:

https://{account}.snowflakecomputing.cn/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<startTime>&endTimeExclusive=<endTime>&requestId=<requestId>

URL parameters:

  • account (Required): Your unique Snowflake account identifier.
  • pipeName (Required): The case-sensitive, fully qualified name of the Snowpipe. Example: myDatabase.mySchema.myPipe.
  • startTimeInclusive (Required): The beginning of the time range for retrieving load history data, specified as a timestamp in ISO-8601 format (for example, 2023-10-26T10:00:00Z). This timestamp marks the inclusive lower bound of the query.
  • endTimeExclusive (Optional): The end of the time range for retrieving load history data, specified as a timestamp in ISO-8601 format (for example, 2023-10-26T10:15:00Z). This timestamp marks the exclusive upper bound of the query. If this parameter is omitted, the current server timestamp (CURRENT_TIMESTAMP()) will be used as the end of the time range.
  • requestId (Optional): A string you can provide to track this specific request through Snowflake's system. We recommend using a random string such as a UUID for easier debugging and monitoring. Because startTimeInclusive is always present, append this parameter to the URL like so: &requestId=<your_uuid>.
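A minimal sketch of assembling the loadHistoryScan URL from these parameters follows. The function name is hypothetical. `urlencode` percent-encodes the colons in the ISO-8601 timestamps (":" becomes "%3A"), which standard URL decoding on the server side should reverse. Confirm this against your own calls.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def load_history_scan_url(
    account: str,
    pipe_name: str,
    start_inclusive: str,
    end_exclusive: Optional[str] = None,
    request_id: Optional[str] = None,
) -> str:
    """Build a loadHistoryScan URL; only startTimeInclusive is required."""
    params = {"startTimeInclusive": start_inclusive}
    if end_exclusive is not None:  # omitted: the server uses CURRENT_TIMESTAMP()
        params["endTimeExclusive"] = end_exclusive
    if request_id is not None:
        params["requestId"] = request_id
    base = f"https://{account}.snowflakecomputing.cn/v1/data/pipes/{quote(pipe_name)}/loadHistoryScan"
    return f"{base}?{urlencode(params)}"
```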

Request headers:

  • Accept: Specifies the desired response format. Accepted values are text/plain or application/json.
  • Authorization: Your Snowflake authentication token. Use the format BEARER <jwt_token>.

Request body:

This endpoint does not accept a request body for GET requests. All necessary parameters are provided in the URL and headers.

Response body:

Response codes:

  • 200 — Success. Load History scan results are returned.
  • 400 — Failure. Invalid request due to an invalid format, or limit exceeded.
  • 404 — Failure. pipeName not recognized.
  • 429 — Failure. Request rate limit exceeded.
  • 500 — Failure. Internal error occurred.

Response payload:

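A successful response (200) contains information about files that were recently added to the table. Note that for large files, this report may cover only part of the file.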

For example:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "startTimeInclusive": "2017-08-25T18:42:31.081Z",
  "endTimeExclusive":"2017-08-25T22:43:45.552Z",
  "rangeStartTime":"2017-08-25T22:43:45.383Z",
  "rangeEndTime":"2017-08-25T22:43:45.383Z",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3china://mystage/",
      "fileSize": 57,
      "timeReceived": "2017-08-25T22:43:45.383Z",
      "lastInsertTime": "2017-08-25T22:43:45.383Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}

Response fields:

  • pipe (String): The fully qualified name of the pipe.
  • completeResult (Boolean): false if the report is incomplete, that is, the number of entries in the specified time range exceeds the 10,000-entry limit. If false, you can pass the current rangeEndTime value as the startTimeInclusive value of the next request to retrieve the next set of entries.
  • startTimeInclusive (String): The starting timestamp (in ISO-8601 format) provided in the request.
  • endTimeExclusive (String): The ending timestamp (in ISO-8601 format) provided in the request.
  • rangeStartTime (String): The timestamp (in ISO-8601 format) of the oldest entry among the files included in the response.
  • rangeEndTime (String): The timestamp (in ISO-8601 format) of the latest entry among the files included in the response.
  • files (Array): An array of JSON objects, one for each file that is part of the history response. The fields within each object are the same as those returned in the insertReport response.
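The completeResult/rangeEndTime continuation described above can be sketched as a bounded pagination loop. In this sketch, `fetch(start, end)` is a hypothetical callable that calls loadHistoryScan and returns the parsed JSON. Because rangeEndTime is reused as an inclusive start, entries at that boundary may be returned twice, so they are deduplicated with an assumed (path, lastInsertTime, status) key. The page cap keeps the loop from running away against the rate limit.

```python
from typing import Callable


def scan_all(
    fetch: Callable[[str, str], dict],
    start_inclusive: str,
    end_exclusive: str,
    max_pages: int = 20,
) -> list[dict]:
    """Follow completeResult/rangeEndTime until the whole time range is covered."""
    files: list[dict] = []
    seen = set()
    start = start_inclusive
    for _ in range(max_pages):  # bound the number of calls; the endpoint is rate limited
        page = fetch(start, end_exclusive)
        for f in page.get("files", []):
            # Restarting at rangeEndTime (inclusive) can repeat boundary entries.
            key = (f["path"], f.get("lastInsertTime"), f.get("status"))
            if key not in seen:
                seen.add(key)
                files.append(f)
        if page.get("completeResult", True):
            return files
        start = page["rangeEndTime"]
    raise RuntimeError(f"time range not covered after {max_pages} pages")
```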