Snowflake Native Batch Inference (SQL)

使用 Snowflake Model Registry 对模型执行批量推理调用。您可以将这些批量推理调用集成到 Snowflake 工作流程中。使用 Snowflake Native Batch Inference,您可以执行以下操作:

  • 集成到 Snowflake SQL、Snowpark Python、Streaming 和动态表工作流程中。

  • 使用第三方工具(如 dbt)处理推理调用的结果

在哪里运行模型?

选择运行时环境

您可以在虚拟仓库上托管模型,也可以在 SPCS 计算池上托管模型。使用以下信息来确定托管模型的位置。

运行时

最适合用于...

避免以下情况...

虚拟仓库

• In-Database Batch Inference: Executing models as native SQL functions.
• Zero-Ops Experience: Leveraging existing warehouses without managing compute pools.
• Small Models: CPU-runnable models (e.g., scikit-learn, XGBoost).
• Hardware Constraints: The model requires a GPU for execution.
• Memory Limits: Model size exceeds 15GB. (this limit is lower for smaller warehouse sizes).

Snowpark Container Services (SPCS)

• Large Models: Optimized for LLMs, deep learning models requiring high memory, or models requiring GPUs.
• Custom Environments: For specific pip packages or a custom OS-level environment not found in standard warehouses.
• Organizational Policy: SPCS is not yet approved or enabled in your account.
• Sufficient warehouse compute: If your warehouse can process your batch inference requests, you don't need the additional compute from SPCS.

在仓库上运行模型

要在仓库上托管模型,请在 target_platforms 实参中指定 WAREHOUSE 以记录模型。有关更多信息,请参阅使用依赖项和目标平台。

有关现有模型是否可在仓库中运行的信息,请运行 SHOW VERSIONS IN MODEL。如果 runnable_in 列的值有 WAREHOUSE,则可以运行。

在 SPCS 上运行模型

要在 SPCS 中使用您的模型,必须将模型部署为服务。有关将模型部署到 SPCS 的更多信息,请参阅部署模型以进行在线推理。确保自动暂停处于活动状态。

备注

如果服务已暂停,当有推理请求时,它会自动恢复。但是,如果服务无法在指定时间内恢复,查询可能会失败。如果计算池中缺少可用节点,查询可能无法恢复。为了降低这种风险,您可以显式恢复服务并等待其可用。

使用 XSMALL 或 SMALL 仓库,将推理请求路由到 SPCS 计算池。仓库可以在每个节点上运行多个线程,并随每个请求发送大量推理请求。因此,在 SPCS 中运行的服务很容易过载。因而,在将模型部署到 SPCS 时,建议使用 XSMALL 或 SMALL 仓库。

来自 Python 的推理

如果您使用 Snowflake Python API 来发出推理请求,则必须拥有 snowflake-ml-python 包。

连接到 Model Registry

从模型注册表中检索用于推理请求的模型。使用以下代码检索模型:

from snowflake.ml.registry import Registry

registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version')  # returns ModelVersion
Copy

运行批量推理作业

使用模型版本对象的 run 方法运行批量推理作业。使用 run 方法,您可以执行以下操作:

  • 在仓库或 SPCS 计算池上上运行推理作业。

  • 提供包含推理数据的 Snowpark 或 pandas 数据框。

run 方法返回一个与您指定的数据框类型匹配的数据框。例如,如果指定 pandas 数据框作为输入,则会获得 pandas 数据框作为输出。

备注

Snowpark DataFrames 进行惰性求值。仅在 DataFrame 的 collect、show 或 to_pandas 方法上执行。

以下示例在仓库上运行批量推理作业:

# Call inference for predict function
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(input_features, function_name="predict")
remote_prediction.show()   # assuming test_features is Snowpark DataFrame
Copy

要查看可以从模型调用的方法,请运行 mv.show_functions。此方法的返回值是一个 ModelFunctionInfo 对象列表。这些对象中的每一个都包括以下属性:

  • name:可从 Python 或 SQL 调用的函数的名称。

  • target_method:原始登记的模型中 Python 方法的名称。

# Get signature of the inference function in Python
# mv: snowflake.ml.model.ModelVersion
mv.show_functions()
Copy

在 run 方法中,指定您自己的输入特征,并将 function_name 设置为“predict”。

以下示例在 SPCS 上运行批量推理作业。

# Call inference for predict function
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(test_features, function_name="predict", service_name="example_spcs_service")
remote_prediction.show()   # assuming test_features is Snowpark DataFrame
Copy

在 run 方法中,指定您自己的输入特征,将 function_name 设置为“predict”,并将 service_name 设置为您 SPCS 服务的名称。

remote_prediction.show() 显示输出数据框。

该方法在用于连接注册表的会话中指定的 Snowflake 仓库中执行。请参阅指定仓库。

来自 SQL 的推理

使用以下命令了解模型版本的可用函数和签名:

SHOW FUNCTION IN MODEL mymodel VERSION myversion;
Copy

在仓库上运行模型

使用 MODEL(model_name)!method_name(...) 语法来调用模型方法。模型上可用的方法由底层 Python 模型类确定。例如,许多类型的模型都使用名为 predict 的方法进行推理。

要调用默认模型的方法,请使用以下语法。在括号中包含任何方法实参,并在 FROM 子句中指定包含推理数据的表。

SELECT MODEL(<model_name>)!<method_name>(...) FROM <table_name>;
Copy

要从模型的特定版本调用方法,请为模型的特定版本创建别名,然后通过别名调用该方法。

使用以下语法从模型的特定版本调用方法。

SELECT MODEL(<model_name>,<version_or_alias_name>)!<method_name>(...) FROM <table_name>;
Copy

以下示例使用 LAST 别名调用模型的最新版本。

SELECT MODEL(my_model,LAST)!predict(...) FROM my_table;
Copy

在 SPCS 上运行模型

与在仓库中运行不同,可以通过调用 service_name!method_name(...) 从服务中调用函数。

SELECT <mservice_name>!<method_name>(...) FROM <table_name>;
Copy

使用动态表进行持续模型推理

Snowflake 的动态表对流数据建立了一个持续的转换层。通过定义一个将机器学习模型预测应用于传入数据的动态表,可以针对数据建立一个持续的自动运行模型推理管道,而无需手动编排。

例如,有一个填充表 (LOGINS_RAW) 的登录事件流,其中包括以下列:USER_ID、LOCATION 和时间戳。此表随后会更新模型对新到达事件的登录风险预测。重要的是,只有新行根据模型的预测进行处理。

SQL

动态表为 Snowflake 用户提供了强大的功能,可以对传入数据执行增量推理。使用 SQL 定义一个动态表,引用模型并将其应用于 LOGINS_RAW 中的新传入行。

CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
    WAREHOUSE = my_wh
    TARGET_LAG = '20 minutes'
    REFRESH_MODE = INCREMENTAL
    INITIALIZE = on_create
    COMMENT = 'Dynamic table with continuously updated model predictions'
AS
SELECT
    login_id,
    user_id,
    location,
    event_time,
    MODEL(ml.registry.mymodel)!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw;
Copy

Snowpark Python

Snowpark Python API 允许您以编程方式访问模型注册表,并直接在 DataFrames 上运行推理。此方法可以更加灵活和可维护,尤其在代码驱动的环境中。

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry

# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")

# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")

# Load the source data
df_raw = sp_session.table("LOGINS_RAW")

# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))

# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])

# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
    name="LOGINS_WITH_PREDICTIONS",
    warehouse="MY_WH",
    lag='20 minutes',
    refresh_mode='INCREMENTAL',
    initialize="ON_CREATE",
    comment="Dynamic table continuously updated with model predictions"
)
Copy

上述代码示例将每 20 分钟自动使用 MYMODEL 对 LOGINS_RAW 中的新数据运行推理。

不可变与可变

这种增量至关重要,需要将动态表定义中调用的所有函数指定为 IMMUTABLE。标准模型中的函数通常为 IMMUTABLE,而自定义模型中则默认为 VOLATILE。如果已知底层模型不可变,则确保登记模型时将相应的模型函数显式标记为不可变至关重要。

语言: 中文