Snowflake ML Data Connector

使用 Snowflake ML Data Connector 将数据从 Snowflake 表或暂存区引入容器运行时实例(例如笔记本会话或 ML 作业)。数据连接器使用容器运行时的分布式处理功能来加快数据加载速度,并提高在 Snowflake 笔记本或 ML 作业中运行 ML 管道的效率。您可以使用您加载的数据在 Snowflake 中运行基于 Python 的 ML 工作流程。例如,您可以使用开源包扩展 ML 管道。有关容器运行时的更多信息,请参阅 ML 的容器运行时

您可以使用数据连接器将来自任何 Snowflake 数据源(例如表或暂存区)的数据加载到 Pandas 数据框中。然后,该 Pandas 数据框可以与 Snowflake 中的开源 ML 工作流程一起使用。数据连接器还提供了创建 torch 和 tensorflow 数据集的功能。

除了使用开源工作流程外,您还可以使用 Snowflake 的分布式 APIs 来大规模训练和调整模型。

数据连接器经过优化,可在容器环境中运行。在容器运行时之外,数据连接器使用基于 Apache Arrow 的数据交换格式在 Snowflake 和您的容器之间移动数据。同样的代码在 Snowflake 的内部和外部都有效。

您可以将数据连接器与 Snowpark DataFrame 或 Snowflake 数据集一起使用。使用 Snowpark DataFrames 可以直接访问您 Snowflake 表中的数据。它们最适合在开发期间使用。

Snowflake 数据集是版本化的架构级对象。它们最适合用于生产工作流程。有关数据集的更多信息,请参阅 Snowflake 数据集

您可以使用以下代码将 Snowpark DataFrame 引入容器运行时:

connector = DataConnector.from_dataframe(snowpark_df)
Copy

您可以使用以下代码将 Snowflake 数据集引入容器运行时:

connector = DataConnector.from_dataset(snowflake_dataset)
Copy

数据连接器使用分布式处理功能来加速数据加载到开源数据对象(例如 Pandas 数据框、PyTorch 数据集或 TensorFlow 数据集)的过程。如果 to_pandas 无法足够快地将数据加载到数据框中,则可以使用数据连接器来加快该过程。

将数据连接器直接传递到 Snowflake 分布式工作流程,可以改善在工作流程中使用数据的运行时间。有关更多信息,请参阅 Snowflake ML 容器运行时参考 (Python)

备注

本主题假定已安装 Snowpark ML 模块。如果尚未安装,请参阅 在本地使用 Snowflake ML

连接 Pandas 数据框的数据连接器

您可以使用以下代码,将数据从 Snowflake 表加载到 Pandas 数据框中。

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"
# Load tabel into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))


import xgboost as xgb

pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

clf = xgb.Classifier()
clf.fit(X, y)
Copy

连接 PyTorch 数据集的数据连接器

以下代码显示了如何使用数据连接器将数据从 Snowflake 表加载到 PyTorch 数据集中。

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']

for batch_idx, batch in enumerate(dataloader):
  y = batch_data.pop(label_col).squeeze()
  X = torch.stack(
      [tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
  )
Copy

连接 TensorFlow 数据集的数据连接器

要与 TensorFlow 配合使用,请使用 to_tf_dataset() 方法获取 Tensorflow Dataset:迭代 Dataset,从而生成批处理的 TensorFlow 张量。数据以流式方式加载,以实现更高的效率。

tf_ds = connector.to_tf_dataset(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True
)

for batch in tf_ds:
    print(batch)
Copy

将数据连接器传递给 Snowflake 的优化分布式训练 APIs

为了获得最佳性能,您可以将数据连接器对象传递给 Snowflake 的优化分布式训练 APIs。

以下代码显示了如何执行以下操作:

  1. 将数据从 Snowflake 表加载到数据连接器对象中。

  2. 创建 Snowflake XGBoost 分类器的实例。

  3. 使用数据连接器对象训练分类器。

from sklearn.datasets import make_regression
from snowflake.ml.data.data_connector import DataConnector


from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
  XGBEstimator,
  XGBScalingConfig,
)

from snowflake.snowpark.context import get_active_session

session = get_active_session()

snowflake_df = session.create_dataframe(session.table(table_name))

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)

snowflake_est = XGBEstimator(
  n_estimators=1,
  objective="reg:squarederror",
  scaling_config=XGBScalingConfig(use_gpu=False),
)

# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
  data_connector,
  input_cols = NUMERICAL_COLS,
  label_col = LABEL_COL
)
Copy

使用 PyTorch 分布器进行分片

您可以使用数据连接器,将数据分片到容器运行时中的多个节点。您可以使用分片功能,通过 Snowflake PyTorch 分布器来训练模型。有关 PyTorch 分布器的信息,请参阅 PyTorch 分布器

以下代码使用数据连接器在数字数据集上训练 PyTorch 模型。它指定一个节点,在该节点上运行四个进程。该代码定义了一个 PyTorch 模型、一个训练函数和一个 PyTorch 训练器。训练函数跨多个节点运行,每个进程接收一个唯一的数据分片。本示例中未使用任何 GPUs,但您可以将 num_gpus 参数的值设置为您所使用的 GPUs 数量。

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector

from snowflake.ml.modeling.pytorch import (
  PyTorchTrainer,
  ScalingConfig,
  WorkerResourceConfig,
  getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Context provides relevant information to training process, like data shards and model directory.
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    # The rest of this code is OSS pytorch code.
    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
          )  .to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))


pytroch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)
response = pytroch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

在前面的代码中使用了多个 APIs。有关更多信息,请参阅 Snowflake ML 容器运行时参考 (Python)

语言: 中文