使用第三方包

暂存区可用于导入第三方包。还可以在创建 Python UDFs 时指定要安装的 Anaconda 包。

本主题内容:

Artifact Repository 概述

Artifact Repository 允许您在 Snowpark Python 用户定义函数 (UDFs) 和存储过程中直接使用来自 Python Package Index (PyPI (https://pypi.org/)) 的 Python 包。这使得在 Snowflake 中构建和扩展基于 Python 的应用程序变得更加容易。

开始使用

Snowflake 有一个名为 snowflake.snowpark.pypi_shared_repository 的默认 Artifact Repository,用于在 Snowpark UDFs 和过程中连接和安装 PyPI 包。要让用户使用此存储库,账户管理员(已被授予 ACCOUNTADMIN 角色的用户)需要向用户的角色授予 SNOWFLAKE PYPI_REPOSITORY_USER 数据库角色:

GRANT DATABASE ROLE SNOWFLAKE.PYPI_REPOSITORY_USER TO ROLE some_user_role;
Copy

要将此角色授予账户中的所有用户,账户管理员可以向 PUBLIC 角色授予 SNOWFLAKE.PYPI_REPOSITORY_USER 数据库角色:

GRANT DATABASE ROLE SNOWFLAKE.PYPI_REPOSITORY_USER TO ROLE PUBLIC;
Copy

然后,从存储库中安装该包。创建 UDF 时,将 ARTIFACT_REPOSITORY 参数设置为 Artifact Repository 的名称。您还可以将 PACKAGES 参数设置为将来自 Artifact Repository 的包的名称列表。在以下示例中,由于 Artifact Repository 配置了 PyPI,因此包 scikit-learn 来自于 PyPI。

备注

要指定包版本,请按如下所示进行添加:

PACKAGES = ('scikit-learn==1.5')
Copy
CREATE OR REPLACE FUNCTION sklearn_udf()
  RETURNS FLOAT
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  ARTIFACT_REPOSITORY = snowflake.snowpark.pypi_shared_repository
  PACKAGES = ('scikit-learn')
  HANDLER = 'udf'
  AS
$$
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

def udf():
  X, y = load_iris(return_X_y=True)
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

  model = RandomForestClassifier()
  model.fit(X_train, y_train)
  return model.score(X_test, y_test)
$$;

SELECT sklearn_udf();
Copy

仅为 x86 构建的包

如果某个包仅为 x86 构建,请选择一个使用 x86 CPU 架构的仓库:MEMORY_1X_x86MEMORY_16X_x86。有关更多信息,请参阅 Snowpark-Optimized Warehouses

然后指定:RESOURCE_CONSTRAINT=(architecture='x86')。请查看以下示例。

CREATE OR REPLACE FUNCTION pymeos_example()
RETURNS STRING
LANGUAGE PYTHON
HANDLER='main'
RUNTIME_VERSION='3.11'
ARTIFACT_REPOSITORY=snowflake.snowpark.pypi_shared_repository
PACKAGES=('pymeos') -- dependency pymeos-cffi is x86 only
RESOURCE_CONSTRAINT=(architecture='x86')
AS $$
def main() -> str:
   from pymeos import pymeos_initialize, pymeos_finalize, TGeogPointInst, TGeogPointSeq

   # Always initialize MEOS library
   pymeos_initialize()

   sequence_from_string = TGeogPointSeq(
      string='[Point(10.0 10.0)@2019-09-01 00:00:00+01, Point(20.0 20.0)@2019-09-02 00:00:00+01, Point(10.0 10.0)@2019-09-03 00:00:00+01]')

   sequence_from_points = TGeogPointSeq(instant_list=[TGeogPointInst(string='Point(10.0 10.0)@2019-09-01 00:00:00+01'),
        TGeogPointInst(string='Point(20.0 20.0)@2019-09-02 00:00:00+01'),
        TGeogPointInst(string='Point(10.0 10.0)@2019-09-03 00:00:00+01')],
          lower_inc=True, upper_inc=True)
   speed = sequence_from_points.speed()

   # Call finish at the end of your code
   pymeos_finalize()

   return speed
$$;

SELECT pymeos_example();
Copy

您可以在 UDF 和存储过程客户端 APIs 中使用 Artifact Repository,如下所示:

使用它们时,请指定以下参数:

  • ARTIFACT_REPOSITORY

  • PACKAGES

并在 PACKAGES 字段中提供包名称。

请查看以下示例:

...
ARTIFACT_REPOSITORY="snowflake.snowpark.pypi_shared_repository",
PACKAGES=["urllib3", "requests"],
...
Copy

故障排除

函数或过程创建部分的包安装失败。您可以在本地运行以下 pip 命令来查看包规范是否有效:

pip install <package name> --only-binary=:all: --python-version 3.9 –platform <platform_tag>
Copy

限制

  • 目前,Snowflake 仅支持二进制分发格式(即 wheel 文件)。

  • 不支持访问私有存储库。

  • 您不能直接在笔记本中直接使用此功能。但是,您可以使用在笔记本中使用 PyPI 包的 UDF 或存储过程。

  • 您无法在匿名存储过程中使用 Artifact Repository。

备注

  • Snowflake 不会检查或管理来自外部来源的 Python 包的安全性。您需要自行评估并确保这些包是安全可靠的。

  • Snowflake 保留在不提前通知的情况下屏蔽或移除任何可能有害或存在风险的包的权利。这是为了保护平台的完整性。

通过 Snowflake 暂存区导入包

Snowflake 暂存区可用于导入包。可以引入任何遵循 一般限制 中定义的准则的 Python 代码。有关更多信息,请参阅 使用从暂存区上传的代码创建 Python UDF

您只能通过 Snowflake 暂存区上传纯 Python 包或具有本机代码的包。如果您上传的包依赖于 x86 CPU 架构,那么您必须使用 Snowpark-Optimized Warehouses 并使用 RESOURCE_CONSTRAINT 仓库属性,架构为 CPU x86。

作为一个示例,您可以使用以下 SQL,它会创建一个名为 so_warehouse 的仓库,该仓库具有 x86 CPU 架构:

CREATE WAREHOUSE so_warehouse WITH
   WAREHOUSE_SIZE = 'LARGE'
   WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
   RESOURCE_CONSTRAINT = 'MEMORY_16X_X86';
Copy

要通过从暂存区导入来安装带有本机代码的包,请使用以下示例:

CREATE or REPLACE function native_module_test_zip()
  RETURNS string
  LANGUAGE python
  RUNTIME_VERSION=3.9
  RESOURCE_CONSTRAINT=(architecture='x86')
  IMPORTS=('@mystage/mycustompackage.zip')
  HANDLER='compute'
  as
  $$
  def compute():
      import mycustompackage
      return mycustompackage.mycustompackage()
  $$;
Copy

使用 Anaconda 的第三方包

为了方便起见,Anaconda 构建和提供的许多热门的开源第三方 Python 包都可以在 Snowflake 虚拟仓库中开箱即用。除了 Snowflake 的标准消费定价外,使用 Anaconda 包无需支付额外费用。除 Snowflake Notebooks 外,Anaconda 包目前不适合在 Snowpark Container Services (SPCS) 内使用。要在 SPCS 的容器镜像中使用 Python 包,您可以使用 pip 从 PyPi 安装这些包。

要查看 Anaconda 的第三方包列表,请参阅 Anaconda Snowflake 通道 (https://repo.anaconda.com/pkgs/snowflake)。

要请求添加新包,请访问 Snowflake 社区中的 Snowflake 创意 (https://community.snowflake.com/s/ideas/) 页面。选择 Python Packages & Libraries 类别并检查是否有人已经提交了请求。如果是,请进行投票。否则,点击 New Idea 并提交建议。

如果您上传的包依赖于 x86 CPU 架构,那么您必须使用 Snowpark-Optimized Warehouses 并使用 RESOURCE_CONSTRAINT 仓库属性,架构为 CPU x86。

开始使用

在开始使用 Snowflake 中的 Anaconda 提供的包之前,您必须确认 外部产品条款

备注

您必须使用 ORGADMIN 角色来接受条款。您只需为自己的 Snowflake 账户接受一次 外部产品条款。如果您无权访问 ORGADMIN 角色,请参阅 在账户中启用 ORGADMIN 角色

  1. 登录 Snowsight。

  2. 选择 Admin » Terms

  3. Anaconda 部分中,选择 Enable

  4. Anaconda Packages 对话框中,单击链接查看 外部产品条款页面

  5. 如果您同意条款,请选择 Acknowledge & Continue

如果您在尝试接受 外部产品条款 时遇到错误,可能是由于您的用户配置文件中缺少信息,如姓名或电子邮件地址。如果您具有管理员权限,请参阅 向您的用户简介添加用户详细信息,以使用 Snowsight 更新您的简介。否则,请联系管理员 更新您的账户

显示和使用包

显示可用的包

可以通过查询 Information Schema 中的 PACKAGES 视图来显示所有可用的包及其版本信息。

select * from information_schema.packages where language = 'python';
Copy

要显示特定包的版本信息,例如 numpy,请使用以下命令:

select * from information_schema.packages where (package_name = 'numpy' and language = 'python');
Copy

备注

Anaconda Snowflake 通道中的一些包不适合在 Snowflake UDFs 中使用,因为 UDFs 是在一个受限的引擎中执行的。有关更多信息,请参阅 遵循良好的安全实践

在 Snowflake 仓库中执行调用 Python UDFs 的查询时,为您在虚拟仓库上无缝安装并缓存 Anaconda 包。

显示导入的包

通过执行 DESCRIBE FUNCTION 命令,可以显示 UDF 或 UDTF 正在使用的包和模块的列表。对其处理程序在 Python 中实现的 UDF 执行 DESCRIBE FUNCTION 命令将返回多个属性值,包括导入的模块和包以及已安装包的列表、函数签名及其返回类型。

指定 UDF 的标识符时,请确保包括函数参数类型(如果有)。

desc function stock_sale_average(varchar, number, number);
Copy

使用 Anaconda 包

有关如何在 Python UDF 中使用导入的 Anaconda 包的示例,请参阅 在内联处理程序中导入包

设置包策略

可以使用包策略在账户级别为来自 Anaconda 的第三方 Python 包设置允许列表和阻止列表。这使您可以满足更严格的审核和安全要求,并可以更精细地控制环境中哪些包可用、哪些包被阻止。有关更多信息,请参阅 包策略

冷仓库性能

为了更有效地管理资源,新预置的虚拟仓库不会预先安装 Anaconda 包。相反,Anaconda 包是在第一次使用 UDF 时按需安装的。这些包被缓存起来,以便将来在同一仓库上执行 UDF。当仓库暂停的时候缓存会被弃用。这可能会导致首次使用 UDF 或仓库恢复后性能下降。额外的延迟可能约为 30 秒。

本地开发和测试

为了帮助您在本地机器上创建用于开发和测试的 conda 环境,Anaconda 创建了一个 Snowflake 通道,该通道镜像了 Snowflake Python UDF 环境中支持的包和版本的子集。您可以根据 Anaconda 服务条款的补充嵌入式软件条款免费使用 Snowflake conda 通道进行本地测试和开发。

例如,要使用 Snowflake 通道在本地创建新的 conda 环境,请在命令行中输入以下命令:

conda create --name py39_env -c https://repo.anaconda.com/pkgs/snowflake python=3.9 numpy pandas
Copy

注意,由于平台差异,本地的 conda 环境可能与服务器环境不完全相同。

最佳实践

create function 语句中,包规范(例如, packages = ('numpy','pandas'))应该只指定 UDF 直接使用的顶级包。Anaconda 执行包的依赖项管理,并将自动安装所需的依赖项。因此,不应指定依赖项包。

如果未指定包版本,Anaconda 将安装最新版本的包及其依赖项。通常,不需要指定特定的包版本。请注意,当使用 create function 命令创建 UDF 时,将执行一次版本解析。之后,最终的版本解析被冻结,当这个特定的 UDF 执行时,将使用同一套包。

有关如何在 create function 语句中使用包规范的示例,请参阅 在内联处理程序中导入包

第三方包的已知问题

单行预测的性能

一些数据科学框架,例如 Scikit-learn 和 TensorFlow,在进行单行 ML 预测时可能会很慢。为了提高性能,请进行批量预测而不是单行预测。要执行此操作,您可以使用矢量化的 Python UDFs,通过它可以定义批量接收输入行的 Python 函数,机器学习或数据科学库将在这些函数上进行优化操作。有关更多信息,请参阅 矢量化 Python UDFs

从数据科学库按需下载数据

一些数据科学库(如 NLTK (https://www.nltk.org/data.html)、Keras (https://www.tensorflow.org/api_docs/python/tf/keras/datasets) 和 spaCy (https://spacy.io) )提供按需下载其他语料库、数据或模型的功能。

然而,由于 Snowflake 安全约束,按需下载不能与 Python UDFs 一起使用,Snowflake 安全约束禁用了一些功能,例如网络访问和写入文件。

要解决此问题,请将数据下载到本地环境,然后通过 Snowflake 暂存区将其提供给 UDF。

XGBoost

在 UDF 或 UDTF 中使用 XGBoost 进行并行预测或训练时,每个 XGBoost 实例的并发性应该设置为 1。这确保 XGBoost 在 Snowflake 环境中执行时配置为最佳性能。

示例:

import xgboost as xgb
model = xgb.Booster()
model.set_param('nthread', 1)
model.load_model(...)
Copy
import xgboost as xgb
model = xgb.XGBRegressor(n_jobs=1)
Copy

TensorFlow/Keras

使用 Tensorflow/Keras 进行预测时,请使用 Model.predict_on_batch 而不是 Model.predict。

示例:

import keras
model = keras.models.load_model(...)
model.predict_on_batch(np.array([input]))
Copy
语言: 中文