使用 Snowpark Python 训练机器学习模型

本主题介绍了如何使用 Snowpark 训练机器学习 (ML) 模型。

备注

Snowpark ML 是 Snowpark Python 的配套产品,专为 Snowflake 中的机器学习而构建。本主题仍然包含有关使用 Snowpark Python 进行机器学习的有用的一般信息,特别是如果您喜欢为机器学习编写自己的存储过程。

本主题内容:

Snowpark-Optimized Warehouses

训练机器学习 (ML) 模型有时会非常耗费资源。Snowpark-Optimized Warehouses 是一种 Snowflake 虚拟仓库,可用于需要大量内存和计算资源的工作负载。例如,您可以使用该仓库在单个节点上使用自定义代码来训练 ML 模型。

这些优化的仓库也能使某些 UDF 和 UDTF 场景受益。

有关如何创建 Snowpark 优化仓库的更多信息,请参阅 Snowpark-Optimized Warehouses

使用 Snowpark Python 存储过程进行 ML 训练

Snowpark Python 存储过程 可用于使用 Snowflake 仓库运行自定义代码。Snowpark-Optimized Warehouses 可以使用 Snowpark Stored Procedures 直接在 Snowflake 中运行单节点 ML 训练工作负载。

Python 存储过程可以使用 :doc:` Snowpark API for Python </developer-guide/snowpark/python/index>` 运行嵌套查询来加载和转换数据集,然后将数据集加载到存储过程内存中,以执行预处理和 ML 训练。训练后的模型可以上传到 Snowflake 暂存区,并可用于创建 UDFs 来执行推理。

虽然 Snowpark-Optimized Warehouses 可用于执行预处理和训练逻辑,但可能需要在单独的仓库中执行嵌套查询,以实现更好的性能和资源利用。可以根据数据集大小独立地调整和扩展单独的查询仓库。

准则

按照以下准则执行单节点 ML 训练工作负载:

  • 设置 WAREHOUSE_SIZE = MEDIUM 以确保 Snowpark 优化仓库包含 1 个 Snowpark 优化节点。

  • 将仓库 MAX_CONCURRENCY_LEVEL 参数设置为 1,以充分利用给定 Snowpark 存储过程的内存和计算资源。例如:

    alter warehouse snowpark_opt_wh set max_concurrency_level = 1;
    
    Copy
  • 如果需要,请考虑将仓库设置为 多集群仓库 以支持所需的并发性。

  • 考虑使用单独仓库在存储过程中执行嵌套查询:

  • 不要在用于运行 ML 训练存储过程的 Snowpark 优化仓库上混合其他工作负载。

示例

以下示例创建并使用 Snowpark 优化仓库。然后,该示例创建一个训练线性回归模型的存储过程。该存储过程使用名为 MARKETING_BUDGETS_FEATURES 的表(此处未显示)中的数据。

CREATE OR REPLACE WAREHOUSE snowpark_opt_wh WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
  MAX_CONCURRENCY_LEVEL = 1;

CREATE OR REPLACE PROCEDURE train()
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.8
  PACKAGES = ('snowflake-snowpark-python', 'scikit-learn', 'joblib')
  HANDLER = 'main'
AS $$
import os
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from joblib import dump

def main(session):
  # Load features
  df = session.table('MARKETING_BUDGETS_FEATURES').to_pandas()
  X = df.drop('REVENUE', axis = 1)
  y = df['REVENUE']

  # Split dataset into training and test
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 42)

  # Preprocess numeric columns
  numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
  numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = 2)),('scaler', StandardScaler())])
  preprocessor = ColumnTransformer(transformers=[('num', numeric_transformer, numeric_features)])

  # Create pipeline and train
  pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', LinearRegression(n_jobs=-1))])
  model = GridSearchCV(pipeline, param_grid={}, n_jobs=-1, cv=10)
  model.fit(X_train, y_train)

  # Upload trained model to a stage
  model_file = os.path.join('/tmp', 'model.joblib')
  dump(model, model_file)
  session.file.put(model_file, "@ml_models",overwrite=True)

  # Return model R2 score on train and test data
  return {"R2 score on Train": model.score(X_train, y_train),"R2 score on Test": model.score(X_test, y_test)}
$$;
Copy

要调用存储过程,请执行以下命令:

CALL train();
Copy

备注

` Snowflake-Labs GitHub 存储库 <https://github.com/Snowflake-Labs/snowpark-python-demos (https://github.com/Snowflake-Labs/snowpark-python-demos)>`_ 中提供了各种其他 Snowpark Python 演示。` 广告支出和 ROI 预测 <https://github.com/Snowflake-Labs/snowpark-python-demos/blob/main/Advertising-Spend-ROI-Prediction/Snowpark_For_Python.ipynb (https://github.com/Snowflake-Labs/snowpark-python-demos/blob/main/Advertising-Spend-ROI-Prediction/Snowpark_For_Python.ipynb)>`_ 示例演示了如何创建训练线性回归模型的存储过程。

语言: 中文