使用目录表构建数据处理管道

您可以将跟踪和存储暂存区中文件级元数据的目录表与其他 Snowflake 对象(如流和任务)组合起来,以构建数据处理管道。

记录对视图中的目录表、表、外部表或基础表所进行的数据操作语言 (DML) 更改。任务 执行一项操作,该操作可以是 SQL 命令或扩展 UDF。您可以计划任务或按需运行任务。

示例:创建一个简单的管道来处理 PDFs

此示例生成一个简单的数据处理管道,用于执行以下操作:

  1. 检测 PDF 添加到暂存区的文件。

  2. 从文件中提取数据。

  3. 将数据插入到 Snowflake 表中。

该管道使用流来检测暂存区中目录表的更改,并使用一个任务执行用户定义的函数 (UDF),以处理文件。

下图总结了示例管道的工作原理:

一个简单的数据处理管道,该管道使用流跟踪目录表的更改。

第 1 步:创建启用了目录表的暂存区

创建启用了目录表的内部暂存区。示例语句将 ENCRYPTION 类型设置为 SNOWFLAKE_SSE,以 在暂存区中启用非结构化数据的访问

CREATE OR REPLACE STAGE my_pdf_stage
  ENCRYPTION = ( TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = ( ENABLE = TRUE);
Copy

第 2 步:在目录表上创建流

接下来,指定目录表所属的暂存区,从而在目录表上创建流。该流将跟踪对目录表的更改。在本示例的第 5 步中,我们使用此流来构造任务。

CREATE STREAM my_pdf_stream ON STAGE my_pdf_stage;
Copy

第 3 步:创建用户定义的函数来解析 PDFs

创建用户定义的函数 (UDF),以从 PDF 文件中提取数据。我们在第 5 步中创建的任务将调用此 UDF,来处理暂存区中新增的文件。

下面的示例语句创建一个名为 PDF_PARSE 的 UDF,用于处理包含产品评论数据的 PDF 文件。该 UDF 使用 PyPDF2 库提取表单字段数据。它会返回一个字典,其中包含键值对形式的表单名称和值。

备注

该 UDF 使用 SnowflakeFile 类读取动态指定的文件。要了解有关 SnowflakeFile 的更多信息,请参阅 使用 SnowflakeFile 读取动态指定的文件

CREATE OR REPLACE FUNCTION PDF_PARSE(file_path string)
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.8'
  HANDLER = 'parse_pdf_fields'
  PACKAGES=('typing-extensions','PyPDF2','snowflake-snowpark-python')
  AS
  $$
  from pathlib import Path
  import PyPDF2 as pypdf
  from io import BytesIO
  from snowflake.snowpark.files import SnowflakeFile

  def parse_pdf_fields(file_path):
      with SnowflakeFile.open(file_path, 'rb') as f:
        buffer = BytesIO(f.readall())
      reader = pypdf.PdfFileReader(buffer)
      fields = reader.getFields()
      field_dict = {}
      for k, v in fields.items():
          if "/V" in v.keys():
              field_dict[v["/T"]] = v["/V"].replace("/", "") if v["/V"].startswith("/") else v["/V"]

      return field_dict
  $$;
Copy

第 4 步:创建表以存储文件内容

接下来创建一个表,其中每一行将有关暂存区中文件的信息存储在名为 file_namefile_data 的列中。在本示例的第 5 步中创建的任务会将数据加载到此表中。

CREATE OR REPLACE TABLE prod_reviews (
  file_name varchar,
  file_data variant
);
Copy

第 5 步:创建任务

创建一个计划任务,用于检查暂存区中的新文件流,并将文件数据插入到 prod_reviews 表中。

以下语句使用在第 2 步中创建的流来创建计划任务。该任务使用 SYSTEM$STREAM_HAS_DATA 函数,检查流中是否包含变更数据获取 (CDC) 记录。

CREATE OR REPLACE TASK load_new_file_data
  WAREHOUSE = 'MY_WAREHOUSE'
  SCHEDULE = '1 minute'
  COMMENT = 'Process new files on the stage and insert their data into the prod_reviews table.'
  WHEN
  SYSTEM$STREAM_HAS_DATA('my_pdf_stream')
  AS
  INSERT INTO prod_reviews (
    SELECT relative_path as file_name,
    PDF_PARSE(build_scoped_file_url('@my_pdf_stage', relative_path)) as file_data
    FROM my_pdf_stream
    WHERE METADATA$ACTION='INSERT'
  );
Copy

第 6 步:运行任务以测试管道

要检查管道能否正常工作,您可以将文件添加到暂存区、手动执行任务,然后查询 product_reviews 表。

首先向 my_pdf_stage 暂存区添加一些 PDF 文件,然后刷新暂存区。

备注

此示例使用 PUT 命令,此命令不能从 Snowflake Web 界面中的工作表执行。要使用 Snowsight 上传文件,请参阅 将文件上传到指定内部暂存区

PUT file:///my/file/path/prod_review1.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;
PUT file:///my/file/path/prod_review2.pdf @my_pdf_stage AUTO_COMPRESS = FALSE;

ALTER STAGE my_pdf_stage REFRESH;
Copy

您可以查询流,以验证它是否记录了我们添加到暂存区的两个 PDF 文件。

SELECT * FROM my_pdf_stream;
Copy

现在,执行任务以处理 PDF 文件,并更新 product_reviews 表。

EXECUTE TASK load_new_file_data;
+----------------------------------------------------------+
| status                                                   |
|----------------------------------------------------------|
| Task LOAD_NEW_FILE_DATA is scheduled to run immediately. |
+----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.178s
Copy

查询 product_reviews 表,以查看任务是否为每个 PDF 文件添加了一行。

select * from prod_reviews;
+------------------+----------------------------------+
| FILE_NAME        | FILE_DATA                        |
|------------------+----------------------------------|
| prod_review1.pdf | {                                |
|                  |   "FirstName": "John",           |
|                  |   "LastName": "Johnson",         |
|                  |   "Middle Name": "Michael",      |
|                  |   "Product": "Tennis Shoes",     |
|                  |   "Purchase Date": "03/15/2022", |
|                  |   "Recommend": "Yes"             |
|                  | }                                |
| prod_review2.pdf | {                                |
|                  |   "FirstName": "Emily",          |
|                  |   "LastName": "Smith",           |
|                  |   "Middle Name": "Ann",          |
|                  |   "Product": "Red Skateboard",   |
|                  |   "Purchase Date": "01/10/2023", |
|                  |   "Recommend": "MayBe"           |
|                  | }                                |
+------------------+----------------------------------+
Copy

最后,您可以创建一个视图,将列中的 FILE_DATA 对象解析为单独的列。然后可以查询视图,以分析和处理文件内容。

CREATE OR REPLACE VIEW prod_review_info_v
  AS
  WITH file_data
  AS (
      SELECT
        file_name
        , parse_json(file_data) AS file_data
      FROM prod_reviews
  )
  SELECT
      file_name
      , file_data:FirstName::varchar AS first_name
      , file_data:LastName::varchar AS last_name
      , file_data:"Middle Name"::varchar AS middle_name
      , file_data:Product::varchar AS product
      , file_data:"Purchase Date"::date AS purchase_date
      , file_data:Recommend::varchar AS recommended
      , build_scoped_file_url(@my_pdf_stage, file_name) AS scoped_review_url
  FROM file_data;

SELECT * FROM prod_review_info_v;

+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| FILE_NAME        | FIRST_NAME | LAST_NAME | MIDDLE_NAME | PRODUCT        | PURCHASE_DATE | RECOMMENDED | SCOPED_REVIEW_URL                                                                                                                                                                                                                                                                                                                                                                                                              |
|------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| prod_review1.pdf | John       | Johnson   | Michael     | Tennis Shoes   | 2022-03-15    | Yes         | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.cn/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/RZ4s%2bJLa6iHmLouHA79b94tg%2f3SDA%2bOQX01pAYo%2bl6gAxiLK8FGB%2bv8L2QSB51tWP%2fBemAbpFd%2btKfEgKibhCXN2QdMCNraOcC1uLdR7XV40JRIrB4gDYkpHxx3HpCSlKkqXeuBll%2fyZW9Dc6ZEtwF19GbnEBR9FwiUgyqWjqSf4KTmgWKv5gFCpxwqsQgofJs%2fqINOy%2bOaRPa%2b65gcnPpY2Dc1tGkJGC%2fT110Iw30cKuMGZ2HU%3d              |
| prod_review2.pdf | Emily      | Smith     | Ann         | Red Skateboard | 2023-01-10    | MayBe       | https://mydeployment.us-west-2.aws.privatelink.snowflakecomputing.cn/api/files/01aefcdc-0000-6f92-0000-012900fdc73e/1275606224902/g3glgIbGik3VOmgcnltZxVNQed8%2fSBehlXbgdZBZqS1iAEsFPd8pkUNB1DSQEHoHfHcWLsaLblAdSpPIZm7wDwaHGvbeRbLit6nvE%2be2LHOsPR1UEJrNn83o%2fZyq4kVCIgKeSfMeGH2Gmrvi82JW%2fDOyZJITgCEZzpvWGC9Rmnr1A8vux47uZj9MYjdiN2Hho3uL9ExeFVo8FUtR%2fHkdCJKIzCRidD5oP55m9p2ml2yHOkDJW50%3d                            |
+------------------+------------+-----------+-------------+----------------+---------------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Copy
语言: 中文