加载和写入数据¶
使用 Snowflake ML 高效地将 Snowflake 表和暂存区中的数据加载到您的机器学习工作流中。Snowflake ML 提供了优化的数据加载能力,利用 Snowflake 的分布式处理加速训练和推理工作流的数据摄入。
您可以使用以下方式加载和处理数据:
Snowflake 笔记本:用于探索数据和构建 ML 模型的交互式开发环境。有关更多信息,请参阅 ML 的容器运行时的笔记本。
Snowflake ML 作业:可在任何开发环境中异步运行 ML 工作负载。有关更多信息,请参阅 Snowflake ML 作业。
笔记本和 ML 作业都在 ML 容器运行时上运行,该运行时提供针对分布式机器学习工作负载优化的预配置环境。容器运行时使用开源分布式计算框架 Ray,在多个计算节点之间高效处理数据。有关 ML 容器运行时的更多信息,请参阅 ML 的容器运行时。
Snowflake ML 提供不同的 APIs 来加载结构化和非结构化数据:
结构化数据(表格和数据集)
DataConnector:从 Snowflake 表和 Snowflake 数据集加载数据。有关更多信息,请参阅 从 Snowflake 表加载结构化数据。
DataSink:将数据写回 Snowflake 表。有关更多信息,请参阅 将结构化数据写回 Snowflake 表。
非结构化数据(暂存区中的文件)
DataSource APIs:从 Snowflake 暂存区加载各种文件格式的数据(CSV、Parquet、图片等)。有关更多信息,请参阅 从 Snowflake 暂存区加载非结构化数据。
下表可帮助您为具体用例选择合适的 API:
数据类型 |
数据源 |
加载用 API |
写入用 API |
|---|---|---|---|
结构化 |
Snowflake 表 |
DataConnector |
DataSink |
结构化 |
Snowflake 数据集 |
DataConnector |
DataSink |
非结构化 |
CSV 文件(暂存区) |
DataSource API |
不适用 |
非结构化 |
Parquet 文件(暂存区) |
DataSource API |
不适用 |
非结构化 |
其他暂存文件 |
DataSource API |
不适用 |
从 Snowflake 表加载结构化数据¶
使用 Snowflake DataConnector 将 Snowflake 表和 Snowflake 数据集中的结构化数据加载到 Snowflake 笔记本或 Snowflake ML 作业中。DataConnector 通过在多个计算节点上并行读取来加速数据加载。
DataConnector 可以与 Snowpark DataFrames 或 Snowflake 数据集一起使用:
Snowpark DataFrames:提供对 Snowflake 表中数据的直接访问。开发阶段使用最佳。
Snowflake 数据集:版本化的架构级对象。最适合用于生产工作流程。有关更多信息,请参阅 Snowflake 数据集。
并行读取后,DataConnector 可以将数据转换为以下数据结构之一:
pandas dataframe
PyTorch dataset
TensorFlow dataset
创建 DataConnector¶
您可以从 Snowpark DataFrame 或 Snowflake 数据集创建 DataConnector。
使用以下代码从 Snowpark DataFrame 创建 DataConnector:
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
使用以下代码从 Snowflake 数据集创建 DataConnector:
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
将 DataConnector 转换为其他格式¶
创建 DataConnector 后,可以将其转换为不同数据结构,以供不同 ML 框架使用。
您可以将 DataConnector 转换为 pandas dataframe,用于 scikit-learn 和其他兼容 pandas 的库。
以下将 Snowflake 表中的数据加载到 pandas dataframe 并训练 XGBoost 分类器:
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()
# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
您可以将 DataConnector 转换为 PyTorch 数据集,用于 PyTorch 模型和数据加载器。
以下示例将 Snowflake 表的数据加载到 PyTorch 数据集中:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to PyTorch dataset
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
您可以将 DataConnector 转换为 TensorFlow 数据集,用于 TensorFlow 模型。数据以流式方式加载,以实现更高的效率。
以下示例将 DataConnector 转换为 TensorFlow 数据集:
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to TensorFlow dataset
tf_ds = data_connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
与 Snowflake 的分布式训练 APIs 搭配使用¶
为获得最佳性能,可以将 DataConnector 直接传递给 Snowflake 优化的分布式训练 APIs,而无需先转换为 pandas、PyTorch 或 TensorFlow 数据集。
以下示例使用 Snowflake 分布式 XGBoost 估算器训练 XGBoost 模型:
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)
# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
data_connector,
input_cols=NUMERICAL_COLS,
label_col=LABEL_COL
)
使用 PyTorch 分布器进行分片¶
您可以使用 ShardedDataConnector 将数据分片到多个节点,以便在 Snowflake PyTorch 分布器上进行分布式训练。
以下示例展示了如何在 digits 数据集上训练 PyTorch 模型,并将数据分片分配到多个进程中进行训练。
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the PyTorch model
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Get context with data shards and model directory
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
).to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
# Run distributed training
response = pytorch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
从 Snowflake 暂存区加载非结构化数据¶
使用 Snowflake DataSource APIs 从 Snowflake 暂存区读取非结构化数据。每种文件格式都有对应的数据源类,定义了如何读取数据。
以下显示了加载数据所用的文件格式及相应 APIs:
二进制文件:
SFStageBinaryFileDataSource文本文件:
SFStageTextDataSourceCSV 文件:
SFStageCSVDataSourceParquet 文件:
SFStageParquetDataSource图片文件:
SFStageImageDataSource
加载和处理数据¶
创建 Snowflake 数据源时,必须提供以下内容:
要从中读取数据的暂存区的名称
具有该暂存区的数据库(默认为当前会话)
具有该暂存区的架构(默认为当前会话)
用于过滤从数据源读取的文件的模式(可选)
Data API 或 Data Connector 会检索提供路径中匹配文件模式的所有文件。
定义 Snowflake 数据源后,可以将数据加载到 Ray 数据集。使用 Ray 数据集,您可以执行以下操作:
将数据集与 Ray APIs 一起使用
将数据集传递给 DataConnector
如有需要,将其转换为 pandas 或 PyTorch 数据集。
以下示例执行以下操作:
从 Snowflake 暂存区将 Parquet 文件读取到 Ray 数据集
将数据集转换为 DataConnector
import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector
data_source = SFStageParquetDataSource(
stage_location="@stage/path/",
database="DB_NAME", # optional
schema="SCHEMA_NAME", # optional
file_pattern='*.parquet', # optional
)
# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)
dc = DataConnector.from_ray_dataset(ray_ds)
将结构化数据写回 Snowflake 表¶
使用 Snowflake DataSink API 将笔记本或 ML 作业中的结构化数据写回 Snowflake 表。您可以将转换或预测后的数据集写入 Snowflake 进行进一步分析或存储。
定义数据接收端时需提供以下内容:
暂存区名称
数据库名称(默认为当前会话)
架构名称(默认为当前会话)
匹配特定文件的文件模式(可选)
以下示例定义了数据接收端:
from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
table_name="table_name",
database="db_name",
schema="schema_name",
auto_create_table=True, # create table if not exists
override=True # replace vs insert to table
)
定义数据接收端后,可以使用以下代码将 Ray 数据集写入 Snowflake 表。
import ray
# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)
# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)
# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
最佳实践和注意事项¶
为优化性能和提高资源利用率,请考虑以下最佳实践:
并行:设计数据源实现以利用 Ray 的分布式特性。根据用例自定义并行性和并发参数。您可以手动定义在每个步骤中为每个任务分配多少资源。
分区:默认情况下,Ray 内部逻辑会根据资源和数据大小对数据集进行分区。您可以根据用例使用 ray_ds.repartition(X) 自定义分区数量,以选择大量小任务或少量大任务。
最佳实践:请参阅 Ray Data 用户指南 (https://docs.ray.io/en/latest/data/user-guide.html) 获取更多指导。
Ray API 详情:
Ray 数据源 (https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html)
Ray 批量映射(批量转换) (https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html)
后续步骤¶
加载数据后,您可以: