在 Snowflake 中使用动态表实现连续流模型推理

Snowflake 的动态表为流数据提供持续的转换层。通过定义动态表,将机器学习模型的预测应用于传入数据,您可以在数据上维护自动化、持续运行的模型推理管道,而无需手动编排。

例如,考虑到达表的登录事件流 (LOGINS_RAW),包含列(包括 USER_IDLOCATION)和时间戳。该表更新了模型对最近到达事件的登录风险的预测。只有新行根据模型的预测进行处理。

备注

如果您正在使用 Snowflake Feature Store,您还可以使用 Feature Store API 创建此推理动态表作为 特征视图。这允许您在推理时可选地从其他特征视图加载特征。

以下部分演示了如何使用 SQL 和 Snowpark Python 设置此连续推理管道。

SQL 方法

使用 SQL 定义一个动态表,引用模型并将其应用于 LOGINS_RAW 中的新传入行。此示例使用 WITH 子句从注册表中引用模型,并使用 !PREDICT 语法运行推理:

CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
    WAREHOUSE = my_wh
    TARGET_LAG = '20 minutes'
    REFRESH_MODE = INCREMENTAL
    INITIALIZE = on_create
    COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
    l.login_id,
    l.user_id,
    l.location,
    l.event_time,
    my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
Copy

Snowpark Python 方法

Snowpark Python API 允许您以编程方式访问模型注册表,并直接在 DataFrames 上运行推理。此方法可以更加灵活和可维护,尤其在代码驱动的环境中。

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry

# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")

# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")

# Load the source data
df_raw = sp_session.table("LOGINS_RAW")

# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))

# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])

# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
    name="LOGINS_WITH_PREDICTIONS",
    warehouse="MY_WH",
    lag='20 minutes',
    refresh_mode='INCREMENTAL',
    initiliaze="ON_CREATE",
    comment="Dynamic table continuously updated with model predictions"
)
Copy

上述代码示例将每 20 分钟自动使用 MYMODELLOGINS_RAW 中的新数据运行推理。

语言: 中文