使用分区模型¶
许多数据集都可以划分为多个独立的子集。例如,包含连锁店销售数据的数据集可按店铺编号进行划分。然后,就可以为每个分区训练一个单独的模型。对分区的训练和推理操作可以并行开展,从而减少这些操作的挂钟时间。此外,由于各个店铺的特征对其销售的影响可能存在显著差异,该方法可以在店铺层面实现更准确的推理。
当满足以下条件时,Snowflake Model Registry 支持对分区数据的训练和推理进行分布式处理:
数据集中包含一列,该列可以可靠地识别数据中的分区。
每个分区中的数据与其他分区中的数据不相关,并且包含足够多的行来训练模型。
模型可以是无状态的(每次调用推理时都要进行训练),也可以是有状态的(在推理前进行一次训练,并保留供多次推理操作使用)。
借助 Snowflake Model Registry,使用 自定义模型 实施分区训练和推理。在推理过程中,模型推理方法会对数据集进行分区,使用仓库中的所有节点和核心并行生成每个分区的预测结果,然后将结果合并为一个数据集。
备注
对于分区模型,重要的是要将注册模型与由注册模型创建或组成注册模型的单个模型进行区分。在可能的情况下,我们将各个基础模型称为子模型。
定义和记录模型¶
分区模型类继承自 snowflake.ml.model.custom_model.CustomModel,并且推理方法通过 @custom_model.partitioned_api 装饰器进行声明。有关定义标准自定义模型的信息,请参阅 通过序列化文件引入您自己的模型类型。
import pandas as pd
from snowflake.ml.model import custom_model
class ExamplePartitionedModel(custom_model.CustomModel):
@custom_model.partitioned_api
def predict(self, input: pd.DataFrame) -> pd.DataFrame:
# All data in the partition will be loaded in the input dataframe.
#… implement model logic here …
return output_df
my_model = ExamplePartitionedModel()
记录模型时,在 options 字典中提供 function_type 的 TABLE_FUNCTION 以及模型所需的任何其他 选项。
from snowflake.ml.registry import Registry
reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
model_version = reg.log_model(my_model,
model_name="my_model",
version_name="v1",
options={"function_type": "TABLE_FUNCTION"}, ###
conda_dependencies=["scikit-learn"],
sample_input_data=train_features
)
如果分区模型也有作为方法的常规(非表)函数,您可以使用 method_options 字典来指定每种方法的类型。
model_version = reg.log_model(my_model,
model_name="my_model",
version_name="v1",
options={
"method_options": { ###
"METHOD1": {"function_type": "TABLE_FUNCTION"}, ###
"METHOD2": {"function_type": "FUNCTION"} ###
}
},
conda_dependencies=["scikit-learn"],
sample_input_data=train_features,
)
分区模型推理¶
使用 Python run 对象的 ModelVersion 方法,以分区的方式调用表函数方法,传递 partition_column 来指定包含数字或字符串值的列名,该值可标识每条记录的分区。像往常一样,您可以传递 Snowpark 或 Pandas DataFrame(后者对本地测试很有用)。您会收到与 DataFrame 类型相同的结果。在这些示例中,推理是根据店铺编号进行分区的。
model_version.run(
input_df,
function_name="PREDICT",
partition_column="STORE_NUMBER"
)
您还可以直接使用 SQL 调用模型表函数,如下所示。
SELECT output1, output2, partition_column
FROM input_table,
TABLE(
my_model!predict(input_table.input1, input_table.input2)
OVER (PARTITION BY input_table.store_number)
)
ORDER BY input_table.store_number;
输入数据会自动在仓库中的节点和核心之间进行分割,分割后的数据会并行处理。
有关表函数语法的更多信息,请参阅 调用带有 SQL 的 UDF。
在分区模型中使用参数¶
使用 @partitioned_api 装饰的分区模型方法可以接受可选的推理参数,这与 @inference_api 方法相同。将参数定义为仅关键字参数(在 * 之后),并附上类型注解和默认值:
class PartitionedModelWithParams(custom_model.CustomModel):
@custom_model.partitioned_api
def predict(
self,
input_df: pd.DataFrame,
*,
n_estimators: int = 100,
learning_rate: float = 0.1,
) -> pd.DataFrame:
import xgboost
training_data = ...
my_model = xgboost.XGBRegressor(
n_estimators=n_estimators,
learning_rate=learning_rate,
)
my_model.fit(training_data)
output_df = my_model.predict(...)
return output_df
在推理时通过 mv.run 传递参数:
model_version.run(
input_df,
function_name="PREDICT",
partition_column="STORE_NUMBER",
params={"n_estimators": 200, "learning_rate": 0.05}
)
或者在 SQL 中,使用位置实参或命名实参:
-- Positional: input columns, then parameters
SELECT output1, output2, partition_column
FROM input_table,
TABLE(
my_model!predict(input_table.input1, input_table.input2, 200, 0.05)
OVER (PARTITION BY input_table.store_number)
)
ORDER BY input_table.store_number;
-- Named arguments (all arguments must be named)
SELECT output1, output2, partition_column
FROM input_table,
TABLE(
my_model!predict(
input1 => input_table.input1,
input2 => input_table.input2,
n_estimators => 200
)
OVER (PARTITION BY input_table.store_number)
)
ORDER BY input_table.store_number;
无状态分区模型¶
在分区模型的最简单应用中,当调用 predict 时,训练和推理均已完成。模型进行拟合,然后运行推理,之后立即丢弃拟合的模型。这种模型被称为“无状态”模型,因为它不存储拟合状态。在以下示例中,每个分区训练一个 XGBoost 模型:
class ExampleStatelessPartitionedModel(custom_model.CustomModel):
@custom_model.partitioned_api
def predict(self, input_df: pd.DataFrame) -> pd.DataFrame:
import xgboost
# All data in the partition will be loaded in the input dataframe.
# Construct training data by transforming input_df.
training_data = ...
# Train the model.
my_model = xgboost.XGBRegressor()
my_model.fit(training_data)
# Generate predictions.
output_df = my_model.predict(...)
return output_df
my_model = ExampleStatelessPartitionedModel()
请参阅 分区模型快速入门指南 (https://quickstarts.snowflake.com/guide/partitioned-ml-model/),了解无状态分区模型的示例,包括示例数据。
有状态分区模型¶
同样,也可以实施加载存储子模型拟合状态的有状态分区模型。为此,您可以通过 snowflake.ml.model.custom_model.ModelContext 提供内存中的模型,或者提供指向拟合模型工件的文件路径,并在推理期间加载它们。
以下示例展示了如何向模型上下文提供内存中的模型。
from snowflake.ml.model import custom_model
# `models` is a dict with model ids as keys, and fitted xgboost models as values.
models = {
"model1": models[0],
"model2": models[1],
...
}
model_context = custom_model.ModelContext(
models=models
)
my_stateful_model = MyStatefulCustomModel(model_context=model_context)
记录 my_stateful_model 时,上下文中提供的子模型将与所有模型文件一起存储。然后就可以在推理方法逻辑中通过从上下文中检索来访问它们,如下所示:
class ExampleStatefulModel(custom_model.CustomModel):
@custom_model.inference_api
def predict(self, input: pd.DataFrame) -> pd.DataFrame:
model1 = self.context.model_ref("model1")
# ... use model1 for inference
也可以通过 predict 方法中的分区 ID 以编程方式访问模型。如果提供了分区列作为输入特征,则可以用它来访问针对分区拟合的模型。例如,如果分区列是 MY_PARTITION_COLUMN,可以定义以下模型类:
class ExampleStatefulModel(custom_model.CustomModel):
@custom_model.inference_api
def predict(self, input: pd.DataFrame) -> pd.DataFrame:
model_id = input["MY_PARTITION_COLUMN"][0]
model = self.context.model_ref(model_id)
# ... use model for inference
同样,子模型可以作为工件存储,并在运行时加载。当模型过大而无法全部加载到内存中时,这种方法非常有用。提供指向模型上下文的字符串文件路径。文件路径可以在推理期间使用 self.context.path(artifact_id) 访问。有关更多信息,请参阅 通过关键字实参定义模型上下文。
示例¶
有关示例,包括示例数据,请参阅 分区模型快速入门指南 (https://quickstarts.snowflake.com/guide/partitioned-ml-model/)。
请参阅 Snowflake 快速入门指南中的多模型推理 (https://quickstarts.snowflake.com/guide/many-model-inference-in-snowflake/),了解有状态分区自定义模型的示例。