Example workflows

This page provides example workflows for deploying machine learning models to Snowpark Container Services (SPCS) for real-time inference. Each example demonstrates the full lifecycle from model registration through deployment and inference.

The examples include:

  • How to create a service, make predictions, and access the model through an HTTP endpoint.

  • How to use different model architectures (XGBoost, Hugging Face transformers, PyTorch) and compute options (CPU and GPU).

Deploy an XGBoost model for CPU-powered inference

The following code:

  • Deploys an XGBoost model for inference in SPCS.

  • Uses the deployed model for inference.

from snowflake.ml.registry import registry
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

from xgboost import XGBRegressor

# Your model training code goes here; its output is a trained xgb_model

# Create a Snowpark session and open the model registry
session = Session.builder.configs(SnowflakeLoginOptions("connection_name")).create()
reg = registry.Registry(session=session, database_name='my_registry_db', schema_name='my_registry_schema')

# Log the model in Snowflake Model Registry
model_ref = reg.log_model(
    model_name="my_xgb_forecasting_model",
    version_name="v1",
    model=xgb_model,
    conda_dependencies=["scikit-learn","xgboost"],
    sample_input_data=pandas_test_df,
    comment="XGBoost model for forecasting customer demand"
)

# Deploy the model to SPCS
model_ref.create_service(
    service_name="forecast_model_service",
    service_compute_pool="my_cpu_pool",
    ingress_enabled=True)

# See all services running a model
model_ref.list_services()

# Run on SPCS
model_ref.run(pandas_test_df, function_name="predict", service_name="forecast_model_service")

# Delete the service
model_ref.delete_service("forecast_model_service")

Calling via HTTP (external applications)

Because the model was deployed with ingress enabled (ingress_enabled=True), you can call its public HTTP endpoint. The following example authenticates to the public Snowflake endpoint with a programmatic access token (PAT) stored in the PAT_TOKEN environment variable:

import os
import json
import numpy as np
from pprint import pprint
import requests

def get_headers(pat_token):
    headers = {'Authorization': f'Snowflake Token="{pat_token}"'}
    return headers

headers = get_headers(os.getenv("PAT_TOKEN"))

# Endpoint URL with the method name `predict` appended.
# Find the endpoint URL with `SHOW ENDPOINTS IN SERVICE <service_name>`.
URL = 'https://<random_str>-<organization>-<account>.snowflakecomputing.app/predict'

# Prepare data to be sent
data = {"data": np.column_stack([range(pandas_test_df.shape[0]), pandas_test_df.values]).tolist()}

# Send over HTTP
def send_request(data: dict):
    output = requests.post(URL, json=data, headers=headers)
    assert (output.status_code == 200), f"Failed to get response from the service. Status code: {output.status_code}"
    return output.content

# Test
results = send_request(data=data)
print(json.loads(results))
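
The ingress URL used above can also be looked up programmatically. Below is a minimal sketch that assumes the session and forecast_model_service from the first example; the ingress_url column comes from the SHOW ENDPOINTS output:

# Look up the public ingress URL of the service created earlier
endpoints = session.sql("SHOW ENDPOINTS IN SERVICE forecast_model_service").collect()
print(endpoints[0]["ingress_url"])  # base URL; append the method name, e.g. /predict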

Deploy a Hugging Face sentence transformer for GPU-powered inference

This example requires the sentence-transformers package, a GPU compute pool, and an image repository.
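
If the compute pool and image repository do not exist yet, they can be created from Python as well. This is a provisioning sketch only; the object names, node counts, and the GPU_NV_S instance family are assumptions chosen to match the code below:

from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

# Hypothetical setup; adjust names and sizing to your account
setup_session = Session.builder.configs(SnowflakeLoginOptions("connection_name")).create()
setup_session.sql(
    "CREATE COMPUTE POOL IF NOT EXISTS my_gpu_pool "
    "MIN_NODES = 1 MAX_NODES = 1 INSTANCE_FAMILY = GPU_NV_S"
).collect()
setup_session.sql(
    "CREATE IMAGE REPOSITORY IF NOT EXISTS my_registry_db.my_registry_schema.my_image_repo"
).collect()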

The following code deploys a pretrained Hugging Face sentence transformer and exposes it through an HTTP endpoint.

from snowflake.ml.registry import registry
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session
from sentence_transformers import SentenceTransformer

session = Session.builder.configs(SnowflakeLoginOptions("connection_name")).create()
reg = registry.Registry(session=session, database_name='my_registry_db', schema_name='my_registry_schema')

# Take an example sentence transformer from HF
embed_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

# Have some sample input data
input_data = [
    "This is the first sentence.",
    "Here's another sentence for testing.",
    "The quick brown fox jumps over the lazy dog.",
    "I love coding and programming.",
    "Machine learning is an exciting field.",
    "Python is a popular programming language.",
    "I enjoy working with data.",
    "Deep learning models are powerful.",
    "Natural language processing is fascinating.",
    "I want to improve my NLP skills.",
]

# Log the model with pip dependencies
pip_model = reg.log_model(
    embed_model,
    model_name="sentence_transformer_minilm",
    version_name="pip",
    sample_input_data=input_data,  # Needed for determining signature of the model
    pip_requirements=["sentence-transformers", "torch", "transformers"], # If you want to run this model in the Warehouse, you can use conda_dependencies instead
)

# Force Snowflake to skip the warehouse compatibility check
conda_forge_model = reg.log_model(
    embed_model,
    model_name="sentence_transformer_minilm",
    version_name="conda_forge_force",
    sample_input_data=input_data,
    # setting any package from conda-forge is sufficient to know that it can't be run in warehouse
    conda_dependencies=["sentence-transformers", "conda-forge::pytorch", "transformers"]
)

# Deploy the model to SPCS
pip_model.create_service(
    service_name="my_minilm_service",
    service_compute_pool="my_gpu_pool",  # Using GPU_NV_S - smallest GPU node that can run the model
    ingress_enabled=True,
    gpu_requests="1", # Model fits in GPU memory; only needed for GPU pool
    max_instances=4, # 4 instances were able to run 10M inferences from an XS warehouse
)

# See all services running a model
pip_model.list_services()

# Run on SPCS
pip_model.run(input_data, function_name="encode", service_name="my_minilm_service")

# Delete the service
pip_model.delete_service("my_minilm_service")

In SQL, you can call the service function as follows:

SELECT my_minilm_service!encode('This is a test sentence.');

Similarly, you can call its HTTP endpoint as follows:

import json
import os
from pprint import pprint
import requests

# Authenticate with the PAT stored in the PAT_TOKEN environment variable, as in the earlier example
headers = {'Authorization': f'Snowflake Token="{os.getenv("PAT_TOKEN")}"'}

# Endpoint URL with the method name `encode` appended;
# find it with `SHOW ENDPOINTS IN SERVICE my_minilm_service`.
URL = 'https://<random_str>-<account>.snowflakecomputing.app/encode'

# Prepare data to be sent
data = {
    'data': []
}
for idx, x in enumerate(input_data):
    data['data'].append([idx, x])

# Send over HTTP
def send_request(data: dict):
    output = requests.post(URL, json=data, headers=headers)
    assert (output.status_code == 200), f"Failed to get response from the service. Status code: {output.status_code}"
    return output.content

# Test
results = send_request(data=data)
pprint(json.loads(results))

Deploy a PyTorch model for GPU-powered inference

For an example that trains a PyTorch deep learning recommendation model (DLRM) and deploys it to SPCS for GPU inference, see this quickstart (https://quickstarts.snowflake.com/guide/snowpark-container-services-model-serving-guide/).
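
For a quick look at the registry calls involved, here is a minimal sketch rather than the quickstart's DLRM: it logs a toy torch.nn.Module and deploys it to a GPU compute pool, reusing the reg registry handle and the same pattern as the examples above. The model, service, and pool names are assumptions.

import torch

# A toy stand-in for a trained PyTorch model; any trained torch.nn.Module is logged the same way
torch_model = torch.nn.Linear(10, 1)
sample_input = [torch.randn(2, 10)]  # sample tensors used to infer the model signature

mv = reg.log_model(
    torch_model,
    model_name="my_torch_model",
    version_name="v1",
    sample_input_data=sample_input,
    pip_requirements=["torch"],
)

mv.create_service(
    service_name="my_torch_service",
    service_compute_pool="my_gpu_pool",  # GPU compute pool, as in the examples above
    ingress_enabled=True,
    gpu_requests="1",
)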

Deploy a model built with Snowpark ML modeling classes

Models developed with Snowpark ML modeling classes cannot be deployed to GPU-backed environments. As a workaround, you can extract the native model and deploy that instead. For example:

# Train a model using Snowpark ML
from snowflake.ml.modeling.xgboost import XGBRegressor
regressor = XGBRegressor(...)
regressor.fit(training_df)

# Extract the native model
xgb_model = regressor.to_xgboost()
# Test the model with pandas dataframe
pandas_test_df = test_df.select(['FEATURE1', 'FEATURE2', ...]).to_pandas()
xgb_model.predict(pandas_test_df)

# Log the model in Snowflake Model Registry
mv = reg.log_model(xgb_model,
                   model_name="my_native_xgb_model",
                   sample_input_data=pandas_test_df,
                   comment='A native XGBoost model trained with the Snowpark ML modeling API',
                   )
# Now we should be able to deploy to a GPU compute pool on SPCS
mv.create_service(
    service_name="my_service_gpu",
    service_compute_pool="my_gpu_pool",
    image_repo="my_repo",
    max_instances=1,
    gpu_requests="1",
)
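
Once the service is running, inference and cleanup work the same way as in the earlier examples; a short sketch reusing the names above:

# Run predictions through the GPU-backed service
mv.run(pandas_test_df, function_name="predict", service_name="my_service_gpu")

# Remove the service when it is no longer needed
mv.delete_service("my_service_gpu")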