Snowpark Container Services 中的 Model Serving¶
备注
本主题中所述的在 Snowpark Container Services (SPCS) 中运行模型的功能已在 snowflake-ml-python
版本 1.6.4 及更高版本中推出。
Snowflake Model Registry 允许您在仓库(默认)中或通过 Model Serving 在 Snowpark Container Services (SPCS) 计算池中运行模型。在仓库中运行模型对您可以使用的模型的大小和种类有一些限制(特别是中小型仅 CPU 模型,其依赖项可由 Snowflake conda 通道中可用的包来满足)。
在 Snowpark Container Services (SPCS) 上运行模型可减轻或完全消除这些限制。您可以使用任何包,包括来自 Python Package Index (PyPI) 或其他来源的包。大型模型可在分布式 GPUs 集群上运行。而且,您不需要了解任何容器技术(例如 Docker 或 Kubernetes)。Snowflake Model Serving 会处理所有细节。
关键概念¶
Snowflake Model Serving 推理架构的简化版高层概述如下所示。

该架构的主要组件包括:
推理服务器:运行模型并提供预测的服务器。推理服务器可以使用多个推理进程,以充分利用节点的能力。对模型的请求由准入控制进行调度,准入控制管理传入的请求队列,以避免出现内存不足的情况,并在服务器重载时拒绝客户端。如今,Snowflake 提供了一个简单灵活的基于 Python 的推理服务器,可以为所有类型的模型运行推理。随着时间的推移,Snowflake 计划提供针对特定模型类型进行优化的推理服务器。
特定于模型的 Python 环境:为了降低启动模型时的延迟(包括下载依赖项和加载模型所需的时间),Snowflake 构建了一个容器来封装特定模型的依赖项。
服务函数:为了从仓库中运行的代码与推理服务器进行对话,Snowflake Model Serving 构建了与模型具有相同签名的函数,但这些函数通过 外部函数协议 调用推理服务器。
入口端点:为了允许 Snowflake 以外的应用程序调用模型,Snowflake Model Serving 可预置一个可选的 HTTP 端点,供公共互联网访问。
它是如何工作的?¶
下图展示了 Snowflake Model Serving 在仓库或 SPCS 中部署和提供模型的方式。

如您所见,SPCS 部署的路径比仓库部署的路径更加复杂,但 Snowflake Model Serving 会为您完成所有工作,包括构建容纳模型及其依赖项的容器镜像,以及创建运行模型的推理服务器。
先决条件¶
在开始之前,请确保您满足以下条件:
一个位于任意 AWS 商业区域的 Snowflake 账户。不支持政府区域。如果您的账户在 Azure 中,请联系您的客户代表。
版本 1.6.4 或更高版本的
snowflake-ml-python
Python 包。您希望在 Snowpark Container Services 上运行的模型。
熟悉 Snowpark Container Services。特别是,您应该了解 计算池、镜像仓库 以及相关权限。
创建计算池¶
Snowpark Container Services (SPCS) 在计算池中运行容器镜像。如果您还没有合适的计算池,请按如下方式创建:
CREATE COMPUTE POOL IF NOT EXISTS mypool
MIN_NODES = 2
MAX_NODES = 4
INSTANCE_FAMILY = 'CPU_X64_M'
AUTO_RESUME = TRUE;
有关有效实例系列的列表,请参阅 系列名称表。
确保将运行模型的角色是计算池的所有者,或者是拥有池的 USAGE 或 OPERATE 权限的其他人。
创建镜像仓库¶
Snowflake Model Serving 会构建一个容纳模型及其依赖项的容器镜像。要存储该镜像,您需要一个镜像仓库。如果您还没有,请按以下方式创建一个:
CREATE IMAGE REPOSITORY IF NOT EXISTS my_inference_images
如果要使用非自身拥有的镜像仓库,请确保负责构建容器镜像的角色拥有该仓库的 READ、WRITE、SERVICE READ 和 SERVICE WRITE 权限。按如下方式授予这些权限;
GRANT READ ON IMAGE REPOSITORY my_inference_images TO ROLE myrole;
GRANT WRITE ON IMAGE REPOSITORY my_inference_images TO ROLE myrole;
GRANT SERVICE READ ON IMAGE REPOSITORY my_inference_images TO ROLE myrole;
GRANT SERVICE WRITE ON IMAGE REPOSITORY my_inference_images TO ROLE myrole;
限制¶
在此功能的预览阶段,有以下限制。Snowflake 打算在正式发布之前消除这些限制。
只有模型的所有者才能将模型部署到 Snowpark Container Services 中。
计算集群的规模不会自动扩缩。您可以在运行时使用
ALTER SERVICE myservice MIN_INSTANCES = n
手动更改实例数量。在某些情况下,这会导致现有节点出现故障。服务和计算池的纵向扩展速度比预期慢。这一点在正式发布之前应该会得到改进。
不支持自动暂停容器服务。如果预计会偶尔使用,可能需要在每次使用后手动暂停服务。
如果耗时超过一小时,镜像构建就会失败。
不支持表函数。无常规函数的模型目前无法部署到 Snowpark Container Services。
将模型部署到 SPCS¶
记录新的模型版本 <label-snowpark_model_registry_log_models>`(使用 :code:`reg.log_model)或 获取对现有模型版本的引用 (reg.get_model(...).version(...)
)。无论哪种情况,最终都会得到一个对 ModelVersion
对象的引用。
模型依赖项和资格¶
模型的依赖项决定了它能否在仓库、SPCS 服务或两者中运行。如有必要,您可以特意指定依赖项,使模型没有资格在这些环境中运行。
Snowflake conda 通道仅在仓库中可用,是仓库依赖项的唯一来源。默认情况下,SPCS 模型的 conda 依赖项从 conda-forge 获取其依赖项。
当您记录模型版本时,模型的 conda 依赖项会根据 Snowflake conda 通道进行验证。如果该模型的所有 conda 依赖项都可用,则认为该模型有资格在仓库中运行。如果其所有依赖项都可以从 conda-forge 获取,那么它也有资格在 SPCS 服务中运行,不过在您创建服务之前不会检查这一点。
记录有 PyPI 依赖项的模型必须在 SPCS 上运行。指定至少一个 PyPI 依赖项是使模型没有资格在仓库中运行的一种方法。如果您的模型只有 conda 依赖项,请至少指定一个显式通道(甚至是 conda-forge),如下面的示例所示。
reg.log_model(
model_name="my_model",
version_name="v1",
model=model,
conda_dependencies=["conda-forge::scikit-learn"])
对于 SPCS 部署的模型,应首先安装 conda 依赖项(如果有),然后使用 pip
在 conda 环境中安装所有 PyPI 依赖项。
创建服务¶
要创建 SPCS 服务并将模型部署到其中,请调用模型版本的 create_service
方法,如下面的示例所示。
mv.create_service(service_name="myservice",
service_compute_pool="my_compute_pool",
image_repo="mydb.myschema.my_image_repo",
ingress_enabled=True,
gpu_requests=None)
以下是 create_service
所需的实参:
service_name
:要创建的服务的名称。此名称在账户中必须唯一。service_compute_pool
:用于运行模型的计算池的名称。计算池必须已经存在。image_repo
:用于存储容器镜像的镜像仓库的名称。该仓库必须已经存在,且用户必须对其有 SERVICE WRITE 权限(或 OWNERSHIP)。ingress_enabled
:如果为 True,则服务可通过 HTTP 端点访问。要创建端点,用户必须拥有 BIND SERVICE ENDPOINT 权限。gpu_requests
:用于指定 GPUs 的数量的字符串。对于可在 CPU 或 GPU 上运行的模型,该实参决定模型是在 CPU 还是 GPUs 上运行。如果模型属于只能在 CPU 上运行的已知类型(例如,scikit-learn 模型),则镜像构建会在请求 GPUs 时失败。
本示例只显示了必需的和最常用的实参。有关实参的完整列表,请参阅 ModelVersion API 参考。
默认服务配置¶
Inference Server 使用适用于大多数用例且易于使用的默认值,开箱即用。这些设置如下:
工作线程数:对于 CPU 支持的模型,则服务器使用两倍数量的 CPUs 与一个工作进程。GPU 支持的模型使用一个工作进程。您可以在
create_service
调用中使用num_workers
实参对其进行替换。线程安全性:某些模型不是线程安全模型。因此,服务会为每个工作进程加载一个单独的模型副本。这可能会导致大型模型的资源枯竭。
节点利用率:默认情况下,一个推理服务器实例会请求其运行的所有 CPU 和节点内存,从而请求整个节点。要自定义每个实例的资源分配,请使用实参,例如
cpu_requests
、memory_requests
和gpu_requests
。端点:推理端点命名为
inference
并使用端口 5000。这些无法自定义。
容器镜像构建行为¶
默认情况下,Snowflake Model Serving 会使用将用于运行模型的同一计算池构建容器镜像。对这项任务而言,这一推理计算池很可能过于强大(例如,在构建容器镜像时没有使用 GPUs)。在大多数情况下,这不会对计算成本产生重大影响,但如果这是个问题,您可以通过指定 image_build_compute_pool
实参来选择功能较弱的计算池来构建镜像。
create_service
是一个幂等函数。多次调用并不会每次都触发镜像构建。不过,容器镜像可能会根据推理服务中的更新(包括对依赖包中漏洞的修复)进行重建。出现这种情况时,create_service
会自动触发镜像重建。
备注
使用 Snowpark ML 建模类 开发的模型无法部署到具有 GPU 的环境。解决方法是,您可以提取原生模型并进行部署。例如:
# Train a model using Snowpark ML from snowflake.ml.modeling.xgboost import XGBRegressor regressor = XGBRegressor(...) regressor.fit(training_df) # Extract the native model xgb_model = regressor.to_xgboost() # Test the model with pandas dataframe xgb_model.predict(pandas_test_df) pandas_test_df = test_df.select(['FEATURE1', 'FEATURE2', ...]).to_pandas() # Log the model in Snowflake Model Registry mv = reg.log_model(xgb_model, model_name="my_native_xgb_model", sample_input_data=pandas_test_df, comment = 'A native XGB model trained from Snowflake Modeling API', ) # Now we should be able to deploy to a GPU compute pool on SPCS mv.create_service( service_name="my_service_gpu", service_compute_pool="my_gpu_pool", image_repo="my_repo", max_instances=1, gpu_requests="1", )
使用部署到 SPCS 的模型¶
您可以使用 SQL、Python 或 HTTP 端点调用模型的方法。
SQL¶
将模型部署到 SPCS 时,Snowflake Model Serving 会创建服务函数。这些函数是 SQL 与在 SPCS 计算池中运行的模型之间的桥梁。系统会为模型的每个方法创建一个服务函数,它们的名称类似于 model_name_method_name
。例如,如果模型有两个名为 PREDICT
和 EXPLAIN
的方法,并被部署到名为 MY_SERVICE
的服务中,那么生成的服务函数就是 MY_SERVICE_PREDICT
和 MY_SERVICE_EXPLAIN
。
备注
服务函数包含在服务中。因此,它们只有一个访问控制点,即服务。您不能对单个服务中的不同函数拥有不同的访问控制权限。
在 SQL 中调用模型的服务函数时,需要使用如下代码:
-- See signature of the inference function in SQL.
SHOW FUNCTIONS IN MODEL my_native_xgb_model VERSION ...;
-- Call the inference function in SQL following the same signature (from `arguments` column of the above query)
SELECT MY_SERVICE_PREDICT(feature1, feature2, ...) FROM input_data_table;
Python¶
使用模型版本对象的 run
方法调用服务的方法,同时使用 service_name
实参来以指定方法在哪种服务中运行。例如:
# Get signature of the inference function in Python
mv.show_functions()
# Call the function in Python
service_prediction = mv.run(
test_df,
function_name="predict",
service_name="my_service")
如果您没有包含 service_name
实参,模型将在仓库中运行。
HTTP 端点¶
在启用 Ingress 的情况下部署服务会创建一个 HTTP 端点,您可以通过该端点调用服务。您可以使用 ShOW ENDPOINTS IN SERVICE 命令找到该端点。
SHOW ENDPOINTS IN SERVICE my_service;
请注意 ingress_url
列,它应该类似于 random_str-account-id.snowflakecomputing.app
。
要了解有关使用此端点的更多信息,请参阅 SPCS 教程 创建 Snowpark Container Services 服务 以及开发者指南中的主题 使用来自 Snowflake 外部的服务。有关所需数据格式的更多信息,请参阅 远程服务输入和输出数据格式。
有关使用模型服务 HTTP 端点的示例,请参阅 为 GPU 支持的推理部署 Hugging Face 语句转换器。
管理服务¶
Snowpark Container Services 提供用于管理服务的 SQL 界面。您可以像管理任何其他 SPCS 服务一样,对由 Snowflake Model Serving 创建的 SPCS 服务使用 DESCRIBE SERVICE 和 ALTER SERVICE 命令。例如,您可以:
更改服务的 MIN_INSTANCES 和其他属性
删除服务
将服务共享到另一个账户
更改服务的所有权(新所有者必须有权访问模型)
备注
如果服务的所有者因任何原因而失去了对底层模型的访问权限,服务将在重新启动后停止工作。它在重新启动之前会继续运行。
为确保可重现性和可调试性,您不能更改现有推理服务的规范。但您可以复制规范,对其进行定制,然后使用定制的规范创建自己的服务来托管模型。不过,这种方法不能防止底层模型被删除。通常最好允许 Snowflake Model Serving 创建服务。
暂停服务¶
在使用完一个模型之后,或者如果没有使用,暂停服务以节省成本是一种好的做法。您可以使用 ALTER SERVICE 命令实现这一目的。
ALTER SERVICE my_service SUSPEND;
服务会在收到请求时自动重新启动,但会受到计划和启动延迟的影响。计划延迟取决于计算池的可用性,启动延迟则取决于模型的大小。
删除模型¶
您可以像往常一样使用 SQL 界面或 Python API 管理模型和模型版本,但有一个限制条件,即不能删除服务正在使用的模型或模型版本(无论是运行中的还是已暂停的)。要删除某个模型或模型版本,请先删除服务。
示例¶
这些示例假定您已经创建了计算池和镜像仓库,并根据需要授予了权限。有关详细信息,请参阅 先决条件。
为 CPU 支持的推理部署 XGBoost 模型¶
THe 以下代码说明了在 SPCS 中部署 XGBoost 推理模型,然后使用所部署的模型进行推理的关键步骤。此示例中的笔记本 可用 (https://github.com/rajshah4/snowflake-notebooks/blob/main/Forecasting_ChicagoBus/Snowpark_Forecasting_Bus_FeatureStore.ipynb)。
from snowflake.ml.registry import registry
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session
from snowflake.ml.modeling.xgboost import XGBRegressor
# your model training code here output of which is a trained xgb_model
# Open model registry
reg = registry.Registry(session=session, database_name='my_registry_db', schema_name='my_registry_schema')
# Log the model in Snowflake Model Registry
model_ref = reg.log_model(
model_name="my_xgb_forecasting_model",
version_name="v1",
model=xgb_model,
conda_dependencies=["scikit-learn","xgboost"],
sample_input_data=train,
comment="XGBoost model for forecasting customer demand"
)
# Deploy the model to SPCS
reg_model.create_service(
service_name="ForecastModelServicev1",
service_compute_pool="my_cpu_pool",
image_repo="my_db.data.my_images",
ingress_enabled=True)
# See all services running a model
reg_model.list_services()
# Run on SPCS
reg_model.run(input_data, function_name="predict", service_name="ForecastModelServicev1")
# Delete the service
reg_model.delete_service("ForecastModelServicev1")
为 GPU 支持的推理部署 Hugging Face 语句转换器¶
这段代码会训练和部署一个 Hugging Face 语句转换器,包括一个 HTTP 端点。
from snowflake.ml.registry import registry
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session
from sentence_transformers import SentenceTransformer
session = Session.builder.configs(SnowflakeLoginOptions("connection_name")).create()
reg = registry.Registry(session=session, database_name='my_registry_db', schema_name='my_registry_schema')
# Take an example sentence transformer from HF
embed_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
# Have some sample input data
input_data = [
"This is the first sentence.",
"Here's another sentence for testing.",
"The quick brown fox jumps over the lazy dog.",
"I love coding and programming.",
"Machine learning is an exciting field.",
"Python is a popular programming language.",
"I enjoy working with data.",
"Deep learning models are powerful.",
"Natural language processing is fascinating.",
"I want to improve my NLP skills.",
]
# Log the model with pip dependencies
pip_model = reg.log_model(
embed_model,
model_name="sentence_transformer_minilm",
version_name='pip',
sample_input_data=input_data, # Needed for determining signature of the model
pip_requirements=["sentence-transformers", "torch", "transformers"], # If you want to run this model in the Warehouse, you can use conda_dependencies instead
)
# Force Snowflake to not try to check warehouse
conda_forge_model = reg.log_model(
embed_model,
model_name="sentence_transformer_minilm",
version_name='conda_forge_force',
sample_input_data=input_data,
# setting any package from conda-forge is sufficient to know that it can't be run in warehouse
conda_dependencies=["sentence-transformers", "conda-forge::pytorch", "transformers"]
)
# Deploy the model to SPCS
pip_model.create_service(
service_name="my_minilm_service",
service_compute_pool="my_gpu_pool", # Using GPU_NV_S - smallest GPU node that can run the model
image_repo="my_db.data.my_images",
ingress_enabled=True,
gpu_requests="1", # Model fits in GPU memory; only needed for GPU pool
max_instances=4, # 4 instances were able to run 10M inferences from an XS warehouse
)
# See all services running a model
pip_model.list_services()
# Run on SPCS
pip_model.run(input_data, function_name="encode", service_name="my_minilm_service")
# Delete the service
pip_model.delete_service("my_minilm_service")
在 SQL 中,您可以按如下方式调用服务函数:
SELECT my_minilm_service_encode('This is a test sentence.');
由于此模型已启用入口,因此您可以按如下方式调用其 HTTP 端点。
import json
from pprint import pprint
import requests
import snowflake.connector
# Generate right header
# Note that, ideally user should use key-pair authentication for API access (see this).
def initiate_snowflake_connection():
connection_parameters = SnowflakeLoginOptions("connection_name")
connection_parameters["session_parameters"] = {"PYTHON_CONNECTOR_QUERY_RESULT_FORMAT": "json"}
snowflake_conn = snowflake.connector.connect(**connection_parameters)
return snowflake_conn
def get_headers(snowflake_conn):
token = snowflake_conn._rest._token_request('ISSUE')
headers = {'Authorization': f'Snowflake Token=\"{token["data"]["sessionToken"]}\"'}
return headers
headers = get_headers(initiate_snowflake_connection())
# Put the endpoint url with method name
URL='https://<random_str>-<account>.snowflakecomputing.app/encode'
# Prepare data to be sent
data = {
'data': []
}
for idx, x in enumerate(input_data):
data['data'].append([idx, x])
# Send over HTTP
def send_request(data: dict):
output = requests.post(URL, json=data, headers=headers)
assert (output.status_code == 200), f"Failed to get response from the service. Status code: {output.status_code}"
return output.content
# Test
results = send_request(data=data)
pprint(json.loads(results))
为 GPU 支持的推理部署 PyTorch 模型¶
请参阅 本快速指南 (https://quickstarts.snowflake.com/guide/getting-started-with-running-distributed-pytorch-models-on-snowflake/index.html?index=..%2F..index#0),了解有关训练 PyTorch 深度学习推荐模型 (DLRM) 并将其部署到 SPCS 以进行 GPU 推理的示例。
最佳实践¶
- 共享镜像仓库
多个用户或角色使用同一模型的情况很常见。使用单个镜像仓库可以一次性构建镜像并供所有用户重复使用,从而节省时间和费用。所有将使用该仓库的角色都需要拥有该仓库的 SERVICE READ、SERVICE WRITE、READ 和 WRITE 权限。由于可能需要重建镜像以更新依赖项,因此应保留写入权限;不要在初始构建镜像后撤销写入权限。
- 扩缩推理服务
Snowpark Container Services 的自动扩缩功能非常保守,对于大多数 ML 工作负载而言,其纵向扩展和缩减的速度都不够快。因此,Snowflake 建议您将 MIN_INSTANCES 和 MAX_INSTANCES 设置为相同的值,选择这些值可获得典型工作负载所需的性能。使用如下 SQL 语句:
ALTER SERVICE myservice SET MIN_INSTANCES = <new_num> MAX_INSTANCES = <new_num>;
出于同样的原因,在最初使用 Python API 创建服务时,
create_service
方法只接受max_instances
,并将相同的值用于min_instances
。- 选择节点类型和实例数量
使用内存可满足模型需求的最小 GPU 节点。通过增加实例数量进行扩展,而不是在更大的 GPU 节点中增加
num_workers
。例如,如果模型适合 GPU_NV_S 实例类型,则使用gpu_requests=1
,然后通过增加max_instances
来进行纵向扩展,而不是在更大的 GPU 实例上使用gpu_requests
和num_workers
的组合。- 选择仓库大小
仓库越大,向推理服务器发送的并行请求就越多。推理是一项昂贵的操作,因此尽可能使用较小的仓库。使用大于中等规模的仓库不会提高查询性能,而且还会产生额外费用。
- 用于模型部署的独立架构
在创建服务时,会创建多个架构级对象(服务本身和每个模型函数的一个服务函数)。为避免混乱,请使用单独的架构来存储模型 (Snowflake Model Registry) 和部署模型 (Snowflake Model Serving)。
故障排除¶
监控 SPCS 部署¶
您可以使用以下 SQL 查询检查正在启动的服务,从而监控部署情况。
SHOW SERVICES IN COMPUTE POOL my_compute_pool;
启动两项作业:
MODEL_BUILD_xxxxxx:名称的最后字符是随机的,旨在避免名称冲突。此作业会构建镜像,并在镜像构建完成后结束。如果镜像已经存在,则跳过该作业。
日志对于调试包依赖项冲突等问题非常有用。要查看此作业的日志,请运行下面的 SQL,确保使用相同的最后字符:
CALL SYSTEM$GET_SERVICE_LOGS('MODEL_BUILD_xxxxx', 0, 'model-build');MYSERVICE:调用
create_service
时指定的服务名称。如果 MODEL_BUILD 作业成功或被跳过,则启动此作业。要查看此作业的日志,请运行下面的 SQL:CALL SYSTEM$GET_SERVICE_LOGS('MYSERVICE', 0, 'model-inference');
包冲突¶
有两个系统决定了安装在服务容器中的包:模型本身和推理服务器。为了减少与模型依赖项的冲突,推理服务器只需要以下包:
gunicorn<24.0.0
starlette<1.0.0
uvicorn-standard<1.0.0
请确保您的模型依赖项以及上述依赖项可通过 pip
或 :code:`conda`(无论您使用哪种)解析。
如果模型同时设置了 conda_dependencies
和 pip_requirements
,则将通过 conda 按如下方式安装:
通道:
conda-forge
nodefaults
依赖项:
all_conda_packages
- pip:
all_pip_packages
默认情况下,Snowflake 使用 conda-forge 获取 Anaconda 包,因为 Snowflake conda 通道在仓库中可用,而 defaults
通道要求用户接受 Anaconda 使用条款。要从默认设置通道指定包,请包含包名称:defaults::pkg_name
。
服务内存不足¶
有些模型不是线程安全模型,因此 Snowflake 会在内存中为每个工作进程加载一个模型副本。这可能会导致具有较多工作线程的大型模型出现内存不足的情况。可尝试减少 num_workers
。
查询性能不理想¶
通常,推理服务中的实例数量会成为瓶颈。可尝试在部署模型时为 max_instances
传递一个更大的值。
无法更改服务规范¶
使用 ALTER SERVICE 无法更改模型构建和推理服务的规范。您只能更改 TAG、MIN_INSTANCES 等属性。
不过,由于镜像已发布在镜像仓库中,因此您可以复制规范、修改它并从中创建一个新服务,然后手动启动它。