使用 UDF 和过程处理程序处理非结构化数据¶
本主题提供一些示例,以说明如何使用为以下函数和过程编写的处理程序代码来读取和处理暂存文件中的非结构化数据:
也可以使用以其他语言编写的处理程序来读取文件:
- Python:
- Scala:
备注
为了使您的代码能够抵御文件注入攻击,在将文件的位置传递给 UDF 时,尤其是在函数的调用方不是其所有者时,请务必使用带有作用域的 URL。您可以使用内置函数 BUILD_SCOPED_FILE_URL 在 SQL 中创建带有作用域的 URL。有关 BUILD_SCOPED_FILE_URL 用途的更多信息,请参阅 非结构化数据简介。
本主题内容:
使用 UDF 和过程处理 PDF¶
本部分中的示例使用 Java 处理程序代码处理暂存的非结构化文件(首先使用 UDF,然后使用过程)。两个处理程序都使用 Apache PDFBox 库 (https://pdfbox.apache.org/) 提取指定 PDF 文件的内容。
处理程序代码在 UDF 与过程之间非常相似。不同之处是读取传入的 PDF 文件的方式。
在 UDF 中,处理程序使用 Java
InputStream
读取文件。在过程中,处理程序使用 Snowflake
SnowflakeFile
读取文件。
这些示例使用内联的处理程序代码(而不是在暂存的 JAR 中编译),这意味着您无需编译和打包处理程序代码,然后将其上传到暂存区。有关内联的和暂存的处理程序有何区别的更多信息,请参阅 将处理程序代码保持内联或保留在暂存区。
下载 PDFBox 库¶
在开始编写 UDF 之前,如果您还没有 PDFBox 库 JAR 文件,请下载该文件。处理程序代码需要依赖该文件。稍后要将该库 JAR 文件上传到暂存区。
从 `Apache PDFBox 库下载页面<https://pdfbox.apache.org/download.html>`_ 下载该库的最新发布版本。
创建暂存区¶
创建暂存区,以存放处理程序代码依赖的库和处理程序代码将读取的数据文件。
使用下面的代码创建单独的内部暂存区,以存放:
处理程序依赖的库 JAR 文件。您将从 UDF 中引用该暂存区和 JAR 文件。
处理程序代码将读取的数据文件。
以下示例中的代码使用 CREATE STAGE 命令来创建您需要的暂存区。
-- Create an internal stage to store the JAR files.
CREATE OR REPLACE STAGE jars_stage;
-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
上传所需的库和要读取的 PDF 文件¶
完成以下步骤,以上传要依赖的 JAR 文件(包含用于处理 PDF 的库代码)和数据文件(处理程序代码将处理的 PDF 文件)。
在此示例中,可以使用您选择的 PDF 文件。
将 Apache PDFBox 的 JAR 文件从本地临时目录复制到存储 JAR 文件的暂存区:
- Linux/Mac:
PUT file:///tmp/pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
- Windows:
PUT file://C:\temp\pdfbox-app-2.0.27.jar @jars_stage AUTO_COMPRESS=FALSE;
将 PDF 文件从本地临时目录复制到存储数据文件的暂存区:
- Linux/Mac:
PUT file:///tmp/myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
- Windows:
PUT file://C:\temp\myfile.pdf @data_stage AUTO_COMPRESS=FALSE;
创建并调用 UDF¶
完成以下步骤,以创建用于读取和处理 PDF 文件的 UDF。
粘贴并运行以下代码,以创建 UDF。
此 UDF 的处理程序解析 PDF 文档并检索其内容。处理程序使用
InputStream
类来读取文件。有关使用InputStream
读取文件的更多信息,请参阅 使用 InputStream 读取动态指定的文件。CREATE FUNCTION process_pdf_func(file STRING) RETURNS STRING LANGUAGE JAVA RUNTIME_VERSION = 11 IMPORTS = ('@jars_stage/pdfbox-app-2.0.27.jar') HANDLER = 'PdfParser.readFile' AS $$ import org.apache.pdfbox.pdmodel.PDDocument; import org.apache.pdfbox.text.PDFTextStripper; import org.apache.pdfbox.text.PDFTextStripperByArea; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; public class PdfParser { public static String readFile(InputStream stream) throws IOException { try (PDDocument document = PDDocument.load(stream)) { document.getClass(); if (!document.isEncrypted()) { PDFTextStripperByArea stripper = new PDFTextStripperByArea(); stripper.setSortByPosition(true); PDFTextStripper tStripper = new PDFTextStripper(); String pdfFileInText = tStripper.getText(document); return pdfFileInText; } } return null; } } $$;
使用 ALTER STAGE 命令刷新
data_stage
暂存区的目录表:ALTER STAGE data_stage REFRESH;
调用 UDF,以读取暂存的 PDF 文件并提取内容。
以下示例中的代码调用 UDF,并传递带有作用域的 URL,以使代码能够抵御文件注入攻击。当函数的调用者不是其所有者时,请务必使用带有作用域的 URL。当 UDF 的调用者也是其所有者时,您可以将 URL 实参作为带有作用域的 URL 或其他形式传递。
SELECT process_pdf_func(BUILD_SCOPED_FILE_URL('@data_stage', '/myfile.pdf'));
创建并调用过程¶
完成以下步骤,以创建用于读取和处理 PDF 文件的过程。
粘贴并运行以下代码,以创建过程。
此过程的处理程序解析 PDF 文档并检索其内容。处理程序使用
SnowflakeFile
类来读取文件。有关使用 使用 SnowflakeFile 读取动态指定的文件 读取文件的更多信息,请参阅SnowflakeFile
。CREATE PROCEDURE process_pdf_proc(file STRING) RETURNS STRING LANGUAGE JAVA RUNTIME_VERSION = 11 IMPORTS = ('@jars_stage/pdfbox-app-2.0.28.jar') HANDLER = 'PdfParser.readFile' PACKAGES = ('com.snowflake:snowpark:latest') AS $$ import org.apache.pdfbox.pdmodel.PDDocument; import org.apache.pdfbox.text.PDFTextStripper; import org.apache.pdfbox.text.PDFTextStripperByArea; import com.snowflake.snowpark_java.types.SnowflakeFile; import com.snowflake.snowpark_java.Session; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; public class PdfParser { public static String readFile(Session session, String fileURL) throws IOException { SnowflakeFile file = SnowflakeFile.newInstance(fileURL); try (PDDocument document = PDDocument.load(file.getInputStream())) { document.getClass(); if (!document.isEncrypted()) { PDFTextStripperByArea stripper = new PDFTextStripperByArea(); stripper.setSortByPosition(true); PDFTextStripper tStripper = new PDFTextStripper(); String pdfFileInText = tStripper.getText(document); return pdfFileInText; } } return null; } } $$;
使用 ALTER STAGE 命令刷新
data_stage
暂存区的目录表:ALTER STAGE data_stage REFRESH;
调用过程,以读取暂存的 PDF 文件并提取内容。
以下示例中的代码传递一个带有作用域的 URL,其指向您创建的暂存区上的 PDF 文件。
CALL process_pdf_proc(BUILD_SCOPED_FILE_URL('@data_stage', '/UsingThird-PartyPackages.pdf'));
使用 UDTF 处理 CSV¶
本部分中的示例使用 Java UDTFs 从暂存文件中提取并返回数据。
创建数据暂存区¶
使用 CREATE STAGE 命令创建暂存区:
以下 SQL 语句创建一个内部暂存区,以存储示例的数据文件:
-- Create an internal stage to store the data files. The stage includes a directory table.
CREATE OR REPLACE STAGE data_stage DIRECTORY=(ENABLE=TRUE) ENCRYPTION = (TYPE='SNOWFLAKE_SSE');
上传要读取的 CSV 文件¶
将 CSV 文件从本地临时目录复制到存储数据文件的暂存区:
- Linux/Mac:
PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;
- Windows:
PUT file://C:\temp\sample.csv @data_stage AUTO_COMPRESS=FALSE;
创建并调用 UDTF¶
此示例提取一组指定的 CSV 文件的内容,并返回表中的行。通过在从源读取文件数据时对其进行处理,可以避免文件非常大时可能出现的内存不足错误。
以下 UDTF 处理程序示例中的代码使用 SnowflakeFile
从文件 URL 中生成 InputStream
,以读取 CSV 文件。(在 Java UDTF 处理程序中,行处理从 Snowflake 调用您实现的 process
方法时开始。)在构造处理程序本身定义的 CsvStreamingReader
类的实例时,该代码使用流。
CsvStreamingReader
类逐行读取收到的 CSV 文件流的内容,为其他代码提供了将每行作为记录检索的方式(在记录中,各列用逗号分隔)。process
方法在从流中读取每条记录时返回这些记录。
有关使用 Java 处理程序编写表格式用户定义函数 (UDTFs) 的更多信息,请参阅 表格 Java UDFs (UDTFs)。
完成以下步骤,以创建 Java UDTF 并上传所需的文件:
创建一个使用
SnowflakeFile
类的 Java UDTF:CREATE OR REPLACE FUNCTION parse_csv(file STRING) RETURNS TABLE (col1 STRING, col2 STRING, col3 STRING ) LANGUAGE JAVA HANDLER = 'CsvParser' AS $$ import org.xml.sax.SAXException; import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.stream.Stream; import com.snowflake.snowpark_java.types.SnowflakeFile; public class CsvParser { static class Record { public String col1; public String col2; public String col3; public Record(String col1_value, String col2_value, String col3_value) { col1 = col1_value; col2 = col2_value; col3 = col3_value; } } public static Class getOutputClass() { return Record.class; } static class CsvStreamingReader { private final BufferedReader csvReader; public CsvStreamingReader(InputStream is) { this.csvReader = new BufferedReader(new InputStreamReader(is)); } public void close() { try { this.csvReader.close(); } catch (IOException e) { e.printStackTrace(); } } Record getNextRecord() { String csvRecord; try { if ((csvRecord = csvReader.readLine()) != null) { String[] columns = csvRecord.split(",", 3); return new Record(columns[0], columns[1], columns[2]); } } catch (IOException e) { throw new RuntimeException("Reading CSV failed.", e); } finally { // No more records, we can close the reader. close(); } // Return null to indicate the end of the stream. return null; } } public Stream<Record> process(String file_url) throws IOException { SnowflakeFile file = SnowflakeFile.newInstance(file_url); CsvStreamingReader csvReader = new CsvStreamingReader(file.getInputStream()); return Stream.generate(csvReader::getNextRecord); } } $$ ;
刷新
data_stage
暂存区的目录表:ALTER STAGE data_stage REFRESH;
调用 Java UDTF 以读取一个或多个暂存的 CSV 文件,并以表格格式提取内容:
以下示例中的代码调用 UDF,并传递带有作用域的 URL,以减少文件注入攻击的风险。当函数的调用者不是其所有者时,请务必使用带有作用域的 URL。当 UDF 的调用者也是其所有者时,您可以将 URL 实参作为带有作用域的 URL 或其他受支持的形式传递。
-- Input a file URL. SELECT * FROM TABLE(PARSE_CSV(BUILD_SCOPED_FILE_URL(@data_stage, 'sample.csv')));