在 Snowflake 中使用动态表实现连续流模型推理¶
Snowflake 的动态表为流数据提供持续的转换层。通过定义动态表,将机器学习模型的预测应用于传入数据,您可以在数据上维护自动化、持续运行的模型推理管道,而无需手动编排。
例如,考虑到达表的登录事件流 (LOGINS_RAW
),包含列(包括 USER_ID
、LOCATION
)和时间戳。该表更新了模型对最近到达事件的登录风险的预测。只有新行根据模型的预测进行处理。
备注
如果您正在使用 Snowflake Feature Store,您还可以使用 Feature Store API 创建此推理动态表作为 特征视图。这允许您在推理时可选地从其他特征视图加载特征。
以下部分演示了如何使用 SQL 和 Snowpark Python 设置此连续推理管道。
SQL 方法¶
使用 SQL 定义一个动态表,引用模型并将其应用于 LOGINS_RAW
中的新传入行。此示例使用 WITH
子句从注册表中引用模型,并使用 !PREDICT
语法运行推理:
CREATE OR REPLACE DYNAMIC TABLE logins_with_predictions
WAREHOUSE = my_wh
TARGET_LAG = '20 minutes'
REFRESH_MODE = INCREMENTAL
INITIALIZE = on_create
COMMENT = 'Dynamic table with continuously updated model predictions'
AS
WITH my_model AS MODEL ml.registry.mymodel
SELECT
l.login_id,
l.user_id,
l.location,
l.event_time,
my_model!predict(l.user_id, l.location) AS prediction_result
FROM logins_raw l;
Snowpark Python 方法¶
Snowpark Python API 允许您以编程方式访问模型注册表,并直接在 DataFrames 上运行推理。此方法可以更加灵活和可维护,尤其在代码驱动的环境中。
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry
# Initialize the registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
# Retrieve the default model version from the registry
model = reg.get_model("MYMODEL")
# Load the source data
df_raw = sp_session.table("LOGINS_RAW")
# Run inference on the necessary features
predictions_df = model.run(df_raw.select("USER_ID", "LOCATION"))
# Join predictions back to the source data
joined_df = df_raw.join(predictions_df, on=["USER_ID", "LOCATION"])
# Create or replace a dynamic table from the joined DataFrame
joined_df.create_or_replace_dynamic_table(
name="LOGINS_WITH_PREDICTIONS",
warehouse="MY_WH",
lag='20 minutes',
refresh_mode='INCREMENTAL',
initiliaze="ON_CREATE",
comment="Dynamic table continuously updated with model predictions"
)
上述代码示例将每 20 分钟自动使用 MYMODEL
对 LOGINS_RAW
中的新数据运行推理。