Snowflake Native Batch Inference (SQL)

使用 Snowflake Model Registry 对模型执行批量推理调用。您可以将这些批量推理调用集成到 Snowflake 工作流程中。使用 Snowflake Native Batch Inference,您可以执行以下操作:

  • 集成到 Snowflake SQL、Snowpark Python、Streaming 和动态表工作流程中。
  • 使用第三方工具(如 dbt)处理推理调用的结果

在哪里运行模型?

选择运行时环境

您可以在虚拟仓库上托管模型,也可以在 SPCS 计算池上托管模型。使用以下信息来确定托管模型的位置。

RuntimeBest For…Avoid When…
Virtual Warehouse• In-Database Batch Inference: Executing models as native SQL functions. • Zero-Ops Experience: Leveraging existing warehouses without managing compute pools. • Small Models: CPU-runnable models (e.g., scikit-learn, XGBoost).• Hardware Constraints: The model requires a GPU for execution. • Memory Limits: Model size exceeds 15GB. (this limit is lower for smaller warehouse sizes).
Snowpark Container Services (SPCS)• Large Models: Optimized for LLMs, deep learning models requiring high memory, or models requiring GPUs. • Custom Environments: For specific pip packages or a custom OS-level environment not found in standard warehouses.• Organizational Policy: SPCS is not yet approved or enabled in your account. • Sufficient warehouse compute: If your warehouse can process your batch inference requests, you don’t need the additional compute from SPCS. • Government Regions: SPCS-based inference is not available in government regions.

在仓库上运行模型

要在仓库上托管模型,请在 target_platforms 实参中指定 WAREHOUSE 以记录模型。有关更多信息,请参阅使用依赖项和目标平台。

有关现有模型是否可在仓库中运行的信息,请运行 SHOW VERSIONS IN MODEL。如果 runnable_in 列的值有 WAREHOUSE,则可以运行。

在 SPCS 上运行模型

要在 SPCS 中使用您的模型,必须将模型部署为服务。有关将模型部署到 SPCS 的更多信息,请参阅部署模型以进行在线推理。确保自动暂停处于活动状态。

Note

Model owners or users with the READ privilege on the model can deploy the model to a service. By default, only the service owner can invoke service functions. To let another role call the service (for SQL calls using service_name!method_name(...) syntax), grant the service role INFERENCE_SERVICE_FUNCTION_USAGE using GRANT SERVICE ROLE.

Note

如果服务已暂停,当有推理请求时,它会自动恢复。但是,如果服务无法在指定时间内恢复,查询可能会失败。如果计算池中缺少可用节点,查询可能无法恢复。为了降低这种风险,您可以显式恢复服务并等待其可用。

使用 XSMALL 或 SMALL 仓库,将推理请求路由到 SPCS 计算池。仓库可以在每个节点上运行多个线程,并随每个请求发送大量推理请求。因此,在 SPCS 中运行的服务很容易过载。因而,在将模型部署到 SPCS 时,建议使用 XSMALL 或 SMALL 仓库。

来自 Python 的推理

如果您使用 Snowflake Python API 来发出推理请求,则必须拥有 snowflake-ml-python 包。

连接到 Model Registry

从模型注册表中检索用于推理请求的模型。使用以下代码检索模型:

from snowflake.ml.registry import Registry

registry = Registry(session=session, database_name=DATABASE, schema_name=REGISTRY_SCHEMA)
mv = registry.get_model('my_model').version('my_version')  # returns ModelVersion

运行批量推理作业

使用模型版本对象的 run 方法运行批量推理作业。使用 run 方法,您可以执行以下操作:

  • 在仓库或 SPCS 计算池上上运行推理作业。
  • 提供包含推理数据的 Snowpark 或 pandas 数据框。

run 方法返回一个与您指定的数据框类型匹配的数据框。例如,如果指定 pandas 数据框作为输入,则会获得 pandas 数据框作为输出。

Note

Snowpark DataFrames 进行惰性求值。仅在 DataFrame 的 collect、show 或 to_pandas 方法上执行。

以下示例在仓库上运行批量推理作业:

# Run inference on a warehouse
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(input_features, function_name="predict")
remote_prediction.show()

To run inference on SPCS instead of a warehouse, add the service_name argument to the run call:

# Run inference on SPCS
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(input_features, function_name="predict", service_name="example_spcs_service")
remote_prediction.show()

要查看可以从模型调用的方法,请运行 mv.show_functions。此方法的返回值是一个 ModelFunctionInfo 对象列表。这些对象中的每一个都包括以下属性:

  • name:可从 Python 或 SQL 调用的函数的名称。
  • target_method:原始登记的模型中 Python 方法的名称。
# Get signature of the inference function in Python
# mv: snowflake.ml.model.ModelVersion
mv.show_functions()

在推理期间传递参数

If the model’s signature includes parameters defined in the ParamSpec object, you can pass parameter values at inference time using the params argument. Any parameter that isn’t included in the dictionary uses its default value from the signature. The params argument works the same way whether you are running inference on a warehouse or on SPCS.

# Pass parameters to override default values
# mv: snowflake.ml.model.ModelVersion
remote_prediction = mv.run(
    input_features,
    function_name="predict",
    params={"temperature": 0.9, "max_tokens": 512}
)

来自 SQL 的推理

使用以下命令了解模型版本的可用函数和签名:

SHOW FUNCTION IN MODEL mymodel VERSION myversion;

在仓库上运行模型

Use the MODEL(model_name)!method_name(...) syntax to call or invoke methods of a model. The methods available on a model are determined by the underlying Python model class. For example, many types of models use a method named predict for inference.

要调用默认模型的方法,请使用以下语法。在括号中包含任何方法实参,并在 FROM 子句中指定包含推理数据的表。

SELECT MODEL(<model_name>)!<method_name>(...) FROM <table_name>;

要从模型的特定版本调用方法,请为模型的特定版本创建别名,然后通过别名调用该方法。

使用以下语法从模型的特定版本调用方法。

SELECT MODEL(<model_name>,<version_or_alias_name>)!<method_name>(...) FROM <table_name>;

以下示例使用 LAST 别名调用模型的最新版本。

SELECT MODEL(my_model,LAST)!predict(...) FROM my_table;

通过 SQL 传递参数

If the model’s signature includes parameters defined with ParamSpec, you can pass parameter values as additional arguments after the input columns. Parameters can be specified by position or by name.

使用位置实参时,可从右侧省略参数,并使用函数签名中的默认值:

-- Pass all parameters positionally (temperature, then max_tokens)
SELECT MODEL(my_model, v1)!predict(input_text, 0.9, 512) FROM my_table;

-- Omit max_tokens from the right; its default value from the signature is used
SELECT MODEL(my_model, v1)!predict(input_text, 0.9) FROM my_table;

使用命名实参时,所有实参(包括输入列)都必须按名称指定。这允许您仅传递特定参数,而不管其位置如何:

SELECT MODEL(my_model, v1)!predict(
    input_text => input_text,
    max_tokens => 512
) FROM my_table;

Note

必须按名称或位置指定所有实参。不能在同一调用中混合使用位置实参和命名实参。

在 SPCS 上运行模型

Unlike running in a warehouse, functions can be called from a service by calling service_name!method_name(...).

SELECT <mservice_name>!<method_name>(...) FROM <table_name>;

参数的传递方式与仓库函数相同,要么全部按位置传递,要么全部按名称传递:

-- Positional
SELECT my_service!predict(input_text, 0.9, 512) FROM my_table;

-- Named arguments (all arguments must be named)
SELECT my_service!predict(input_text => input_text, max_tokens => 512) FROM my_table;

使用动态表进行持续模型推理

Snowflake 的动态表对流数据建立了一个持续的转换层。通过定义一个将机器学习模型预测应用于传入数据的动态表,可以针对数据建立一个持续的自动运行模型推理管道,而无需手动编排。

例如,有一个填充表 (LOGINS_RAW) 的登录事件流,其中包括以下列:USER_ID、LOCATION 和时间戳。此表随后会更新模型对新到达事件的登录风险预测。重要的是,只有新行根据模型的预测进行处理。

SQL

动态表为 Snowflake 用户提供了强大的功能,可以对传入数据执行增量推理。使用 SQL 定义一个动态表,引用模型并将其应用于 LOGINS_RAW 中的新传入行。

CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
    WAREHOUSE = my_wh
    TARGET_LAG = '20 minutes'
    REFRESH_MODE = INCREMENTAL
    INITIALIZE = on_create
    COMMENT = 'Dynamic table with continuously updated model predictions'
AS
SELECT
    login_id,
    user_id,
    location,
    event_time,
    MODEL(ml.registry.mymodel)!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw;

Snowpark Python

Snowpark Python API 允许您以编程方式访问模型注册表,并直接在 DataFrames 上运行推理。此方法可以更加灵活和可维护,尤其在代码驱动的环境中。

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry

# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")

# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")

# Load the source data
df_raw = sp_session.table("LOGINS_RAW")

# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))

# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])

# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
    name="LOGINS_WITH_PREDICTIONS",
    warehouse="MY_WH",
    lag='20 minutes',
    refresh_mode='INCREMENTAL',
    initialize="ON_CREATE",
    comment="Dynamic table continuously updated with model predictions"
)

上述代码示例将每 20 分钟自动使用 MYMODEL 对 LOGINS_RAW 中的新数据运行推理。

不可变与可变

这种增量至关重要,需要将动态表定义中调用的所有函数指定为 IMMUTABLE。标准模型中的函数通常为 IMMUTABLE,而自定义模型中则默认为 VOLATILE。如果已知底层模型不可变,则确保登记模型时将相应的模型函数显式标记为不可变至关重要。