Python UDF 处理程序示例¶
本主题包括用 Python 编写的 UDF 处理程序代码的简单示例。
有关使用 Python 创建 UDF 处理程序的信息,请参阅 创建 Python UDFs。
将 runtime_version
设置为代码所需 Python 运行时的版本。受支持的 Python 版本包括:
3.8
3.9
3.10
3.11
在内联处理程序中导入包¶
提供 Anaconda 的第三方软件包的精选列表。有关更多信息,请参阅 使用第三方包。
备注
在使用 Anaconda 提供的包之前,Snowflake 组织管理员必须确认 Snowflake 第三方条款。有关更多信息,请参阅 使用 Anaconda 的第三方包。
以下代码展示如何导入包并返回其版本。
创建 UDF:
CREATE OR REPLACE FUNCTION py_udf()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('numpy','pandas','xgboost==1.5.0')
HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
return [np.__version__, pd.__version__, xgb.__version__]
$$;
调用 UDF:
SELECT py_udf();
+-------------+
| PY_UDF() |
|-------------|
| [ |
| "1.19.2", |
| "1.4.0", |
| "1.5.0" |
| ] |
+-------------+
您可以使用 PACKAGES 关键字指定包版本,如下所示:
没有版本(例如
numpy
)固定到确切的版本(例如
numpy==1.25.2
)使用通配符限制为版本前缀(例如
numpy==1.*
)限制在某个版本范围内(例如
numpy>=1.25
)受到多个版本说明符的限制(例如
numpy>=1.25,<2
),以便选择满足所有版本说明符的包。
备注
使用多个范围运算符(例如 numpy>=1.25,<2
)在包策略中不受支持,但您可以在创建 Python UDF、UDTF 以及存储过程时使用它们。
下面是如何使用通配符 *
将包限制为版本前缀的示例。
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy==1.*')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
return 'hi'
$$;
此示例显示如何限制包大于或等于指定的版本。
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
return 'hi'
$$;
此示例显示如何使用多个包版本说明符。
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES=('numpy>=1.2,<2')
RUNTIME_VERSION=3.10
HANDLER='echo'
AS $$
def echo():
return 'hi'
$$;
读取文件¶
您可以使用 Python UDF 处理程序代码读取文件的内容。例如,您可能希望读取文件以处理非结构化数据。
要读取文件的内容,您可以执行以下操作:
使用 IMPORTS 子句静态指定文件路径和名称,然后从 UDF 的主目录中读取它。当文件名是静态的、在函数中一致并且您事先知道文件名时,此操作可能很有用。
动态指定文件并使用 SnowflakeFile 读取其内容。如果需要在计算过程中访问文件,则可以执行此操作。
使用 IMPORTS 读取静态指定的文件¶
您可以在 CREATE FUNCTION 命令的 IMPORTS 子句中指定文件名和暂存区名,从而读取文件。
在 IMPORTS 子句中指定文件时,Snowflake 会将文件从暂存区复制到 UDF 的 * 主目录 * (也称为 * 导入目录 *),UDF 正是从该目录中读取文件。
Snowflake 将导入的文件复制到单个目录。目录中的所有文件都必须具有唯一的名称,因此 IMPORTS 子句中的每个文件都必须具有不同的名称。即使文件在不同暂存区或暂存区内的不同子目录中启动,这点也适用。
备注
您只能从暂存区上的顶级目录导入文件,而不能从子文件夹导入文件。
以下示例使用内联 Python 处理程序,从名为 my_stage
的暂存区读取名为 file.txt
的文件。处理程序使用带 snowflake_import_directory
系统选项的 Python sys._xoptions
方法来检索 UDF 主目录的位置。
Snowflake 在 UDF 创建期间仅读取一次文件,如果是在目标处理程序外部读取文件,则在 UDF 执行期间不会再次读取该文件。
使用内联处理程序创建 UDF:
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
IMPORTS=('@my_stage/file.txt')
HANDLER='compute'
AS
$$
import sys
import os
with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
s = f.read()
def compute():
return s
$$;
使用 SnowflakeFile
读取动态指定的文件¶
您可以使用 Snowpark snowflake.snowpark.files
模块中的 SnowflakeFile
类从暂存区读取文件。SnowflakeFile
类提供动态文件访问权限,允许您流式传输任何大小的文件。当您想要迭代多个文件时,动态文件访问也非常有用。例如,请参阅 处理多个文件。
SnowflakeFile
类有一种打开文件的方法:open
。open
方法返回 SnowflakeFile
对象,该对象扩展了 Python 的 IOBase
文件对象。
SnowflakeFile
对象支持以下 IOBase
、BufferedIOBase
和 RawIOBase
方法:
IOBase.fileno
IOBase.isatty
IOBase.readable
IOBase.readinto
IOBase.readline
IOBase.readlines
IOBase.seek
IOBase.seekable
IOBase.tell
BufferedIOBase.readinto1
RawIOBase.read
RawIOBase.readall
有关更多信息,请参阅 ` 有关 IOBase 的 Python 3.8 文档 <https://docs.python.org/3.8/library/io.html (https://docs.python.org/3.8/library/io.html)>`_。在 Snowflake 服务器中调用不受支持的方法(例如 :code:`fileno`方法),这样将返回错误。
备注
默认情况下,SnowflakeFile
文件访问需要带作用域的 URLs,以使代码能够抵御文件注入攻击。您可以使用内置函数 BUILD_SCOPED_FILE_URL 在 SQL 中创建带有作用域的 URL。有关带作用域的 URLs 的更多信息,请参阅 可用于访问文件的 URLs 类型。只有有权访问文件的用户才能创建带作用域的 URL。
先决条件¶
在 Python 处理程序代码可以在暂存区上读取文件之前,必须执行以下操作以使该文件可供代码使用:
创建可供处理程序使用的暂存区。
您可以使用外部暂存区或内部暂存区。如果使用内部暂存区,则在计划创建调用方权限存储过程时,它可以是用户暂存区。否则,必须使用命名暂存区。Snowflake 目前不支持将表暂存区用于 UDF 依赖项。
有关创建暂存区的更多信息,请参阅 CREATE STAGE。有关选择内部暂存区类型的更多信息,请参阅 为本地文件选择内部暂存区。
您必须为以下角色分配足够的暂存区权限,具体取决于用例:
用例
角色
UDF 或所有者权限存储过程
拥有执行 UDF 或存储过程的角色。
调用方权限存储过程
用户角色。
有关更多信息,请参阅 向用户定义函数授予权限。
将您要读取的文件复制到暂存区。
您可以使用 PUT 命令将文件从本地驱动器复制到内部暂存区。有关使用 PUT 暂存文件的信息,请参阅 从本地文件系统暂存数据文件。
您可以使用云存储服务提供的任何工具将文件从本地驱动器复制到外部暂存区位置。如需帮助,请参阅云存储服务的文档。
使用内联 Python 处理程序计算图像的 pHash¶
此示例使用 SnowflakeFile
读取一对暂存图像文件,并使用每个文件的 ` pHash <https://www.phash.org/ (https://www.phash.org/)>`_ 来确定图像之间的相似度。
创建用于返回图像的 pHash 值的 UDF,通过传递 mode
实参的 rb
,将输入模式指定为二进制:
CREATE OR REPLACE FUNCTION calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile
def run(file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
return imagehash.average_hash(Image.open(f))
$$;
创建秒 UDF,用于计算两个图像的 pHash 值之间的距离:
CREATE OR REPLACE FUNCTION calc_phash_distance(h1 string, h2 string)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('imagehash')
HANDLER = 'run'
as
$$
import imagehash
def run(h1, h2):
return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
暂存图像文件并刷新目录表:
PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;
ALTER STAGE images REFRESH;
调用 UDFs:
SELECT
calc_phash_distance(
calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
) ;
使用 UDTF 处理 CSV 文件¶
此示例使用 SnowflakeFile
创建 UDTF,用于提取 CSV 文件的内容并返回表中的行。
使用内联处理程序创建 UDTF:
CREATE FUNCTION parse_csv(file_path string)
RETURNS TABLE (col1 string, col2 string, col3 string)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'csvparser'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
class csvparser:
def process(self, stagefile):
with SnowflakeFile.open(stagefile) as f:
for line in f.readlines():
lineStr = line.strip()
row = lineStr.split(",")
try:
# Read the columns from the line.
yield (row[1], row[0], row[2], )
except:
pass
$$;
暂存 CSV 文件并刷新目录表:
PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;
ALTER STAGE data_stage REFRESH;
调用 UDTF,传递文件 URL:
SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
处理多个文件¶
您可以将目录表的 RELATIVE_PATH 列传递给处理程序,从而读取和处理多个文件。有关 RELATIVE_PATH 列的更多信息,请参阅 目录表查询的输出。
备注
根据文件大小和计算需求,您可能希望在执行用于读取和处理多个文件的语句之前,使用 ALTER WAREHOUSE 来纵向扩展仓库。
- 调用 UDF 以处理多个文件:
下面的示例在 CREATE TABLE 语句内调用 UDF,以处理暂存区上的每个文件,然后将结果存储在新表中。
出于演示目的,该示例假定以下内容:
名为
my_stage
的暂存区上有多个文本文件。存在名为
get_sentiment
的现有 UDF,用于对非结构化文本执行情感分析。UDF 文件采用文本文件的路径作为输入,并返回表示情感的值。
CREATE OR REPLACE TABLE sentiment_results AS SELECT relative_path , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment FROM directory(@my_stage);
- 调用 UDTF 以处理多个文件:
下一个示例调用名为
parse_excel_udtf
的 UDTF。该示例从名为my_excel_stage
的暂存区的目录表中传递relative_path
。SELECT t.* FROM directory(@my_stage) d, table(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
使用暂存区 URIs 和 URLs 读取文件¶
默认情况下, SnowflakeFile
文件访问需要带作用域的 URLs。这样会使代码能够抵御文件注入攻击。但是,您可以使用暂存区 URI 或暂存区 URL 来引用文件位置。为此,您必须使用关键字实参 require_scoped_url = False
来调用 SnowflakeFile.open
方法。
当您希望让调用方提供只有 UDF 所有者才能访问的 URI 时,此操作很有用。例如,如果您拥有 UDF 暂存区,并且希望读取配置文件或机器学习模型,您可以使用暂存区 URI 进行文件访问。当您使用名称不可预测的文件(例如基于用户输入创建的文件)时,我们不建议使用此选项。
此示例从文件中读取机器学习模型,并在函数中使用该模型以执行自然语言处理,从而进行情感分析。该示例使用 require_scoped_url = False
调用 open
。在两种文件位置格式(暂存区 URI 和暂存区 URL)中,UDF 所有者必须有权访问模型文件。
使用内联处理程序创建 UDF:
CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle
def run(input_data):
model_file = '@models/NLP_model.pickle'
# Specify 'mode = rb' to open the file in binary mode.
with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
model = pickle.load(f)
return model.predict([input_data])[0]
$$;
暂存模型文件并刷新目录表:
PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;
ALTER STAGE models REFRESH;
或者,您可以使用模型的暂存区 URL 来指定 UDF,以提取情感。
例如,创建带内联处理程序的 UDF,使用暂存区 URL 指定文件:
CREATE OR REPLACE FUNCTION extract_sentiment(input_data string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS
$$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle
def run(input_data):
model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
# Specify 'rb' to open the file in binary mode.
with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
model = pickle.load(f)
return model.predict([input_data])[0]
$$;
使用输入数据调用 UDF:
SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
写入文件¶
UDF 处理程序可以将文件写入针对调用 UDF 的查询创建的 /tmp
目录。
请记住,/tmp
目录为单个调用查询预留,但多个 Python 工作进程可能同时运行。为防止冲突,您必须确保对 /tmp 目录的访问与其他 Python 工作进程同步,或者写入 /tmp 的文件的名称是唯一的。
有关示例代码,请参阅 解压缩暂存文件 (本主题内容)。
以下示例中的代码将输入 text
写入 /tmp
目录。它还追加函数的进程 ID,以确保文件位置的唯一性。
def func(text):
# Append the function's process ID to ensure the file name's uniqueness.
file_path = '/tmp/content' + str(os.getpid())
with open(file_path, "w") as file:
file.write(text)
解压缩暂存文件¶
您可以将 .zip 文件存储在暂存区,然后使用 Python zipfile 模块,在 UDF 中将其解压。
例如,您可以将 .zip 文件上传到暂存区,然后在创建 UDF 时,在 IMPORTS 子句的暂存位置引用 .zip 文件。在运行时,Snowflake 会将暂存文件复制到导入目录,从该目录中代码可以访问文件。
有关读取和写入文件的更多信息,请参阅 读取文件 和 写入文件。
在以下示例中,UDF 代码使用 NLP 模型来发现文本中的实体。代码返回这些实体的数组。要设置用于处理文本的 NLP 模型,代码首先使用 zipfile 模块从 .zip 文件中提取模型 (en_core_web_sm-2.3.1) 的文件。然后,代码使用 spaCy 模块从文件加载模型。
请注意,代码将提取的文件内容写入为调用此函数的查询创建的 /tmp 目录。代码使用文件锁来确保在 Python 工作进程之间同步提取;这样,内容只需解压缩一次即可。有关写入文件的更多信息,请参阅 写入文件。
有关 zipfile 模块的更多信息,请参阅 ` zipfile 参考 <https://docs.python.org/3/library/zipfile.html (https://docs.python.org/3/library/zipfile.html)>`_。有关 spaCy 模块的更多信息,请参阅 ` spaCy API 文档 <https://spacy.io/api (https://spacy.io/api)>`_。
使用内联处理程序创建 UDF:
CREATE OR REPLACE FUNCTION py_spacy(str string)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'func'
PACKAGES = ('spacy')
IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS
$$
import fcntl
import os
import spacy
import sys
import threading
import zipfile
# File lock class for synchronizing write access to /tmp.
class FileLock:
def __enter__(self):
self._lock = threading.Lock()
self._lock.acquire()
self._fd = open('/tmp/lockfile.LOCK', 'w+')
fcntl.lockf(self._fd, fcntl.LOCK_EX)
def __exit__(self, type, value, traceback):
self._fd.close()
self._lock.release()
# Get the location of the import directory. Snowflake sets the import
# directory location so code can retrieve the location via sys._xoptions.
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
# Get the path to the ZIP file and set the location to extract to.
zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
extracted = '/tmp/en_core_web_sm'
# Extract the contents of the ZIP. This is done under the file lock
# to ensure that only one worker process unzips the contents.
with FileLock():
if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
with zipfile.ZipFile(zip_file_path, 'r') as myzip:
myzip.extractall(extracted)
# Load the model from the extracted file.
nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")
def func(text):
doc = nlp(text)
result = []
for ent in doc.ents:
result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
return result
$$;
处理 NULL 值¶
以下代码展示如何处理 NULL 值。有关更多信息,请参阅 NULL 值。
创建 UDF:
CREATE OR REPLACE FUNCTION py_udf_null(a variant)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'udf'
AS $$
def udf(a):
if not a:
return 'JSON null'
elif getattr(a, "is_sql_null", False):
return 'SQL null'
else:
return 'not null'
$$;
调用 UDF:
SELECT py_udf_null(null);
SELECT py_udf_null(parse_json('null'));
SELECT py_udf_null(10);
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null |
+-------------------+
+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null |
+---------------------------------+
+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null |
+-----------------+