Snowflake Native Batch Inference (SQL)¶
使用 Snowflake Model Registry 对模型执行批量推理调用。您可以将这些批量推理调用集成到 Snowflake 工作流程中。使用 Snowflake Native Batch Inference,您可以执行以下操作:
集成到 Snowflake SQL、Snowpark Python、Streaming 和动态表工作流程中。
使用第三方工具(如 dbt)处理推理调用的结果
在哪里运行模型?¶
选择运行时环境¶
您可以在虚拟仓库上托管模型,也可以在 SPCS 计算池上托管模型。使用以下信息来确定托管模型的位置。
运行时 |
最适合用于... |
避免以下情况... |
|---|---|---|
虚拟仓库 |
• In-Database Batch Inference: Executing models as native SQL functions.
• Zero-Ops Experience: Leveraging existing warehouses without managing compute pools.
• Small Models: CPU-runnable models (e.g., scikit-learn, XGBoost).
|
• Hardware Constraints: The model requires a GPU for execution.
• Memory Limits: Model size exceeds 15GB. (this limit is lower for smaller warehouse sizes).
|
Snowpark Container Services (SPCS) |
• Large Models: Optimized for LLMs, deep learning models requiring high memory, or models requiring GPUs.
• Custom Environments: For specific pip packages or a custom OS-level environment not found in standard warehouses.
|
• Organizational Policy: SPCS is not yet approved or enabled in your account.
• Sufficient warehouse compute: If your warehouse can process your batch inference requests, you don't need the additional compute from SPCS.
|
在仓库上运行模型¶
要在仓库上托管模型,请在 target_platforms 实参中指定 WAREHOUSE 以记录模型。有关更多信息,请参阅使用依赖项和目标平台。
有关现有模型是否可在仓库中运行的信息,请运行 SHOW VERSIONS IN MODEL。如果 runnable_in 列的值有 WAREHOUSE,则可以运行。
在 SPCS 上运行模型¶
要在 SPCS 中使用您的模型,必须将模型部署为服务。有关将模型部署到 SPCS 的更多信息,请参阅部署模型以进行在线推理。确保自动暂停处于活动状态。
备注
如果服务已暂停,当有推理请求时,它会自动恢复。但是,如果服务无法在指定时间内恢复,查询可能会失败。如果计算池中缺少可用节点,查询可能无法恢复。为了降低这种风险,您可以显式恢复服务并等待其可用。
使用 XSMALL 或 SMALL 仓库,将推理请求路由到 SPCS 计算池。仓库可以在每个节点上运行多个线程,并随每个请求发送大量推理请求。因此,在 SPCS 中运行的服务很容易过载。因而,在将模型部署到 SPCS 时,建议使用 XSMALL 或 SMALL 仓库。
来自 Python 的推理¶
如果您使用 Snowflake Python API 来发出推理请求,则必须拥有 snowflake-ml-python 包。
连接到 Model Registry¶
从模型注册表中检索用于推理请求的模型。使用以下代码检索模型:
from snowflake.ml.registry import Registry
registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version') # returns ModelVersion
运行批量推理作业¶
使用模型版本对象的 run 方法运行批量推理作业。使用 run 方法,您可以执行以下操作:
在仓库或 SPCS 计算池上上运行推理作业。
提供包含推理数据的 Snowpark 或 pandas 数据框。
run 方法返回一个与您指定的数据框类型匹配的数据框。例如,如果指定 pandas 数据框作为输入,则会获得 pandas 数据框作为输出。
备注
Snowpark DataFrames 进行惰性求值。仅在 DataFrame 的 collect、show 或 to_pandas 方法上执行。
以下示例在仓库上运行批量推理作业:
# Call inference for predict function
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(input_features, function_name="predict")
remote_prediction.show() # assuming test_features is Snowpark DataFrame
要查看可以从模型调用的方法,请运行 mv.show_functions。此方法的返回值是一个 ModelFunctionInfo 对象列表。这些对象中的每一个都包括以下属性:
name:可从 Python 或 SQL 调用的函数的名称。
target_method:原始登记的模型中 Python 方法的名称。
# Get signature of the inference function in Python
# mv: snowflake.ml.model.ModelVersion
mv.show_functions()
在 run 方法中,指定您自己的输入特征,并将 function_name 设置为“predict”。
以下示例在 SPCS 上运行批量推理作业。
# Call inference for predict function
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(test_features, function_name="predict", service_name="example_spcs_service")
remote_prediction.show() # assuming test_features is Snowpark DataFrame
在 run 方法中,指定您自己的输入特征,将 function_name 设置为“predict”,并将 service_name 设置为您 SPCS 服务的名称。
remote_prediction.show() 显示输出数据框。
该方法在用于连接注册表的会话中指定的 Snowflake 仓库中执行。请参阅指定仓库。
来自 SQL 的推理¶
使用以下命令了解模型版本的可用函数和签名:
SHOW FUNCTION IN MODEL mymodel VERSION myversion;
在仓库上运行模型¶
使用 MODEL(model_name)!method_name(...) 语法来调用模型方法。模型上可用的方法由底层 Python 模型类确定。例如,许多类型的模型都使用名为 predict 的方法进行推理。
要调用默认模型的方法,请使用以下语法。在括号中包含任何方法实参,并在 FROM 子句中指定包含推理数据的表。
SELECT MODEL(<model_name>)!<method_name>(...) FROM <table_name>;
要从模型的特定版本调用方法,请为模型的特定版本创建别名,然后通过别名调用该方法。
使用以下语法从模型的特定版本调用方法。
SELECT MODEL(<model_name>,<version_or_alias_name>)!<method_name>(...) FROM <table_name>;
以下示例使用 LAST 别名调用模型的最新版本。
SELECT MODEL(my_model,LAST)!predict(...) FROM my_table;
在 SPCS 上运行模型¶
与在仓库中运行不同,可以通过调用 service_name!method_name(...) 从服务中调用函数。
SELECT <mservice_name>!<method_name>(...) FROM <table_name>;
使用动态表进行持续模型推理¶
Snowflake 的动态表对流数据建立了一个持续的转换层。通过定义一个将机器学习模型预测应用于传入数据的动态表,可以针对数据建立一个持续的自动运行模型推理管道,而无需手动编排。
例如,有一个填充表 (LOGINS_RAW) 的登录事件流,其中包括以下列:USER_ID、LOCATION 和时间戳。此表随后会更新模型对新到达事件的登录风险预测。重要的是,只有新行根据模型的预测进行处理。
SQL¶
动态表为 Snowflake 用户提供了强大的功能,可以对传入数据执行增量推理。使用 SQL 定义一个动态表,引用模型并将其应用于 LOGINS_RAW 中的新传入行。
CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
WAREHOUSE = my_wh
TARGET_LAG = '20 minutes'
REFRESH_MODE = INCREMENTAL
INITIALIZE = on_create
COMMENT = 'Dynamic table with continuously updated model predictions'
AS
SELECT
login_id,
user_id,
location,
event_time,
MODEL(ml.registry.mymodel)!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw;
Snowpark Python¶
Snowpark Python API 允许您以编程方式访问模型注册表,并直接在 DataFrames 上运行推理。此方法可以更加灵活和可维护,尤其在代码驱动的环境中。
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry
# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")
# Load the source data
df_raw = sp_session.table("LOGINS_RAW")
# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))
# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])
# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
name="LOGINS_WITH_PREDICTIONS",
warehouse="MY_WH",
lag='20 minutes',
refresh_mode='INCREMENTAL',
initialize="ON_CREATE",
comment="Dynamic table continuously updated with model predictions"
)
上述代码示例将每 20 分钟自动使用 MYMODEL 对 LOGINS_RAW 中的新数据运行推理。
不可变与可变¶
这种增量至关重要,需要将动态表定义中调用的所有函数指定为 IMMUTABLE。标准模型中的函数通常为 IMMUTABLE,而自定义模型中则默认为 VOLATILE。如果已知底层模型不可变,则确保登记模型时将相应的模型函数显式标记为不可变至关重要。