Snowflake ML Data Connector¶
使用 Snowflake ML Data Connector 将数据从 Snowflake 表或暂存区引入容器运行时实例(例如笔记本会话或 ML 作业)。数据连接器使用容器运行时的分布式处理功能来加快数据加载速度,并提高在 Snowflake 笔记本或 ML 作业中运行 ML 管道的效率。您可以使用您加载的数据在 Snowflake 中运行基于 Python 的 ML 工作流程。例如,您可以使用开源包扩展 ML 管道。有关容器运行时的更多信息,请参阅 ML 的容器运行时。
您可以使用数据连接器将来自任何 Snowflake 数据源(例如表或暂存区)的数据加载到 Pandas 数据框中。然后,该 Pandas 数据框可以与 Snowflake 中的开源 ML 工作流程一起使用。数据连接器还提供了创建 torch 和 tensorflow 数据集的功能。
除了使用开源工作流程外,您还可以使用 Snowflake 的分布式 APIs 来大规模训练和调整模型。
数据连接器经过优化,可在容器环境中运行。在容器运行时之外,数据连接器使用基于 Apache Arrow 的数据交换格式在 Snowflake 和您的容器之间移动数据。同样的代码在 Snowflake 的内部和外部都有效。
您可以将数据连接器与 Snowpark DataFrame 或 Snowflake 数据集一起使用。使用 Snowpark DataFrames 可以直接访问您 Snowflake 表中的数据。它们最适合在开发期间使用。
Snowflake 数据集是版本化的架构级对象。它们最适合用于生产工作流程。有关数据集的更多信息,请参阅 Snowflake 数据集。
您可以使用以下代码将 Snowpark DataFrame 引入容器运行时:
connector = DataConnector.from_dataframe(snowpark_df)
您可以使用以下代码将 Snowflake 数据集引入容器运行时:
connector = DataConnector.from_dataset(snowflake_dataset)
数据连接器使用分布式处理功能来加速数据加载到开源数据对象(例如 Pandas 数据框、PyTorch 数据集或 TensorFlow 数据集)的过程。如果 to_pandas
无法足够快地将数据加载到数据框中,则可以使用数据连接器来加快该过程。
将数据连接器直接传递到 Snowflake 分布式工作流程,可以改善在工作流程中使用数据的运行时间。有关更多信息,请参阅 Snowflake ML 容器运行时参考 (Python)。
备注
本主题假定已安装 Snowpark ML 模块。如果尚未安装,请参阅 在本地使用 Snowflake ML。
连接 Pandas 数据框的数据连接器¶
您可以使用以下代码,将数据从 Snowflake 表加载到 Pandas 数据框中。
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load tabel into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
import xgboost as xgb
pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
clf = xgb.Classifier()
clf.fit(X, y)
连接 PyTorch 数据集的数据连接器¶
以下代码显示了如何使用数据连接器将数据从 Snowflake 表加载到 PyTorch 数据集中。
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
连接 TensorFlow 数据集的数据连接器¶
要与 TensorFlow 配合使用,请使用 to_tf_dataset()
方法获取 Tensorflow Dataset:迭代 Dataset,从而生成批处理的 TensorFlow 张量。数据以流式方式加载,以实现更高的效率。
tf_ds = connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
将数据连接器传递给 Snowflake 的优化分布式训练 APIs¶
为了获得最佳性能,您可以将数据连接器对象传递给 Snowflake 的优化分布式训练 APIs。
以下代码显示了如何执行以下操作:
将数据从 Snowflake 表加载到数据连接器对象中。
创建 Snowflake XGBoost 分类器的实例。
使用数据连接器对象训练分类器。
from sklearn.datasets import make_regression
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
snowflake_df = session.create_dataframe(session.table(table_name))
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
data_connector,
input_cols = NUMERICAL_COLS,
label_col = LABEL_COL
)
使用 PyTorch 分布器进行分片¶
您可以使用数据连接器,将数据分片到容器运行时中的多个节点。您可以使用分片功能,通过 Snowflake PyTorch 分布器来训练模型。有关 PyTorch 分布器的信息,请参阅 PyTorch 分布器。
以下代码使用数据连接器在数字数据集上训练 PyTorch 模型。它指定一个节点,在该节点上运行四个进程。该代码定义了一个 PyTorch 模型、一个训练函数和一个 PyTorch 训练器。训练函数跨多个节点运行,每个进程接收一个唯一的数据分片。本示例中未使用任何 GPUs,但您可以将 num_gpus
参数的值设置为您所使用的 GPUs 数量。
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Context provides relevant information to training process, like data shards and model directory.
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
# The rest of this code is OSS pytorch code.
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
) .to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
pytroch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
response = pytroch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
在前面的代码中使用了多个 APIs。有关更多信息,请参阅 Snowflake ML 容器运行时参考 (Python)。