使用 Snowpark Python 训练机器学习模型¶
本主题介绍了如何使用 Snowpark 训练机器学习 (ML) 模型。
备注
Snowpark ML 是 Snowpark Python 的配套产品,专为 Snowflake 中的机器学习而构建。本主题仍然包含有关使用 Snowpark Python 进行机器学习的有用的一般信息,特别是如果您喜欢为机器学习编写自己的存储过程。
本主题内容:
Snowpark-Optimized Warehouses¶
训练机器学习 (ML) 模型有时会非常耗费资源。Snowpark-Optimized Warehouses 是一种 Snowflake 虚拟仓库,可用于需要大量内存和计算资源的工作负载。例如,您可以使用该仓库在单个节点上使用自定义代码来训练 ML 模型。
这些优化的仓库也能使某些 UDF 和 UDTF 场景受益。
有关如何创建 Snowpark 优化仓库的更多信息,请参阅 Snowpark-Optimized Warehouses。
使用 Snowpark Python 存储过程进行 ML 训练¶
Snowpark Python 存储过程 可用于使用 Snowflake 仓库运行自定义代码。Snowpark-Optimized Warehouses 可以使用 Snowpark Stored Procedures 直接在 Snowflake 中运行单节点 ML 训练工作负载。
Python 存储过程可以使用 :doc:` Snowpark API for Python </developer-guide/snowpark/python/index>` 运行嵌套查询来加载和转换数据集,然后将数据集加载到存储过程内存中,以执行预处理和 ML 训练。训练后的模型可以上传到 Snowflake 暂存区,并可用于创建 UDFs 来执行推理。
虽然 Snowpark-Optimized Warehouses 可用于执行预处理和训练逻辑,但可能需要在单独的仓库中执行嵌套查询,以实现更好的性能和资源利用。可以根据数据集大小独立地调整和扩展单独的查询仓库。
准则¶
按照以下准则执行单节点 ML 训练工作负载:
设置 WAREHOUSE_SIZE = MEDIUM 以确保 Snowpark 优化仓库包含 1 个 Snowpark 优化节点。
将仓库 MAX_CONCURRENCY_LEVEL 参数设置为 1,以充分利用给定 Snowpark 存储过程的内存和计算资源。例如:
alter warehouse snowpark_opt_wh set max_concurrency_level = 1;
如果需要,请考虑将仓库设置为 多集群仓库 以支持所需的并发性。
考虑使用单独仓库在存储过程中执行嵌套查询:
使用 session.use_warehouse() API 在存储过程中选择查询的仓库。
不要在用于运行 ML 训练存储过程的 Snowpark 优化仓库上混合其他工作负载。
示例¶
以下示例创建并使用 Snowpark 优化仓库。然后,该示例创建一个训练线性回归模型的存储过程。该存储过程使用名为 MARKETING_BUDGETS_FEATURES
的表(此处未显示)中的数据。
CREATE OR REPLACE WAREHOUSE snowpark_opt_wh WITH
WAREHOUSE_SIZE = 'MEDIUM'
WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
MAX_CONCURRENCY_LEVEL = 1;
CREATE OR REPLACE PROCEDURE train()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
PACKAGES = ('snowflake-snowpark-python', 'scikit-learn', 'joblib')
HANDLER = 'main'
AS $$
import os
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from joblib import dump
def main(session):
# Load features
df = session.table('MARKETING_BUDGETS_FEATURES').to_pandas()
X = df.drop('REVENUE', axis = 1)
y = df['REVENUE']
# Split dataset into training and test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 42)
# Preprocess numeric columns
numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = 2)),('scaler', StandardScaler())])
preprocessor = ColumnTransformer(transformers=[('num', numeric_transformer, numeric_features)])
# Create pipeline and train
pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', LinearRegression(n_jobs=-1))])
model = GridSearchCV(pipeline, param_grid={}, n_jobs=-1, cv=10)
model.fit(X_train, y_train)
# Upload trained model to a stage
model_file = os.path.join('/tmp', 'model.joblib')
dump(model, model_file)
session.file.put(model_file, "@ml_models",overwrite=True)
# Return model R2 score on train and test data
return {"R2 score on Train": model.score(X_train, y_train),"R2 score on Test": model.score(X_test, y_test)}
$$;
要调用存储过程,请执行以下命令:
CALL train();
备注
` Snowflake-Labs GitHub 存储库 <https://github.com/Snowflake-Labs/snowpark-python-demos (https://github.com/Snowflake-Labs/snowpark-python-demos)>`_ 中提供了各种其他 Snowpark Python 演示。` 广告支出和 ROI 预测 <https://github.com/Snowflake-Labs/snowpark-python-demos/blob/main/Advertising-Spend-ROI-Prediction/Snowpark_For_Python.ipynb (https://github.com/Snowflake-Labs/snowpark-python-demos/blob/main/Advertising-Spend-ROI-Prediction/Snowpark_For_Python.ipynb)>`_ 示例演示了如何创建训练线性回归模型的存储过程。