在 Python 中为 DataFrames 创建存储过程¶
Snowpark API 提供了可用于在 Python 中创建存储过程的方法。本主题说明如何创建存储过程。
本主题内容:
简介¶
使用 Snowpark,您可以为自定义 lambda 和函数创建存储过程,并且可以调用这些存储过程来处理 DataFrame 中的数据。
您可以创建仅存在于当前会话中的存储过程(临时存储过程),以及可在其他会话中使用的存储过程(永久存储过程)。
在存储过程中使用 Anaconda 中的第三方包¶
您可以在创建 Python 存储过程时指定要安装的 Anaconda 包。在 Snowflake 仓库中调用 Python 存储过程时,为您在虚拟仓库上无缝安装并缓存 Anaconda 包。有关最佳实践、如何查看可用包以及如何设置本地开发环境的更多信息,请参阅 使用第三方包。
使用 session.add_packages
在会话级别添加包。
此代码示例演示如何导入包并返回其版本。
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")
>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
您还可以使用 session.add_requirements
指定包含 `要求文件 <https://pip.pypa.io/en/stable/user_guide/#requirements-files (https://pip.pypa.io/en/stable/user_guide/#requirements-files)>`_的包。
>>> session.add_requirements("mydir/requirements.txt")
您可以添加存储过程级包,以覆盖之前可能添加的会话级包。
>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
... return [pd.__version__, xgb.__version__]
重要
如果未指定包的版本,则 Snowflake 将在解析依赖项时使用最新版本。但是,在将存储过程部署到生产环境时,可能需要确保代码始终使用相同的依赖项版本。您可以对永久存储过程和临时存储过程执行此操作。
创建永久存储过程时,仅创建和注册一次存储过程。这将解析依赖项一次,所选版本将用于生产工作负载。当存储过程执行时,它将始终使用相同的依赖项版本。
创建临时存储过程时,请将依赖项版本指定为版本规范的一部分。这样,在注册存储过程时,包的解析将使用指定的版本。如果未指定版本,则当新版本可用时,可能会更新依赖项。
创建匿名存储过程¶
要创建匿名存储过程,您可以执行以下任一操作:
调用
snowflake.snowpark.functions
模块中的sproc
函数,传入匿名函数的定义。调用
StoredProcedureRegistration
类中的register
方法,传入匿名函数的定义。若要访问StoredProcedureRegistration
类的属性或方法,请调用Session
类的sproc
属性。
以下是匿名存储过程的示例:
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
备注
编写可能在多个会话中执行的代码时,请使用 register
方法注册存储过程,而不是使用 sproc
函数。这可以防止找不到默认 Snowflake Session
对象的错误。
创建和注册命名存储过程¶
如果要按名称调用存储过程(例如,通过使用 Session
对象中的 call
函数),则可以创建并注册命名存储过程。要做到这一点,可以执行以下任一操作:
调用
snowflake.snowpark.functions
模块中的sproc
函数,传入匿名函数的name
实参和定义。调用
StoredProcedureRegistration
类中的register
方法,传入匿名函数的name
实参和定义。若要访问StoredProcedureRegistration
类的属性或方法,请调用Session
类的sproc
属性。
调用 register
或 sproc
将创建可在当前会话中使用的临时存储过程。
要创建永久存储过程,请调用 register
方法或 sproc
函数,并将 is_permanent
实参设置为 True
。创建永久存储过程时,还必须将 stage_location
实参设置为 Snowpark 使用的 Python Connector 上传存储过程及其依赖项的 Python 文件的暂存区位置。
以下示例演示了如何注册命名的临时存储过程:
>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType
>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
以下示例演示了如何通过将 is_permanent
实参设置为 True
来注册命名的永久存储过程:
>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc
>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
... return session.sql(f"select {x} - 1").collect()[0][0]
以下示例演示了如何调用这些存储过程:
>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
使用存储过程读取文件¶
要使用存储过程读取文件的内容,可以执行以下操作:
读取静态指定的文件,方法是导入一个文件,然后从存储过程的主目录中读取该文件。
使用 SnowflakeFile 读取动态指定的文件。如果需要在计算过程中访问文件,则可以执行此操作。
读取静态指定的文件¶
指定文件是依赖项,可将文件上传到服务器。这与 UDFs 的完成方式相同。有关更多信息,请参阅 为 UDF 指定依赖项。
例如:
>>> # Import a file from your local machine as a dependency. >>> session.add_import("/<path>/my_file.txt") >>> # Or import a file that you uploaded to a stage as a dependency. >>> session.add_import("@my_stage/<path>/my_file.txt")
在存储过程中,读取文件。
>>> def read_file(name: str) -> str: ... import sys ... IMPORT_DIRECTORY_NAME = "snowflake_import_directory" ... import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME] ... ... with open(import_dir + 'my_file.txt', 'r') as file: ... return file.read()
使用 SnowflakeFile
读取动态指定的文件¶
您可以使用 Snowpark snowflake.snowpark.files
模块中的 SnowflakeFile
类从暂存区读取文件。SnowflakeFile
类提供动态文件访问权限,允许您流式传输任何大小的文件。当您想要迭代多个文件时,动态文件访问也非常有用。例如,请参阅 处理多个文件。
有关使用 SnowflakeFile
读取文件的更多信息和示例,请参阅 使用 Python UDF 处理程序 中的 SnowflakeFile 类读取文件。
以下示例创建了一个永久存储过程,该过程使用 SnowflakeFile
从暂存区读取文件并返回文件长度。
创建存储过程:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType
@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
with SnowflakeFile.open(file_path) as f:
s = f.read()
return len(s);
调用存储过程:
file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")