在 Python 中为 DataFrames 创建用户定义函数 (UDFs)¶
Snowpark API 提供了一些方法,您可以使用这些方法在 Python 中通过 Lambda 或函数创建用户定义函数。本主题说明如何创建这些类型的函数。
本主题内容:
简介¶
使用 Snowpark,您可以为自定义 lambda 和函数创建用户定义函数 (UDFs),并且可以调用这些 UDFs 来处理 DataFrame 中的数据。
当您使用 Snowpark API 创建 UDF 时,Snowpark 库会将函数的代码上传到内部暂存区。当您调用 UDF 时,Snowpark 库会在数据所在的服务器上执行您的函数。因此,不需要将数据传输到客户端,函数就可以处理数据。
在自定义代码中,您还可以从 Python 文件或第三方包导入模块。
可以通过以下两种方式之一为自定义代码创建 UDF:
您可以 创建匿名 UDF 并将该函数赋值给变量。只要此变量在作用域中,您就可以使用此变量调用 UDF。
您可以 创建命名 UDF,并按名称调用 UDF。例如,如果需要按名称调用 UDF 或在后续会话中使用 UDF,您可以使用此方式。
接下来的章节将介绍如何使用本地开发环境或使用 Python 工作表 来创建这些 UDFs。
请注意,如果通过运行 CREATE FUNCTION
命令定义了 UDF,则可以在 Snowpark 中调用该 UDF。有关详细信息,请参阅 调用用户定义的函数 (UDFs)。
备注
通过矢量化 Python UDFs,您可以定义将批量输入行接收为 Pandas DataFrames 的 Python 函数。这样可以在机器学习推理场景中获得更好的性能。有关更多信息,请参阅 使用矢量化 UDFs。
备注
如果您使用的是 Python 工作表,请在处理程序函数中使用以下示例:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
df_table = session.table("sample_product_data")
如果示例返回 DataFrame 以外的内容(例如 Row
对象的 list
),请 更改返回类型 以匹配示例的返回类型。
运行代码示例后,使用 Results 选项卡查看返回的任何输出。有关更多详细信息,请参阅 运行 Python 工作表。
为 UDF 指定依赖项¶
要使用 Snowpark API 定义 UDF,必须导入包含 UDF 所依赖的任何模块的文件,例如 Python 文件、zip 文件、资源文件等。
要使用 Python 工作表来实现这一点,请参阅 将 Python 文件从暂存区添加到工作表。
要使用本地开发环境执行此操作,必须在代码中调用
Session.add_import()
。
您还可以指定一个目录,Snowpark 库会自动压缩该目录并将其上传为 zip 文件。(有关从 UDF 读取资源的详细信息,请参阅 使用 UDF 读取文件。)
调用 Session.add_import()
时,Snowpark 库将指定的文件上传到内部暂存区,并在执行 UDF 时导入文件。
以下示例演示了如何在暂存区中添加 zip 文件作为代码的依赖项:
>>> # Add a zip file that you uploaded to a stage.
>>> session.add_import("@my_stage/<path>/my_library.zip")
以下示例演示了如何从本地机器添加 Python 文件:
>>> # Import a Python file from your local machine.
>>> session.add_import("/<path>/my_module.py")
>>> # Import a Python file from your local machine and specify a relative Python import path.
>>> session.add_import("/<path>/my_module.py", import_path="my_dir.my_module")
以下示例演示了如何添加其他类型的依赖项:
>>> # Add a directory of resource files.
>>> session.add_import("/<path>/my-resource-dir/")
>>> # Add a resource file.
>>> session.add_import("/<path>/my-resource.xml")
备注
Python Snowpark 库不会自动上传。
您不需要指定以下依赖项:
Python 内置库。
这些库已在执行 UDFs 的服务器上的运行时环境中可用。
在 UDF 中使用 Anaconda 中的第三方包¶
您可以在 UDF 中使用来自 Snowflake Anaconda 通道的第三方包。
如果您在 Python 工作表中创建 Python UDF,则您的工作表已经可以使用 Anaconda 包。请参阅 将 Python 文件从暂存区添加到工作表。
如果您在本地开发环境中创建 Python UDF,则可以指定要安装的 Anaconda 包。
在 Snowflake 仓库中执行调用 Python UDFs 的查询时,为您在虚拟仓库上无缝安装并缓存 Anaconda 包。
有关最佳实践、如何查看可用包以及如何设置本地开发环境的更多信息,请参阅 使用第三方包。
如果在本地开发环境中编写 Python UDF,请使用 session.add_packages
在会话级别添加包。
此代码示例演示如何导入包并返回其版本。
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> session.add_packages("numpy", "pandas", "xgboost==1.5.0")
>>> @udf
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
您还可以使用 session.add_requirements
指定包含 要求文件 (https://pip.pypa.io/en/stable/user_guide/#requirements-files) 的包。
>>> session.add_requirements("mydir/requirements.txt")
您可以添加 UDF 级包,以覆盖之前可能添加的会话级包。
>>> import numpy as np
>>> import pandas as pd
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import udf
>>> @udf(packages=["numpy", "pandas", "xgboost==1.5.0"])
... def compute() -> list:
... return [np.__version__, pd.__version__, xgb.__version__]
重要
如果未指定包的版本,则 Snowflake 在解析依赖项时使用最新版本。但是,在将 UDF 部署到生产环境时,可能需要确保代码始终使用相同的依赖项版本。您可以对永久和临时 UDFs 执行此操作。
当创建永久 UDF 时,只会创建和注册一次 UDF。这将解析依赖项一次,所选版本将用于生产工作负载。执行 UDF 时,它始终使用相同的依赖项版本。
创建临时 UDF 时,请将依赖项版本指定为版本规范的一部分。这样,在注册 UDF 时,包的解析使用指定的版本。如果未指定版本,则当新版本可用时,可能会更新依赖项。
创建匿名 UDF¶
要创建匿名 UDF,您可以执行以下任一操作:
调用
snowflake.snowpark.functions
模块中的udf
函数,传入匿名函数的定义。调用
UDFRegistration
类中的register
方法,传入匿名函数的定义。
以下是匿名 UDF 的示例:
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()])
备注
编写可能在多个会话中执行的代码时,请使用 register
方法注册 UDFs,而不是使用 udf
函数。这可以防止找不到默认 Snowflake Session
对象的错误。
创建和注册命名的 UDF¶
如果要按名称调用 UDF(例如,通过使用 functions
模块中的 call_udf
函数),则可以创建并注册命名 UDF。要执行此操作,请使用以下内容之一:
UDFRegistration
类中带有name
实参的register
方法。snowflake.snowpark.functions
模块中带有name
实参的udf
函数。
要访问 UDFRegistration
类的属性或方法,请调用 Session
类的 udf
属性。
调用 register
或 udf
将创建可在当前会话中使用的临时 UDF。
要创建永久 UDF,请调用 register
方法或 udf
函数,并将 is_permanent
实参设置为 True
。创建永久 UDF 时,还必须将 stage_location
实参设置为上传 UDF 及其依赖项的 Python 文件的暂存区位置。
以下示例演示了如何注册命名的临时 UDF:
>>> from snowflake.snowpark.types import IntegerType
>>> from snowflake.snowpark.functions import udf
>>> add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()], name="my_udf", replace=True)
以下示例演示了如何通过将 is_permanent
实参设置为 True
来注册命名的永久 UDF:
>>> @udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
... def minus_one(x: int) -> int:
... return x-1
以下示例演示了如何调用这些 UDFs:
>>> df = session.create_dataframe([[1, 2], [3, 4]]).to_df("a", "b")
>>> df.select(add_one("a"), minus_one("b")).collect()
[Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]
您还可以使用 SQL 调用 UDF:
>>> session.sql("select minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
从 Python 源文件创建 UDF¶
如果在本地开发环境中创建 UDF,则可以在 Python 文件中定义 UDF 处理程序,然后使用 UDFRegistration
类中的 register_from_file
方法创建 UDF。
备注
您不能在 Python 工作表中使用此方法。
以下是使用 register_from_file
的示例。
假设您有一个 Python 文件 test_udf_file.py
,其中包含:
def mod5(x: int) -> int:
return x % 5
然后您可以从文件 test_udf_file.py
的此函数创建 UDF。
>>> # mod5() in that file has type hints
>>> mod5_udf = session.udf.register_from_file(
... file_path="tests/resources/test_udf_dir/test_udf_file.py",
... func_name="mod5",
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
您还可以将该文件上传到暂存区位置,然后使用该文件来创建 UDF。
>>> from snowflake.snowpark.types import IntegerType
>>> # suppose you have uploaded test_udf_file.py to stage location @mystage.
>>> mod5_udf = session.udf.register_from_file(
... file_path="@mystage/test_udf_file.py",
... func_name="mod5",
... return_type=IntegerType(),
... input_types=[IntegerType()],
... )
>>> session.range(1, 8, 2).select(mod5_udf("id")).to_df("col1").collect()
[Row(COL1=1), Row(COL1=3), Row(COL1=0), Row(COL1=2)]
使用 UDF 读取文件¶
要读取文件的内容,Python 代码可以执行以下操作:
读取静态指定的文件,方法是导入一个文件,然后从 UDF 的主目录中读取该文件。
使用 SnowflakeFile 读取动态指定的文件。如果需要在计算过程中访问文件,则可以执行此操作。
读取静态指定的文件¶
Snowpark 库在服务器上上传和执行 UDFs。如果 UDF 需要从文件中读取数据,您必须确保使用 UDF 上传文件。
备注
如果在 Python 工作表中编写 UDF,则 UDF 只能从暂存区中读取文件。
设置 UDF 以读取文件:
指定文件是依赖项,可将文件上传到服务器。有关更多信息,请参阅 为 UDF 指定依赖项。
例如:
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
在 UDF 中,读取文件。在以下示例中,该文件在 UDF 创建期间只会读取一次,在 UDF 执行期间不会再次读取。这是通过第三方库 缓存工具 (https://pypi.org/project/cachetools/) 来实现的。
>>> import sys >>> import os >>> import cachetools >>> from snowflake.snowpark.types import StringType >>> @cachetools.cached(cache={}) ... def read_file(filename): ... import_dir = sys._xoptions.get("snowflake_import_directory") ... if import_dir: ... with open(os.path.join(import_dir, filename), "r") as f: ... return f.read() >>> >>> # create a temporary text file for test >>> temp_file_name = "/tmp/temp.txt" >>> with open(temp_file_name, "w") as t: ... _ = t.write("snowpark") >>> session.add_import(temp_file_name) >>> session.add_packages("cachetools") >>> >>> def add_suffix(s): ... return f"{read_file(os.path.basename(temp_file_name))}-{s}" >>> >>> concat_file_content_with_str_udf = session.udf.register( ... add_suffix, ... return_type=StringType(), ... input_types=[StringType()] ... ) >>> >>> df = session.create_dataframe(["snowflake", "python"], schema=["a"]) >>> df.select(concat_file_content_with_str_udf("a")).to_df("col1").collect() [Row(COL1='snowpark-snowflake'), Row(COL1='snowpark-python')] >>> os.remove(temp_file_name) >>> session.clear_imports()
使用 SnowflakeFile
读取动态指定的文件¶
您可以使用 Snowpark snowflake.snowpark.files
模块中的 SnowflakeFile
类从暂存区读取文件。SnowflakeFile
类提供动态文件访问权限,允许您流式传输任何大小的文件。当您想要迭代多个文件时,动态文件访问也非常有用。例如,请参阅 处理多个文件。
有关使用 SnowflakeFile
读取文件的更多信息和示例,请参阅 使用 Python UDF 处理程序 中的 SnowflakeFile 类读取文件。
以下示例注册了一个临时 UDF,该过程使用 SnowflakeFile
从暂存区读取文本文件并返回文件长度。
注册 UDF:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@udf(name="get_file_length", replace=True, input_types=[StringType()], return_type=IntegerType(), packages=['snowflake-snowpark-python'])
def get_file_length(file_path):
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
调用 UDF:
session.sql("select get_file_length(build_scoped_file_url(@my_stage, 'example-file.txt'));")
使用矢量化 UDFs¶
通过矢量化 Python UDFs,您可以定义 Python 函数,该函数接收批量输入行作为 Pandas DataFrames (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html),并返回批量结果作为 Pandas 数组 (https://pandas.pydata.org/docs/reference/api/pandas.array.html) 或 Series (https://pandas.pydata.org/docs/reference/series.html)。Snowpark dataframe
中的列将被矢量化为 UDF 内的 Pandas Series。
以下示例演示了如何使用批处理接口:
from sklearn.linear_model import LinearRegression
model = LinearRegression()
model.fit(X, y)
@udf(packages=['pandas', 'scikit-learn','xgboost'])
def predict(df: PandasDataFrame[float, float, float, float]) -> PandasSeries[float]:
# The input pandas DataFrame doesn't include column names. Specify the column names explicitly when needed.
df.columns = ["col1", "col2", "col3", "col4"]
return model.predict(df)
您调用矢量化 Python UDFs 的方式与调用其他 Python UDFs 的方式相同。有关更多信息,请参阅 矢量化 Python UDFs,其中说明了如何使用 SQL 语句创建矢量化 UDF。例如,当在 SQL 语句中指定 Python 代码时,可以使用 vectorized
装饰器。通过使用本文档中描述的 Snowpark Python API,您不必使用 SQL 语句来创建矢量化 UDF。所以不必使用 vectorized
装饰器。
可以限制每个批次的行数。有关更多信息,请参阅 设置目标批次大小。
有关使用 Snowpark Python API 创建矢量化 UDFs 的更多说明和示例,请参阅 Snowpark API 参考 <https://docs.snowflake.cn/en/developer-guide/snowpark/reference/python/latest/api/snowflake.snowpark.udf.UDFRegistration>`_ 的 `UDFs 部分。