特征工程¶
Snowflake ML 允许您将原始数据转换为特征,从而使机器学习模型能够高效使用这些数据。您可以使用多种方法来转换数据,每种方法适用于不同的数据规模和需求:
开源软件 (OSS) 预处理器 - 适用于小到中等规模的数据集和快速原型开发,使用熟悉的 Python ML 库,这些库可在本地或单节点容器运行时环境中运行。
Snowflake ML 预处理器 - 适用于较大数据集,使用 Snowflake ML 在 Snowflake 平台上原生执行的预处理 APIs。这些 APIs 可以将处理分布到仓库计算资源上。
Ray map_batches - 适用于高度可定制的大规模处理,尤其是非结构化数据,可在单节点或多节点容器运行时环境中进行并行、资源管理的执行。
请选择最适合您数据规模、性能需求和自定义转换逻辑的方法。
下表展示了 Snowflake ML 中三种主要特征工程方法的详细比较:
特征/维度 |
OSS(包括 scikit-learn) |
Snowflake ML 预处理器 |
Ray |
|---|---|---|---|
小数位数 |
小型和中型数据集 |
大型/分布式数据 |
大型/分布式数据 |
执行环境 |
内存中 |
推送到您用于运行 SQL 查询的默认仓库 |
跨计算池中的节点 |
计算资源 |
Snowpark Container Services(计算池) |
仓库 |
Snowpark Container Services(计算池) |
集成 |
标准 Python ML 生态系统 |
与 Snowflake ML 原生集成 |
支持 Python ML 与 Snowflake |
性能 |
本地内存工作负载快速;扩展受限且非分布式 |
设计用于可扩展的分布式特征工程 |
高度并行且资源管理良好,适合大规模/非结构化数据 |
适用场景 |
快速进行原型设计和实验 |
大数据集的生产工作流 |
需要自定义资源控制的大数据工作流 |
以下示例展示了如何使用每种方法实现特征转换:
使用以下代码在您的预处理工作流中实现 scikit-learn:
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
# Load your data locally into a Pandas DataFrame
df = pd.DataFrame({
'age': [34, 23, 54, 31],
'city': ['SF', 'NY', 'SF', 'LA'],
'income': [120000, 95000, 135000, 99000]
})
# Define preprocessing steps
numeric_features = ['age', 'income']
numeric_transformer = StandardScaler()
categorical_features = ['city']
categorical_transformer = OneHotEncoder()
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
]
)
pipeline = Pipeline(steps=[
('preprocessor', preprocessor)
])
# Preprocess the data
X_processed = pipeline.fit_transform(df)
print(X_processed)
Snowflake ML 预处理器可直接在 Snowflake 内处理分布式转换。这些预处理器可推送以跨仓库扩展。对于大数据集和生产工作负载,请使用 Snowflake ML 预处理器。
备注
Snowflake ML 预处理器是 scikit-learn 中预处理器的一个子集,但覆盖了最常用的场景。有关可用预处理器的信息,请参见 Snowflake ML 模型预处理
以下代码使用了 StandardScaler 和 OneHotEncoder 库。
from snowflake.snowpark import Session
from snowflake.ml.modeling.preprocessing import StandardScaler, OneHotEncoder
from snowflake.ml.modeling.pipeline import Pipeline
# Assume your Snowflake connection details are configured
session = Session.builder.configs(...).create()
# Load your data from a Snowflake table as a DataFrame
df = session.table('CUSTOMER_DATA')
# Define Snowflake ML preprocessors
scaler = StandardScaler(input_cols=['AGE', 'INCOME'], output_cols=['AGE_SCALED', 'INCOME_SCALED'])
encoder = OneHotEncoder(input_cols=['CITY'], output_cols=['CITY_ENCODED'])
pipeline = Pipeline(steps=[
('scaling', scaler),
('encoding', encoder)
])
# Fit and transform data in Snowflake (distributed)
result = pipeline.fit_transform(df)
result.show()
使用 Ray 进行分布式并行处理,并可实现自定义转换。Ray map_batches 使用惰性执行,即在您实际生成数据集之前不会进行处理,这有助于减少内存使用。这种方法非常适合需要自定义逻辑的大规模数据处理。
import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector
# Example for data transform
def preprocess_batch(batch: pd.DataFrame) -> pd.DataFrame:
batch['AGE_SCALED'] = (batch['age'] - batch['age'].mean()) / batch['age'].std()
return batch
# Example of filtering
def filter_by_value(row):
return row['city'] != 'LA'
# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)
# Setup filter operations, not executed yet
filtered_ds = ray_ds.filter(filter_by_value)
transformed_ds = filtered_ds.map_batches(example_transform_batch_function)
# Create DataConnector directly from ray dataset
data_connector = DataConnector.from_ray_dataset(transformed_ds)