在 Python 中为 DataFrames 创建存储过程

Snowpark API 提供了可用于在 Python 中创建存储过程的方法。本主题说明如何创建存储过程。

本主题内容:

简介

使用 Snowpark,您可以为自定义 lambda 和函数创建存储过程,并且可以调用这些存储过程来处理 DataFrame 中的数据。

您可以创建仅存在于当前会话中的存储过程(临时存储过程),以及可在其他会话中使用的存储过程(永久存储过程)。

在存储过程中使用 Anaconda 中的第三方包

您可以在创建 Python 存储过程时指定要安装的 Anaconda 包。在 Snowflake 仓库中调用 Python 存储过程时,为您在虚拟仓库上无缝安装并缓存 Anaconda 包。有关最佳实践、如何查看可用包以及如何设置本地开发环境的更多信息,请参阅 使用第三方包

使用 session.add_packages 在会话级别添加包。

此代码示例演示如何导入包并返回其版本。

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> session.add_packages("snowflake-snowpark-python", "pandas", "xgboost==1.5.0")

>>> @sproc
... def compute(session: snowflake.snowpark.Session) -> list:
...   return [pd.__version__, xgb.__version__]
Copy

您还可以使用 session.add_requirements 指定包含 `要求文件 <https://pip.pypa.io/en/stable/user_guide/#requirements-files (https://pip.pypa.io/en/stable/user_guide/#requirements-files)>`_的包。

>>> session.add_requirements("mydir/requirements.txt")
Copy

您可以添加存储过程级包,以覆盖之前可能添加的会话级包。

>>> import pandas as pd
>>> import snowflake.snowpark
>>> import xgboost as xgb
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(packages=["snowflake-snowpark-python", "pandas", "xgboost==1.5.0"])
... def compute(session: snowflake.snowpark.Session) -> list:
...    return [pd.__version__, xgb.__version__]
Copy

重要

如果未指定包的版本,则 Snowflake 将在解析依赖项时使用最新版本。但是,在将存储过程部署到生产环境时,可能需要确保代码始终使用相同的依赖项版本。您可以对永久存储过程和临时存储过程执行此操作。

  • 创建永久存储过程时,仅创建和注册一次存储过程。这将解析依赖项一次,所选版本将用于生产工作负载。当存储过程执行时,它将始终使用相同的依赖项版本。

  • 创建临时存储过程时,请将依赖项版本指定为版本规范的一部分。这样,在注册存储过程时,包的解析将使用指定的版本。如果未指定版本,则当新版本可用时,可能会更新依赖项。

创建匿名存储过程

要创建匿名存储过程,您可以执行以下任一操作:

  • 调用 snowflake.snowpark.functions 模块中的 sproc 函数,传入匿名函数的定义。

  • 调用 StoredProcedureRegistration 类中的 register 方法,传入匿名函数的定义。若要访问 StoredProcedureRegistration 类的属性或方法,请调用 Session 类的 sproc 属性。

以下是匿名存储过程的示例:

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0], return_type=IntegerType(), input_types=[IntegerType()], packages=["snowflake-snowpark-python"])
Copy

备注

编写可能在多个会话中执行的代码时,请使用 register 方法注册存储过程,而不是使用 sproc 函数。这可以防止找不到默认 Snowflake Session 对象的错误。

创建和注册命名存储过程

如果要按名称调用存储过程(例如,通过使用 Session 对象中的 call 函数),则可以创建并注册命名存储过程。要做到这一点,可以执行以下任一操作:

  • 调用 snowflake.snowpark.functions 模块中的 sproc 函数,传入匿名函数的 name 实参和定义。

  • 调用 StoredProcedureRegistration 类中的 register 方法,传入匿名函数的 name 实参和定义。若要访问 StoredProcedureRegistration 类的属性或方法,请调用 Session 类的 sproc 属性。

调用 registersproc 将创建可在当前会话中使用的临时存储过程。

要创建永久存储过程,请调用 register 方法或 sproc 函数,并将 is_permanent 实参设置为 True。创建永久存储过程时,还必须将 stage_location 实参设置为 Snowpark 使用的 Python Connector 上传存储过程及其依赖项的 Python 文件的暂存区位置。

以下示例演示了如何注册命名的临时存储过程:

>>> from snowflake.snowpark.functions import sproc
>>> from snowflake.snowpark.types import IntegerType

>>> add_one = sproc(lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()], name="my_sproc", replace=True,
packages=["snowflake-snowpark-python"])
Copy

以下示例演示了如何通过将 is_permanent 实参设置为 True 来注册命名的永久存储过程:

>>> import snowflake.snowpark
>>> from snowflake.snowpark.functions import sproc

>>> @sproc(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True, packages=["snowflake-snowpark-python"])
... def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
...  return session.sql(f"select {x} - 1").collect()[0][0]
Copy

以下示例演示了如何调用这些存储过程:

>>> add_one(1)
2
>>> session.call("minus_one", 1)
0
>>> session.sql("call minus_one(1)").collect()
[Row(MINUS_ONE(1)=0)]
Copy

使用存储过程读取文件

要使用存储过程读取文件的内容,可以执行以下操作:

读取静态指定的文件

  1. 指定文件是依赖项,可将文件上传到服务器。这与 UDFs 的完成方式相同。有关更多信息,请参阅 为 UDF 指定依赖项

    例如:

    >>> # Import a file from your local machine as a dependency.
    >>> session.add_import("/<path>/my_file.txt")
    
    >>> # Or import a file that you uploaded to a stage as a dependency.
    >>> session.add_import("@my_stage/<path>/my_file.txt")
    
    Copy
  2. 在存储过程中,读取文件。

    >>> def read_file(name: str) -> str:
    ...    import sys
    ...    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    ...    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    ...
    ...    with open(import_dir + 'my_file.txt', 'r') as file:
    ...        return file.read()
    
    Copy

使用 SnowflakeFile 读取动态指定的文件

您可以使用 Snowpark snowflake.snowpark.files 模块中的 SnowflakeFile 类从暂存区读取文件。SnowflakeFile 类提供动态文件访问权限,允许您流式传输任何大小的文件。当您想要迭代多个文件时,动态文件访问也非常有用。例如,请参阅 处理多个文件

有关使用 SnowflakeFile 读取文件的更多信息和示例,请参阅 使用 Python UDF 处理程序 中的 SnowflakeFile 类读取文件。

以下示例创建了一个永久存储过程,该过程使用 SnowflakeFile 从暂存区读取文件并返回文件长度。

创建存储过程:

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.types import StringType, IntegerType

@sproc(name="calc_size", is_permanent=True, stage_location="@my_procedures", replace=True, packages=["snowflake-snowpark-python"])
def calc_size(ignored_session: snowpark.Session, file_path: str) -> int:
  with SnowflakeFile.open(file_path) as f:
    s = f.read()
  return len(s);
Copy

调用存储过程:

file_size = session.sql("call calc_size(build_scoped_file_url('@my_stage', 'my_file.csv'))")
Copy
语言: 中文