使用 Python 编写存储过程

本主题介绍如何使用 Python 编写存储过程。您可以在存储过程中使用 Snowpark 库对 Snowflake 中的表执行查询、更新和其他操作。

本主题内容:

简介

借助 Snowpark Stored Procedures,您可以使用Snowflake 仓库作为计算框架,在 Snowflake 中构建和运行数据管道。使用适用于 Python 的 Snowpark API 编写存储过程来构建数据管道。若要计划这些存储过程的执行,请使用 任务

有关机器学习模型和 Snowpark Python 的信息,请参阅 使用 Snowpark Python 训练机器学习模型

您可以 :doc:` 使用 Python 工作表 </developer-guide/snowpark/python/python-worksheets>` 或 :ref:` 使用本地开发环境 <label-stored_proc_python_intro>` 编写适用于 Python 的 Snowpark Stored Procedures。

您可以在处理程序代码执行时获取日志和跟踪数据。有关更多信息,请参阅 日志记录和跟踪概述

备注

要创建和调用匿名过程,请使用 CALL(使用匿名过程)。创建和调用匿名过程不需要具有 CREATE PROCEDURE 架构权限的角色。

本地编写存储过程的先决条件

要在本地开发环境中编写 Python 存储过程,需满足以下先决条件:

  • 必须使用版本 0.4.0 或更新版本的 Snowpark 库。

  • 启用 Anaconda 包,以便 Snowpark Python 可以加载所需的第三方依赖项。请参阅 使用 Anaconda 的第三方包

  • 受支持的 Python 版本包括:

    • 3.8

    • 3.9

    • 3.10

    • 3.11

请确保设置您的开发环境以使用 Snowpark 库。请参阅 为 Snowpark 设置开发环境

为存储过程编写 Python 代码

对于过程的逻辑,您可以编写在调用过程时执行的处理程序代码。本节介绍处理程序的设计。

您可以通过多种方式通过处理程序代码创建存储过程:

限制

Snowpark Stored Procedures 具有以下限制:

  • 存储过程不支持创建进程。

  • 存储过程不支持运行并发查询。

  • 您不能使用执行 PUT 和 GET 命令的 APIs,包括 Session.sql("PUT ...")Session.sql("GET ...")

  • 当您使用 session.file.get 从暂存区下载文件时,不支持模式匹配。

  • 如果从任务执行存储过程,则必须在创建任务时指定仓库。您不能使用无服务器计算资源来运行任务。

  • 所有者权限存储过程不支持创建指定的临时对象。所有者权限存储过程是以存储过程所有者的权限运行的存储过程。有关更多信息,请参阅 :doc:` 调用方的权限或所有者的权限 </developer-guide/stored-procedure/stored-procedures-rights>`。

计划编写存储过程

存储过程在 Snowflake 内部运行,因此您在规划代码时必须考虑到这一点。

  • 限制消耗的内存量。Snowflake 根据所需的内存量对方法进行了限制。如需指导,请参阅 设计保持在 Snowflake 施加的约束范围内的处理程序

  • 确保您的处理程序方法或函数是线程安全的。

  • 遵守规则和安全限制。请参阅 UDFs 和过程的安全实践

  • 确定是否希望以 :doc:` 调用者权限或所有者权限 </developer-guide/stored-procedure/stored-procedures-rights>` 运行存储过程。

  • 考虑用于运行存储过程的 snowflake-snowpark-python 版本。由于存储过程发布流程的限制,Python 存储过程环境中可用的 snowflake-snowpark-python 库通常比公开发布的版本落后一个版本。使用以下 SQL 查找最新可用版本:

    select * from information_schema.packages where package_name = 'snowflake-snowpark-python' order by version desc;
    
    Copy

编写方法或函数

为存储过程编写方法或函数时,请注意以下事项:

  • 指定 Snowpark Session 对象作为方法或函数的第一个实参。调用存储过程时,Snowflake 会自动创建一个 Session 对象并将其传递给存储过程。(您不能自行创建 Session 对象。)

  • 对于其余实参和返回值,请使用与 Snowflake 数据类型 中列出的 Python 数据类型。

  • 在过程的处理程序中运行异步子作业(例如使用 DataFrame .collect_nowait)时,不支持 “发后即忘”。

    换句话说,如果处理程序发出的子查询在父存储过程作业完成时仍在运行,子作业就会自动取消。

处理错误

您可以使用正常的 Python 异常处理技术来捕获过程中的错误。

如果方法内部出现未捕获的异常,Snowflake 会引发一个包含异常堆栈跟踪的错误。启用 记录未处理异常 后,Snowflake 会在事件表中记录有关未处理异常的数据。

为代码提供依赖项

如果处理程序代码依赖在处理程序之外(如模块中定义的代码)定义的代码或依赖资源文件,则可以通过将这些依赖项上传到暂存区来为代码提供这些依赖项。请参阅 为代码提供依赖项,或者对于 Python 工作表,请参阅 将 Python 文件从暂存区添加到工作表

如果使用 SQL 创建存储过程,请在编写 CREATE PROCEDURE 语句 时使用 IMPORTS 子句,以指向依赖项文件。

通过存储过程访问 Snowflake 中的数据

要访问 Snowflake 中的数据,请使用 Snowpark 库 APIs。

处理对 Python 存储过程的调用时,Snowflake 会创建一个 Snowpark Session 对象,并将该对象传递给存储过程的方法或函数。

与使用其他语言的存储过程一样,会话的上下文(例如权限、当前数据库和架构等)取决于存储过程是以调用方权限还是所有者权限运行。有关详细信息,请参阅 访问和设置会话状态

您可以使用此 Session 对象调用 Snowpark 库 中的 APIs。例如,您可以 :ref:` 为表创建 DataFrame <label-snowpark_python_dataframes>`,或执行 SQL 语句。

有关更多信息,请参阅 :doc:` Snowpark 开发者指南 </developer-guide/snowpark/python/index>`。

数据访问示例

以下是 Python 方法的示例,该方法将指定数量的行从一个表复制到另一个表。该方法采用以下实参:

  • Snowpark Session 对象

  • 要从中复制行的表的名称

  • 要将行保存到的表的名称

  • 要复制的行数

此示例中的方法返回一个字符串。如果您在 :doc:` Python 工作表 </developer-guide/snowpark/python/python-worksheets>` 中运行此示例,:ref:` 请将工作表的返回类型更改 <label-snowsight_py_worksheets_return>` 为 String

def run(session, from_table, to_table, count):

  session.table(from_table).limit(count).write.save_as_table(to_table)

  return "SUCCESS"
Copy

读取文件

您可以使用 Snowpark snowflake.snowpark.files 模块中的 SnowflakeFile 类从 Python 处理程序中的暂存区动态读取文件。

Snowflake 支持使用 SnowflakeFile 读取存储过程和用户定义函数的文件。有关在处理程序代码中读取文件的更多信息以及更多示例,请参阅 使用 Python UDF 处理程序读取文件

此示例演示如何创建和调用使用 SnowflakeFile 类读取文件的所有者权限存储过程。

使用内联处理程序程序创建存储过程,通过传递 rb 作为 mode 实参来将输入模式指定为二进制:

CREATE OR REPLACE PROCEDURE calc_phash(file_path string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS
$$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(ignored_session, file_path):
    with SnowflakeFile.open(file_path, 'rb') as f:
        return imagehash.average_hash(Image.open(f))
$$;
Copy

调用存储过程:

CALL calc_phash(build_scoped_file_url(@my_files, 'my_image.jpg'));
Copy

使用 Anaconda 的第三方包

您可以在创建 Python 存储过程时指定要安装的 Anaconda 包。要查看 Anaconda 的第三方包列表,请参阅 ` Anaconda Snowflake 通道 <https://repo.anaconda.com/pkgs/snowflake (https://repo.anaconda.com/pkgs/snowflake)>`__。这些第三方包由 Anaconda 构建和提供。您可以根据 Anaconda 服务条款的补充嵌入式软件条款免费使用 Snowflake conda 通道进行本地测试和开发。

有关限制,请参阅 限制

开始使用

在开始使用 Snowflake 中的 Anaconda 提供的包之前,您必须确认 外部产品条款

备注

您必须是组织管理员(使用 ORGADMIN 角色)才能接受这些条款。您只需为自己的 Snowflake 账户接受一次这些条款。请参阅 在账户中启用 ORGADMIN 角色

  1. 登录 Snowsight。

  2. 选择 Admin » Billing & Terms

  3. Anaconda 部分中,选择 Enable

  4. Anaconda Packages 对话框中,单击链接查看 外部产品条款页面

  5. 如果您同意条款,请选择 Acknowledge & Continue

如果在尝试接受服务条款时出现错误,您的用户简介中可能缺少名、姓或电子邮件地址。如果您有管理员角色,请参阅 向您的用户简介添加用户详细信息,以使用 Snowsight 更新您的简介。否则,请联系管理员 更新您的账户

备注

如果您不确认上述 Snowflake 第三方条款,您仍然可以使用存储过程,但受到以下限制:

  • 您不能使用 Anaconda 的任何第三方包。

  • 您仍然可以将 Snowpark Python 指定为存储过程中的包,但不能指定特定版本。

  • 与 DataFrame 对象交互时不能使用 to_pandas 方法。

显示和使用包

您可以通过查询 Information Schema 中的 PACKAGES 视图来显示所有可用的包及其版本信息:

select * from information_schema.packages where language = 'python';
Copy

有关更多信息,请参阅 Snowflake Python UDF 文档中 :doc:` 使用第三方包 </developer-guide/udf/python/udf-python-packages>`。

创建存储过程

您可以从 Python 工作表创建存储过程,或者使用 SQL。

创建 Python 存储过程以自动执行 Python 工作表代码

从 Python 工作表创建 Python 存储过程来自动执行代码。有关编写 Python 工作表的详细信息,请参阅 在 Python 工作表中编写 Snowpark 代码

先决条件

您的角色必须对运行 Python 工作表的数据库架构具有 OWNERSHIP 或 CREATE PROCEDURE 权限,才能将其部署为存储过程。

将 Python 工作表部署为存储过程

要创建 Python 存储过程来自动执行 Python 工作表中的代码,请执行以下操作:

  1. 登录 Snowsight。

  2. 打开 Projects » Worksheets

  3. 打开要部署为存储过程的 Python 工作表。

  4. 选择 Deploy

  5. 输入存储过程的名称。

  6. (可选)输入有关存储过程的详细信息的注释。

  7. (可选)选择 Replace if exists 替换具有相同名称的现有存储过程。

  8. 对于 Handler,选择存储过程的处理程序函数。例如 main

  9. 检查处理程序函数使用的实参,如果需要,替换类型化实参的 SQL 数据类型映射。有关如何将 Python 类型映射到 SQL 类型的详细信息,请参阅 SQL-Python 数据类型映射

  10. (可选)选择在 Open in Worksheets 以在 SQL 工作表中打开存储过程定义。

  11. 选择 Deploy 创建存储过程。

  12. 创建存储过程后,您可以转到过程详细信息或选择 Done

您可以从一张 Python 工作表创建多个存储过程。

创建存储过程后,您可以将其作为任务的一部分自动执行。请参阅 任务简介

返回表格数据

您可以编写一个过程,以表格形式返回数据。若要编写返回表格数据的过程,请执行以下操作:

  • CREATE PROCEDURE 语句中指定 TABLE(...) 作为过程的返回类型。

    您可以指定返回数据的列名和 :doc:` 类型 </sql-reference-data-types>` (如果您知道)作为 TABLE 参数。如果在定义过程时不知道返回的列(例如在运行时指定列时),则可以省略参数 TABLE。如果执行此操作,过程的返回值列将通过其处理程序返回的 DataFrame 中的列进行转换。列数据类型将根据 SQL-Python 数据类型映射 中指定的映射转换为 SQL。

  • 编写处理程序,使其在 Snowpark DataFrame 中返回表格结果。

    有关 DataFrame 的更多信息,请参阅 在 Snowpark Python 中使用 DataFrames

示例

本节中的示例演示如何通过筛选列与字符串匹配的行的过程返回表格值。

定义数据

以下示例中的代码创建一个员工表。

CREATE OR REPLACE TABLE employees(id NUMBER, name VARCHAR, role VARCHAR);
INSERT INTO employees (id, name, role) VALUES (1, 'Alice', 'op'), (2, 'Bob', 'dev'), (3, 'Cindy', 'dev');
Copy

指定返回列名称和类型

此示例在 RETURNS TABLE() 语句中指定列名称和类型。

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE(id NUMBER, name VARCHAR, role VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
   df = session.table(table_name)
   return df.filter(col("role") == role)
$$;
Copy

省略返回列名称和类型

以下示例中的代码声明一个过程,该过程允许通过处理程序返回值中的列推断返回值列名称和类型。它会忽略 RETURNS TABLE() 语句中的列名称和类型。

CREATE OR REPLACE PROCEDURE filterByRole(tableName VARCHAR, role VARCHAR)
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'filter_by_role'
AS
$$
from snowflake.snowpark.functions import col

def filter_by_role(session, table_name, role):
  df = session.table(table_name)
  return df.filter(col("role") == role)
$$;
Copy

调用过程

下面的示例调用存储过程:

CALL filterByRole('employees', 'dev');
Copy

过程调用生成以下输出:

+----+-------+------+
| ID | NAME  | ROLE |
+----+-------+------+
| 2  | Bob   | dev  |
| 3  | Cindy | dev  |
+----+-------+------+

调用存储过程

创建存储过程后,您可以通过 SQL 调用它或将其作为计划任务的一部分调用。

  • 有关通过 SQL 调用存储过程的信息,请参阅 调用存储过程

  • 有关将存储过程作为计划任务的一部分进行调用的信息,请参阅 任务简介

示例

使用工作进程运行并发任务

您可以使用 Python 工作进程运行并发任务。当您需要运行的并行任务利用仓库节点上的多个 CPU 核心时,您可能会发现此操作很有用。

备注

Snowflake 建议您不要使用内置的 Python 多处理模块。

要解决 ` Python 全局解释器锁 <https://wiki.python.org/moin/GlobalInterpreterLock (https://wiki.python.org/moin/GlobalInterpreterLock)>`_ 阻止多任务处理方法扩展到所有 CPU 核心,您可以使用单独的工作进程而不是线程来执行并发任务。

您可以使用 joblib 库的 Parallel 类对 Snowflake 仓库执行此操作,如以下示例所示。

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

def joblib_multiprocessing(session, i):
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
  return str(result)
$$;
Copy

备注

Snowflake Standard 和 Snowpark-Optimized Warehouses 用于 joblib.Parallel 的默认后端有所不同。

  • Standard Warehouse 默认值:threading

  • Snowpark-Optimized Warehouse 默认值: loky (多处理)

您可以通过调用 joblib.parallel_backend 函数来替换默认后端设置,如下例所示。

import joblib
joblib.parallel_backend('loky')
Copy

使用 Snowpark APIs 进行异步处理

以下示例说明了如何使用 Snowpark APIs 启动异步子作业,以及这些作业在不同条件下的行为方式。

例 1:检查异步子作业的状态

在下面的示例中,过程 checkStatus 会执行一个等待 60 秒的异步子作业。然后,过程会在任务完成之前检查任务的状态,因此检查返回 False

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    return async_job.is_done()
$$;
Copy

以下代码会调用该过程。

CALL checkStatus();
Copy
+------------+
| checkStatus |
|------------|
| False      |
+------------+

示例 2:取消异步子作业

在下面的示例中,过程 cancelJob 使用 SQL 将数据插入 test_tb 表,异步子作业需要 10 秒才能完成。然后,它会在子作业完成并插入数据之前取消子作业。

CREATE OR REPLACE TABLE test_tb(c1 STRING);
Copy
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    return async_job.cancel()
$$;

CALL cancelJob();
Copy

以下代码会查询 test_tb 表,但由于没有插入数据,所以不返回结果。

SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+

示例 3:异步子作业运行时的等待和阻塞

在下面的示例中,存储过程 blockUntilDone 会执行一项异步子作业,需要 5 秒才能完成。使用 snowflake.snowpark.AsyncJob.result 方法,过程会等待并在任务完成后返回。

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    return async_job.result()
$$;
Copy

以下代码会调用 blockUntilDone 过程,等待 5 秒后返回。

CALL blockUntilDone();
Copy
+------------------------------------------+
| blockUntilDone                               |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+

示例 4:在请求未完成异步子作业的结果后返回错误

在下面的示例中,过程 earlyReturn 会执行一项异步子作业,需要 60 秒才能完成。然后,在作业完成之前,过程会尝试从作业结果中返回 DataFrame。结果是返回错误。

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        return 'Error: (02000): Result for query <UUID> has expired'
$$;
Copy

以下代码调用 earlyReturn 过程,返回错误。

CALL earlyReturn();
Copy
+------------------------------------------------------------+
| earlyReturn                                                 |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+

实例 5:在子作业完成前完成父作业,取消子作业

在下面的示例中,过程 earlyCancelJob 会执行异步子作业,向表中插入数据,需要 10 秒才能完成。但是,父作业 async_handler 会在子作业完成前返回,从而取消子作业。

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
Copy

以下代码会调用 earlyCancelJob 过程。然后查询 test_tb 表,但由于取消的子作业没有插入数据,因此没有返回结果。

CALL earlyCancelJob();
SELECT * FROM test_tb;
Copy
+----+
| C1 |
|----|
+----+
语言: 中文