存储过程 Python 处理程序示例¶
使用工作进程运行并发任务¶
您可以使用 Python 工作进程运行并发任务。当您需要运行的并行任务利用仓库节点上的多个 CPU 核心时,您可能会发现此操作很有用。
备注
Snowflake 建议您不要使用内置的 Python 多处理模块。
要解决 ` Python 全局解释器锁 <https://wiki.python.org/moin/GlobalInterpreterLock (https://wiki.python.org/moin/GlobalInterpreterLock)>`_ 阻止多任务处理方法扩展到所有 CPU 核心,您可以使用单独的工作进程而不是线程来执行并发任务。
您可以使用 joblib 库的 Parallel 类对 Snowflake 仓库执行此操作,如以下示例所示。
备注
Snowflake Standard 和 Snowpark-Optimized Warehouses 用于 joblib.Parallel 的默认后端有所不同。
Standard Warehouse 默认值:
threadingSnowpark-Optimized Warehouse 默认值:
loky(多处理)
您可以通过调用 joblib.parallel_backend 函数来替换默认后端设置,如下例所示。
使用 Snowpark APIs 进行异步处理¶
以下示例说明了如何使用 Snowpark APIs 启动异步子作业,以及这些作业在不同条件下的行为方式。
检查异步子作业的状态¶
在下面的示例中,过程 checkStatus 会执行一个等待 60 秒的异步子作业。然后,过程会在任务完成之前检查任务的状态,因此检查返回 False。
以下代码会调用该过程。
取消异步子作业¶
在下面的示例中,过程 cancelJob 使用 SQL 将数据插入 test_tb 表,异步子作业需要 10 秒才能完成。然后,它会在子作业完成并插入数据之前取消子作业。
以下代码会查询 test_tb 表,但由于没有插入数据,所以不返回结果。
异步子作业运行时的等待和阻塞¶
在下面的示例中,存储过程 blockUntilDone 会执行一项异步子作业,需要 5 秒才能完成。使用 snowflake.snowpark.AsyncJob.result 方法,过程会等待并在任务完成后返回。
以下代码会调用 blockUntilDone 过程,等待 5 秒后返回。
在请求未完成异步子作业的结果后返回错误¶
在下面的示例中,过程 earlyReturn 会执行一项异步子作业,需要 60 秒才能完成。然后,在作业完成之前,过程会尝试从作业结果中返回 DataFrame。结果是返回错误。
以下代码调用 earlyReturn 过程,返回错误。
在子作业完成前完成父作业,取消子作业¶
在下面的示例中,过程 earlyCancelJob 会执行异步子作业,向表中插入数据,需要 10 秒才能完成。但是,父作业 async_handler 会在子作业完成前返回,从而取消子作业。
以下代码会调用 earlyCancelJob 过程。然后查询 test_tb 表,但由于取消的子作业没有插入数据,因此没有返回结果。
读取文件和资产¶
使用 IMPORTS 读取静态指定的文件¶
您可以通过在 IMPORTS 命令的 CREATE PROCEDURE 子句中指定文件名和暂存区名来读取文件。
在 IMPORTS 子句中指定文件时,Snowflake 会将文件从暂存区复制到存储过程的 * 主目录 (也称为 *导入目录),存储过程正是从该目录中读取文件。
Snowflake 将导入的文件复制到单个目录。目录中的所有文件都必须具有唯一的名称,因此 IMPORTS 子句中的每个文件都必须具有不同的名称。即使文件在不同暂存区或暂存区内的不同子目录中启动,这点也适用。
以下示例使用内联 Python 处理程序,从名为 file.txt 的暂存区读取名为 my_stage 的文件。处理程序使用带 snowflake_import_directory 系统选项的 Python sys._xoptions 方法来检索存储过程主目录的位置。
Snowflake 在存储过程创建期间仅读取一次文件,如果是在目标处理程序外部读取文件,则在存储过程执行期间不会再次读取该文件。
使用内联处理程序创建存储过程:
使用 IMPORTS 导入目录¶
您可以使用 CREATE PROCEDURE 命令的 IMPORTS 子句导入目录。
备注
目录的导入路径必须以斜杠 (
/) 结尾。例如IMPORTS = ('@my_stage/my_dir/')。要在导入时重命名目录,请将
/=custom_name/追加到暂存区路径。自定义名称必须是单个目录名称,而不是路径。例如IMPORTS = ('@my_stage/my_dir/=custom_name/')。原生应用不支持目录导入。
以下示例从名为 my_stage 的暂存区导入名为 my_dir 的目录,并列出其中包含的文件。
