加载和写入数据

使用 Snowflake ML 高效地将 Snowflake 表和暂存区中的数据加载到您的机器学习工作流中。Snowflake ML 提供了优化的数据加载能力,利用 Snowflake 的分布式处理加速训练和推理工作流的数据摄入。

您可以使用以下方式加载和处理数据:

  • Snowflake 笔记本:用于探索数据和构建 ML 模型的交互式开发环境。有关更多信息,请参阅 ML 的容器运行时的笔记本

  • Snowflake ML 作业:可在任何开发环境中异步运行 ML 工作负载。有关更多信息,请参阅 Snowflake ML 作业

笔记本和 ML 作业都在 ML 容器运行时上运行,该运行时提供针对分布式机器学习工作负载优化的预配置环境。容器运行时使用开源分布式计算框架 Ray,在多个计算节点之间高效处理数据。有关 ML 容器运行时的更多信息,请参阅 ML 的容器运行时

Snowflake ML 提供不同的 APIs 来加载结构化和非结构化数据:

结构化数据(表格和数据集)

非结构化数据(暂存区中的文件)

下表可帮助您为具体用例选择合适的 API:

数据源和 APIs

数据类型

数据源

加载用 API

写入用 API

结构化

Snowflake 表

DataConnector

DataSink

结构化

Snowflake 数据集

DataConnector

DataSink

非结构化

CSV 文件(暂存区)

DataSource API

不适用

非结构化

Parquet 文件(暂存区)

DataSource API

不适用

非结构化

其他暂存文件

DataSource API

不适用

从 Snowflake 表加载结构化数据

使用 Snowflake DataConnector 将 Snowflake 表和 Snowflake 数据集中的结构化数据加载到 Snowflake 笔记本或 Snowflake ML 作业中。DataConnector 通过在多个计算节点上并行读取来加速数据加载。

DataConnector 可以与 Snowpark DataFrames 或 Snowflake 数据集一起使用:

  • Snowpark DataFrames:提供对 Snowflake 表中数据的直接访问。开发阶段使用最佳。

  • Snowflake 数据集:版本化的架构级对象。最适合用于生产工作流程。有关更多信息,请参阅 Snowflake 数据集

并行读取后,DataConnector 可以将数据转换为以下数据结构之一:

  • pandas dataframe

  • PyTorch dataset

  • TensorFlow dataset

创建 DataConnector

您可以从 Snowpark DataFrame 或 Snowflake 数据集创建 DataConnector。

使用以下代码从 Snowpark DataFrame 创建 DataConnector:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
Copy

使用以下代码从 Snowflake 数据集创建 DataConnector:

from snowflake.ml.data.data_connector import DataConnector

# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
Copy

将 DataConnector 转换为其他格式

创建 DataConnector 后,可以将其转换为不同数据结构,以供不同 ML 框架使用。

您可以将 DataConnector 转换为 pandas dataframe,用于 scikit-learn 和其他兼容 pandas 的库。

以下将 Snowflake 表中的数据加载到 pandas dataframe 并训练 XGBoost 分类器:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"

# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))

# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()

# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
Copy

与 Snowflake 的分布式训练 APIs 搭配使用

为获得最佳性能,可以将 DataConnector 直接传递给 Snowflake 优化的分布式训练 APIs,而无需先转换为 pandas、PyTorch 或 TensorFlow 数据集。

以下示例使用 Snowflake 分布式 XGBoost 估算器训练 XGBoost 模型:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
    XGBEstimator,
    XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)

# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
    n_estimators=1,
    objective="reg:squarederror",
    scaling_config=XGBScalingConfig(use_gpu=False),
)

# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
    data_connector,
    input_cols=NUMERICAL_COLS,
    label_col=LABEL_COL
)
Copy

使用 PyTorch 分布器进行分片

您可以使用 ShardedDataConnector 将数据分片到多个节点,以便在 Snowflake PyTorch 分布器上进行分布式训练。

以下示例展示了如何在 digits 数据集上训练 PyTorch 模型,并将数据分片分配到多个进程中进行训练。

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
    PyTorchTrainer,
    ScalingConfig,
    WorkerResourceConfig,
    getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the PyTorch model
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Get context with data shards and model directory
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
            ).to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))

# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)

# Run distributed training
response = pytorch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

从 Snowflake 暂存区加载非结构化数据

使用 Snowflake DataSource APIs 从 Snowflake 暂存区读取非结构化数据。每种文件格式都有对应的数据源类,定义了如何读取数据。

以下显示了加载数据所用的文件格式及相应 APIs:

  • 二进制文件SFStageBinaryFileDataSource

  • 文本文件SFStageTextDataSource

  • CSV 文件SFStageCSVDataSource

  • Parquet 文件SFStageParquetDataSource

  • 图片文件SFStageImageDataSource

加载和处理数据

创建 Snowflake 数据源时,必须提供以下内容:

  • 要从中读取数据的暂存区的名称

  • 具有该暂存区的数据库(默认为当前会话)

  • 具有该暂存区的架构(默认为当前会话)

  • 用于过滤从数据源读取的文件的模式(可选)

Data API 或 Data Connector 会检索提供路径中匹配文件模式的所有文件。

定义 Snowflake 数据源后,可以将数据加载到 Ray 数据集。使用 Ray 数据集,您可以执行以下操作:

  • 将数据集与 Ray APIs 一起使用

  • 将数据集传递给 DataConnector

  • 如有需要,将其转换为 pandas 或 PyTorch 数据集。

以下示例执行以下操作:

  • 从 Snowflake 暂存区将 Parquet 文件读取到 Ray 数据集

  • 将数据集转换为 DataConnector

import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector

data_source = SFStageParquetDataSource(
    stage_location="@stage/path/",
    database="DB_NAME", # optional
    schema="SCHEMA_NAME", # optional
    file_pattern='*.parquet', # optional
)

# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)

dc = DataConnector.from_ray_dataset(ray_ds)
Copy

将结构化数据写回 Snowflake 表

使用 Snowflake DataSink API 将笔记本或 ML 作业中的结构化数据写回 Snowflake 表。您可以将转换或预测后的数据集写入 Snowflake 进行进一步分析或存储。

定义数据接收端时需提供以下内容:

  • 暂存区名称

  • 数据库名称(默认为当前会话)

  • 架构名称(默认为当前会话)

  • 匹配特定文件的文件模式(可选)

以下示例定义了数据接收端:

from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
    table_name="table_name",
    database="db_name",
    schema="schema_name",
    auto_create_table=True, # create table if not exists
    override=True # replace vs insert to table
)
Copy

定义数据接收端后,可以使用以下代码将 Ray 数据集写入 Snowflake 表。

import ray

# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)

# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)

# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
Copy

最佳实践和注意事项

为优化性能和提高资源利用率,请考虑以下最佳实践:

并行:设计数据源实现以利用 Ray 的分布式特性。根据用例自定义并行性和并发参数。您可以手动定义在每个步骤中为每个任务分配多少资源。

分区:默认情况下,Ray 内部逻辑会根据资源和数据大小对数据集进行分区。您可以根据用例使用 ray_ds.repartition(X) 自定义分区数量,以选择大量小任务或少量大任务。

最佳实践:请参阅 Ray Data 用户指南 (https://docs.ray.io/en/latest/data/user-guide.html) 获取更多指导。

Ray API 详情

  • Ray 数据源 (https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html)

  • Ray 批量映射(批量转换) (https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html)

后续步骤

加载数据后,您可以:

语言: 中文