在 Python 中为 DataFrames 创建用户定义函数 (UDFs)¶
Snowpark API 提供了一些方法,您可以使用这些方法在 Python 中通过 Lambda 或函数创建用户定义函数。本主题说明如何创建这些类型的函数。
本主题内容:
简介¶
使用 Snowpark,您可以为自定义 lambda 和函数创建用户定义函数 (UDFs),并且可以调用这些 UDFs 来处理 DataFrame 中的数据。
当您使用 Snowpark API 创建 UDF 时,Snowpark 库会将函数的代码上传到内部暂存区。当您调用 UDF 时,Snowpark 库会在数据所在的服务器上执行您的函数。因此,不需要将数据传输到客户端,函数就可以处理数据。
在自定义代码中,您还可以从 Python 文件或第三方包导入模块。
可以通过以下两种方式之一为自定义代码创建 UDF:
您可以 创建匿名 UDF 并将该函数赋值给变量。只要此变量在作用域中,您就可以使用此变量调用 UDF。
您可以 创建命名 UDF,并按名称调用 UDF。例如,如果需要按名称调用 UDF 或在后续会话中使用 UDF,您可以使用此方式。
接下来的章节将介绍如何使用本地开发环境或使用 Python 工作表 来创建这些 UDFs。
请注意,如果通过运行 CREATE FUNCTION
命令定义了 UDF,则可以在 Snowpark 中调用该 UDF。有关详细信息,请参阅 调用用户定义的函数 (UDFs)。
备注
通过矢量化 Python UDFs,您可以定义将批量输入行接收为 Pandas DataFrames 的 Python 函数。这样可以在机器学习推理场景中获得更好的性能。有关更多信息,请参阅 使用矢量化 UDFs。
备注
如果您使用的是 Python 工作表,请在处理程序函数中使用以下示例:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
如果示例返回 DataFrame 以外的内容(例如 Row
对象的 list
),请 更改返回类型 以匹配示例的返回类型。
运行代码示例后,使用 Results 选项卡查看返回的任何输出。有关更多详细信息,请参阅 运行 Python 工作表。
为 UDF 指定依赖项¶
要使用 Snowpark API 定义 UDF,必须导入包含 UDF 所依赖的任何模块的文件,例如 Python 文件、zip 文件、资源文件等。
要使用 Python 工作表来实现这一点,请参阅 将 Python 文件从暂存区添加到工作表。
要使用本地开发环境执行此操作,必须在代码中调用
Session.add_import()
。
您还可以指定一个目录,Snowpark 库会自动压缩该目录并将其上传为 zip 文件。(有关从 UDF 读取资源的详细信息,请参阅 使用 UDF 读取文件。)
调用 Session.add_import()
时,Snowpark 库将指定的文件上传到内部暂存区,并在执行 UDF 时导入文件。
以下示例演示了如何在暂存区中添加 zip 文件作为代码的依赖项:
# Add a zip file that you uploaded to a stage.
session.add_import("@my_stage/<path>/my_library.zip")
以下示例演示了如何从本地机器添加 Python 文件:
# Import a Python file from your local machine.
session.add_import("/<path>/my_module.py")
# Import a Python file from your local machine and specify a relative Python import path.
session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
以下示例演示了如何添加其他类型的依赖项:
# Add a directory of resource files.
session.add_import("/<path>/my-resource-dir/")
# Add a resource file.
session.add_import("/<path>/my-resource.xml")
备注
Python Snowpark 库不会自动上传。
您不需要指定以下依赖项:
Python 内置库。
这些库已在执行 UDFs 的服务器上的运行时环境中可用。
在 UDF 中使用 Anaconda 中的第三方包¶
您可以在 UDF 中使用来自 Snowflake Anaconda 通道的第三方包。
如果您在 Python 工作表中创建 Python UDF,则您的工作表已经可以使用 Anaconda 包。请参阅 将 Python 文件从暂存区添加到工作表。
如果您在本地开发环境中创建 Python UDF,则可以指定要安装的 Anaconda 包。
在 Snowflake 仓库中执行调用 Python UDFs 的查询时,为您在虚拟仓库上无缝安装并缓存 Anaconda 包。
有关最佳实践、如何查看可用包以及如何设置本地开发环境的更多信息,请参阅 使用第三方包。
如果在本地开发环境中编写 Python UDF,请使用 session.add_packages
在会话级别添加包。
此代码示例演示如何导入包并返回其版本。
import numpy as np
import pandas as pd
import xgboost as xgb
from snowflake.snowpark.functions import udf
session.add_packages("numpy", "pandas", "xgboost==1.5.0")
@udf
def compute() -> list:
return [np.__version__, pd.__version__, xgb.__version__]
您还可以使用 session.add_requirements
指定包含 要求文件 (https://pip.pypa.io/en/stable/user_guide/#requirements-files) 的包。
session.add_requirements("mydir/requirements.txt")
您可以添加 UDF 级包,以覆盖之前可能添加的会话级包。
import numpy as np
import pandas as pd
import xgboost as xgb
from snowflake.snowpark.functions import udf
@udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
def compute() -> list:
return [np.__version__, pd.__version__, xgb.__version__]
重要
如果未指定包的版本,则 Snowflake 在解析依赖项时使用最新版本。但是,在将 UDF 部署到生产环境时,可能需要确保代码始终使用相同的依赖项版本。您可以对永久和临时 UDFs 执行此操作。
当创建永久 UDF 时,只会创建和注册一次 UDF。这将解析依赖项一次,所选版本将用于生产工作负载。执行 UDF 时,它始终使用相同的依赖项版本。
创建临时 UDF 时,请将依赖项版本指定为版本规范的一部分。这样,在注册 UDF 时,包的解析使用指定的版本。如果未指定版本,则当新版本可用时,可能会更新依赖项。
创建匿名 UDF¶
要创建匿名 UDF,您可以执行以下任一操作:
调用
snowflake.snowpark.functions
模块中的udf
函数,传入匿名函数的定义。调用
UDFRegistration
类中的register
方法,传入匿名函数的定义。
以下是匿名 UDF 的示例:
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf
add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
备注
编写可能在多个会话中执行的代码时,请使用 register
方法注册 UDFs,而不是使用 udf
函数。这可以防止找不到默认 Snowflake Session
对象的错误。
创建和注册命名的 UDF¶
如果要按名称调用 UDF(例如,通过使用 functions
模块中的 call_udf
函数),则可以创建并注册命名 UDF。要执行此操作,请使用以下内容之一:
UDFRegistration
类中带有name
实参的register
方法。snowflake.snowpark.functions
模块中带有name
实参的udf
函数。
若要访问 Session
类的属性或方法,请调用 udf
类的 属性。
调用 register
或 udf
将创建可在当前会话中使用的临时 UDF。
要创建永久 UDF,请调用 register
方法或 udf
函数,并将 is_permanent
实参设置为 True
。创建永久 UDF 时,还必须将 stage_location
实参设置为上传 UDF 及其依赖项的 Python 文件的暂存区位置。
以下示例演示了如何注册命名的临时 UDF:
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf
add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
以下示例演示了如何通过将 is_permanent
实参设置为 True
来注册命名的永久 UDF:
@udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
def minus_one(x: int) -> int:
return x-1
以下示例演示了如何调用这些 UDFs:
df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
您还可以使用 SQL 调用 UDF:
session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
从 Python 源文件创建 UDF¶
如果在本地开发环境中创建 UDF,则可以在 Python 文件中定义 UDF 处理程序,然后使用 UDFRegistration
类中的 register_from_file
方法创建 UDF。
备注
您不能在 Python 工作表中使用此方法。
以下是使用 register_from_file
的示例。
假设您有一个 Python 文件 test_udf_file.py
,其中包含:
def mod5(x: int) -> int:
return x % 5
然后您可以从文件 test_udf_file.py
的此函数创建 UDF。
# mod5() in that file has type hints
mod5_udf = session.udf.register_from_file(
file_path="tests/resources/test_udf_dir/test_udf_file.py",
func_name="mod5",
)
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
您还可以将该文件上传到暂存区位置,然后使用该文件来创建 UDF。
from snowflake.snowpark.types import IntegerType
# suppose you have uploaded test_udf_file.py to stage location @mystage.
mod5_udf = session.udf.register_from_file(
file_path="@mystage/test_udf_file.py",
func_name="mod5",
return_type=IntegerType(),
input_types=[IntegerType()],
)
session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
使用 UDF 读取文件¶
要读取文件的内容,Python 代码可以执行以下操作:
读取静态指定的文件,方法是导入一个文件,然后从 UDF 的主目录中读取该文件。
使用 SnowflakeFile 读取动态指定的文件。如果需要在计算过程中访问文件,则可以执行此操作。
读取静态指定的文件¶
Snowpark 库在服务器上上传和执行 UDFs。如果 UDF 需要从文件中读取数据,您必须确保使用 UDF 上传文件。
备注
如果在 Python 工作表中编写 UDF,则 UDF 只能从暂存区中读取文件。
设置 UDF 以读取文件:
指定文件是依赖项,可将文件上传到服务器。有关更多信息,请参阅 为 UDF 指定依赖项。
例如:
# Import a file from your local machine as a dependency. session.add_import("/<path>/my_file.txt") # Or import a file that you uploaded to a stage as a dependency. session.add_import("@my_stage/<path>/my_file.txt")
在 UDF 中,读取文件。在以下示例中,该文件在 UDF 创建期间只会读取一次,在 UDF 执行期间不会再次读取。这是通过第三方库 缓存工具 (https://pypi.org/project/cachetools/) 来实现的。
import sys import os import cachetools from snowflake.snowpark.types import StringType @cachetools.cached(cache={}) def read_file(filename): import_dir = sys._xoptions.get("snowflake_import_directory") if import_dir: with open(os.path.join(import_dir, filename), "r") as f: return f.read() # create a temporary text file for test temp_file_name = "/tmp/temp.txt" with open(temp_file_name, "w") as t: _ = t.write("snowpark") session.add_import(temp_file_name) session.add_packages("cachetools") def add_suffix(s): return f"{read_file(os.path.basename(temp_file_name))}-{s}" concat_file_content_with_str_udf = session.udf.register( add_suffix, return_type=StringType(), input_types=[StringType()] ) df = session.create_dataframe(["snowflake", "python"], schema=["a"]) df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect()
[Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')]
os.remove(temp_file_name) session.clear_imports()
使用 SnowflakeFile
读取动态指定的文件¶
您可以使用 Snowpark snowflake.snowpark.files
模块中的 SnowflakeFile
类从暂存区读取文件。SnowflakeFile
类提供动态文件访问权限,允许您流式传输任何大小的文件。当您想要迭代多个文件时,动态文件访问也非常有用。例如,请参阅 处理多个文件。
有关使用 SnowflakeFile
读取文件的更多信息和示例,请参阅 使用 Python UDF 处理程序 中的 SnowflakeFile 类读取文件。
以下示例注册了一个临时 UDF,该过程使用 SnowflakeFile
从暂存区读取文本文件并返回文件长度。
注册 UDF:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
调用 UDF:
session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
从 Snowpark Python UDFs 和 UDTFs 写入文件¶
借助 Snowpark Python,您现在可以使用用户定义的函数 (UDFs)、矢量化 UDFs、用户定义的表函数 (UDTFs) 和矢量化 UDTFs 将文件写入到暂存区。在函数处理程序中,您可以使用 SnowflakeFile API 打开并写入文件。当您通过函数返回文件时,该文件将与查询结果一起写入。
用于写入文件的简单 UDF 可能如下所示:
CREATE OR REPLACE FUNCTION write_file()
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'write_file'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
def write_file():
file = SnowflakeFile.open_new_result("w") # Open a new result file
file.write("Hello world") # Write data
return file # File must be returned
$$;
执行此 UDF 将为您提供一个引用结果文件的作用域 URL。
访问结果文件¶
文件处理程序作为作用域 URL 返回到调用 UDF 的查询。您可以使用此特定作用域的 URL 从 Snowflake 内部访问文件(通过其他 UDF 或 COPY FILES 命令),但不能从 Snowflake 外部以预签名的 URL 形式访问文件。作用域 URL 有效期为 24 小时。
UDF 返回文件后,您可以使用以下任意存储工具访问该文件,具体取决于您的用例:
COPY FILES:将文件复制到其他暂存区位置。复制文件后,您可以像使用典型的暂存文件一样使用该文件,例如使用以下工具:
目录表:如有必要,请使用 WHERE 子句查询暂存区上的文件列表来进行筛选。
GET_PRESIGNED_URL:生成指向 @stage/file 的 URL。
外部暂存区:通过云提供商 APIs 在 Snowflake 外部访问文件。
UDF:在另一个查询中读取文件。
例如,您可以使用以下 SQL 语法从 UDF 查询结果中 COPY FILES 到内部或外部暂存区:
COPY FILES INTO @[<namespace>.]<stage_name>[/<path>/]
FROM ( SELECT <existing_url> [ , <new_filename> ] FROM ... )
[ DETAILED_OUTPUT = { TRUE | FALSE } ]
限制¶
此功能不适用于 Java 或 Scala。
存储过程也支持文件写入,但不能轻松地用 COPY FILES 命令进行链接。因此,对于使用存储过程的文件写入,建议使用文件暂存 PUT 命令。
示例¶
本部分包含的代码示例展示了如何将文件写入不同用例的暂存区。
文件转换¶
下面是一个转换文件的 UDF 处理程序示例。您可以修改以下示例以执行不同类型的文件转换,例如:
从一种文件格式转换为另一种格式。
调整镜像大小。
将文件转换为相同或不同桶中带时间戳格式的文件夹中的“黄金状态”。
CREATE OR REPLACE FUNCTION convert_to_foo(filename string)
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'convert_to_foo'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
def convert_to_foo(filename):
input_file = SnowflakeFile.open(filename, "r")
converted_file = SnowflakeFile.open_new_result("w")
# Foo-type is just adding foo at the end of every line
for line in input_file.readlines():
converted_file.write(line[:-1] + 'foo' + '\n')
return converted_file
$$;
您可以在查询中调用此 UDF,然后访问由 UDF 写入的 converted_file
结果文件。
以下 SQL 示例展示了您可以对 UDFs 返回的结果文件执行哪些操作,例如将结果文件复制到暂存区,或在后续查询或其他 UDF 中使用结果文件。这些基本 SQL 模式适用于本主题中包含的任何 UDF 文件写入示例。例如,您可以将预签名的 URL 查询用于以下任何 UDF 示例,方法是使用该查询来代替其他 SQL 语句。
示例 1:转换单个文件并将其复制到最终暂存区¶
COPY FILES INTO @output_stage FROM
(SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, 'in.txt')), 'out.foo.txt');
示例 2:转换文件表并将其复制到最终暂存区¶
CREATE TABLE files_to_convert(file string);
-- Populate files_to_convert with input files:
INSERT INTO files_to_convert VALUES ('file1.txt');
INSERT INTO files_to_convert VALUES ('file2.txt');
COPY FILES INTO @output_stage FROM
(SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)),
REPLACE(file, '.txt', '.foo.txt') FROM files_to_convert);
示例 3:转换暂存区中的所有文件并将其复制到最终暂存区¶
COPY FILES INTO @output_stage FROM
(SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, RELATIVE_PATH)),
REPLACE(RELATIVE_PATH, 'format1', 'format2') FROM DIRECTORY(@input_stage));
示例 4:转换表中的所有文件,并在不复制的情况下读取这些文件¶
-- A basic UDF to read a file:
CREATE OR REPLACE FUNCTION read_udf(filename string)
RETURNS STRING
LANGUAGE PYTHON
VOLATILE
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'read'
AS
'
from snowflake.snowpark.files import SnowflakeFile
def read(filename):
return SnowflakeFile.open(filename, "r").read()
';
-- Create files_to_convert as in Example 2.
SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)) as new_file
FROM files_to_convert;
-- The following query must be run within 24 hours from the prior one
SELECT read_udf(new_file) FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
示例 5:转换表中的所有文件,并通过 UDF 立即读取这些文件¶
-- Set up files_to_convert as in Example 2.
-- Set up read_udf as in Example 4.
SELECT read_udf(
convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file))) FROM files_to_convert;
示例 6:使用预签名的 URLs 进行读取¶
以下示例仅适用于进行了服务器端加密的暂存区。默认情况下,内部暂存区进行了客户端加密。
COPY FILES INTO @output_stage FROM
(SELECT convert_to_foo(BUILD_SCOPED_FILE_URL(@input_stage, file)) FROM files_to_convert);
-- Refresh the directory to get new files in output_stage.
ALTER STAGE output_stage REFRESH;
从表数据的分区中创建 PDF,并将其复制到最终位置¶
以下 UDF 处理程序示例对输入数据进行分区,并为每个数据分区编写 PDF 报告。以下示例根据 location
字符串对报告进行分区。
您还可以修改此示例以写入其他类型的文件,例如每个分区的 ML 模型和其他自定义格式。
-- Create a stage that includes the font (for PDF creation)
CREATE OR REPLACE STAGE fonts
URL = 's3://sfquickstarts/misc/';
-- UDF to write the data
CREATE OR REPLACE FUNCTION create_report_pdf(data string)
RETURNS TABLE (file string)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER='CreateReport'
PACKAGES = ('snowflake-snowpark-python', 'fpdf')
IMPORTS = ('@fonts/DejaVuSans.ttf')
AS $$
from snowflake.snowpark.files import SnowflakeFile
from fpdf import FPDF
import shutil
import sys
import uuid
import_dir = sys._xoptions["snowflake_import_directory"]
class CreateReport:
def __init__(self):
self.pdf = None
def process(self, data):
if self.pdf == None:
# PDF library edits this file, make sure it's unique.
font_file = f'/tmp/DejaVuSans-{uuid.uuid4()}.ttf'
shutil.copy(f'{import_dir}/DejaVuSans.ttf', font_file)
self.pdf = FPDF()
self.pdf.add_page()
self.pdf.add_font('DejaVu', '', font_file, uni=True)
self.pdf.set_font('DejaVu', '', 14)
self.pdf.write(8, data)
self.pdf.ln(8)
def end_partition(self):
f = SnowflakeFile.open_new_result("wb")
f.write(self.pdf.output(dest='S').encode('latin-1'))
yield f,
$$;
以下 SQL 示例首先使用虚构数据设置 reportData
表,并创建 output_stage
暂存区。然后,调用 create_report_pdf
UDF,以使用从 reportData
表查询的数据创建 PDF 文件。在同一 SQL 语句中,COPY FILES 命令将结果文件从 UDF 复制到 output_stage
。
备注
我们使用服务器端加密的 (SSE) 输出暂存区,因为 SSE 暂存区上文件的预签名 URL 将下载未加密的文件。通常,我们建议在暂存区上使用默认暂存区加密,因为文件是客户端加密的,更安全。正常暂存区上的文件仍可通过 UDFs 或 GET/PUT 读取,只是不能通过预签名的 URLs 读取。在生产环境中使用 SSE 暂存区之前,请确保了解安全影响。
-- Fictitious data
CREATE OR REPLACE TABLE reportData(location string, item string);
INSERT INTO reportData VALUES ('SanMateo' ,'Item A');
INSERT INTO reportData VALUES ('SanMateo' ,'Item Z');
INSERT INTO reportData VALUES ('SanMateo' ,'Item X');
INSERT INTO reportData VALUES ('Bellevue' ,'Item B');
INSERT INTO reportData VALUES ('Bellevue' ,'Item Q');
-- Presigned URLs only work with SSE stages, see note above.
CREATE OR REPLACE STAGE output_stage ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
COPY FILES INTO @output_stage
FROM (SELECT reports.file, location || '.pdf'
FROM reportData, TABLE(create_report_pdf(item)
OVER (partition BY location)) AS reports);
-- Check the results
LIST @output_stage;
SELECT GET_PRESIGNED_URL(@output_stage, 'SanMateo.pdf');
拆分文件并将其卸载到多个表中¶
以下 UDF 处理程序示例根据每行的第一个字符逐行拆分 CSV 文件。然后,UDF 将拆分的文件卸载到多个表中。
CREATE OR REPLACE FUNCTION split_file(path string)
RETURNS TABLE(file string, name string)
LANGUAGE PYTHON
VOLATILE
PACKAGES = ('snowflake-snowpark-python')
RUNTIME_VERSION = 3.9
HANDLER = 'SplitCsvFile'
AS $$
import csv
from snowflake.snowpark.files import SnowflakeFile
class SplitCsvFile:
def process(self, file):
toTable1 = SnowflakeFile.open_new_result("w")
toTable1Csv = csv.writer(toTable1)
toTable2 = SnowflakeFile.open_new_result("w")
toTable2Csv = csv.writer(toTable2)
toTable3 = SnowflakeFile.open_new_result("w")
toTable3Csv = csv.writer(toTable3)
with SnowflakeFile.open(file, 'r') as file:
# File is of the format 1:itemA \n 2:itemB \n [...]
for line in file.readlines():
forTable = line[0]
if (forTable == "1"):
toTable1Csv.writerow([line[2:-1]])
if (forTable == "2"):
toTable2Csv.writerow([line[2:-1]])
if (forTable == "3"):
toTable3Csv.writerow([line[2:-1]])
yield toTable1, 'table1.csv'
yield toTable2, 'table2.csv'
yield toTable3, 'table3.csv'
$$;
-- Create a stage with access to an import file.
CREATE OR REPLACE STAGE staged_files url="s3://sfquickstarts/misc/";
-- Add the files to be split into a table - we just add one.
CREATE OR REPLACE TABLE filesToSplit(path string);
INSERT INTO filesToSplit VALUES ( 'items.txt');
-- Create output tables
CREATE OR REPLACE TABLE table1(item string);
CREATE OR REPLACE TABLE table2(item string);
CREATE OR REPLACE TABLE table3(item string);
-- Create output stage
CREATE OR REPLACE stage output_stage;
-- Creates files named path-tableX.csv
COPY FILES INTO @output_stage FROM
(SELECT file, path || '-' || name FROM filesToSplit, TABLE(split_file(build_scoped_file_url(@staged_files, path))));
-- We use pattern and COPY INTO (not COPY FILES INTO) to upload to a final table.
COPY INTO table1 FROM @output_stage PATTERN = '.*.table1.csv';
COPY INTO table2 FROM @output_stage PATTERN = '.*.table2.csv';
COPY INTO table3 FROM @output_stage PATTERN = '.*.table3.csv';
-- See results
SELECT * from table1;
SELECT * from table2;
SELECT * from table3;
使用矢量化 UDFs¶
通过矢量化 Python UDFs,您可以定义 Python 函数,该函数接收批量输入行作为 Pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html),并返回批量结果作为 Pandas 数组 (https://pandas.pydata.org/docs/reference/api/pandas.array.html) 或 Series (https://pandas.pydata.org/docs/reference/series.html)。Snowpark dataframe
中的列将被矢量化为 UDF 内的 Pandas Series。
以下示例演示了如何使用批处理接口:
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)
@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
# The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
df.columns = ["col1", "col2", "col3", "col4"]
return model.predict(df)
您调用矢量化 Python UDFs 的方式与调用其他 Python UDFs 的方式相同。有关更多信息,请参阅 矢量化 Python UDFs,其中说明了如何使用 SQL 语句创建矢量化 UDF。例如,当在 SQL 语句中指定 Python 代码时,可以使用 vectorized
装饰器。通过使用本文档中描述的 Snowpark Python API,您不必使用 SQL 语句来创建矢量化 UDF。所以不必使用 vectorized
装饰器。
可以限制每个批次的行数。有关更多信息,请参阅 设置目标批次大小。
有关使用 Snowpark Python API 创建矢量化 UDFs 的更多说明和示例,请参阅 Snowpark API 参考 <https://docs.snowflake.cn/en/developer-guide/snowpark/reference/python/latest/api/snowflake.snowpark.udf.UDFRegistration>`_ 的 `UDFs 部分。