加载和写入数据

使用 Snowflake ML 高效地将 Snowflake 表和暂存区中的数据加载到您的机器学习工作流中。Snowflake ML 提供了优化的数据加载能力,利用 Snowflake 的分布式处理加速训练和推理工作流的数据摄入。

您可以使用以下方式加载和处理数据:

  • Snowflake 笔记本:用于探索数据和构建 ML 模型的交互式开发环境。有关更多信息,请参阅 Notebooks on Container Runtime

  • Snowflake ML 作业:可在任何开发环境中异步运行 ML 工作负载。有关更多信息,请参阅 Snowflake ML 作业

Both Notebooks and ML Jobs run on the Container Runtime, which provides preconfigured environments optimized for machine learning workloads with distributed processing capabilities. The Container Runtime uses Ray, an open-source framework for distributed computing, to efficiently process data across multiple compute nodes. For more information about the Container Runtime, see Container Runtime.

Snowflake ML 提供不同的 APIs 来加载结构化和非结构化数据:

结构化数据(表格和数据集)

非结构化数据(暂存区中的文件)

下表可帮助您为具体用例选择合适的 API:

数据源和 APIs

数据类型

数据源

加载用 API

写入用 API

结构化

Snowflake 表

DataConnector

DataSink

结构化

Snowflake 数据集

DataConnector

DataSink

非结构化

CSV 文件(暂存区)

DataSource API

不适用

非结构化

Parquet 文件(暂存区)

DataSource API

不适用

非结构化

其他暂存文件

DataSource API

不适用

从 Snowflake 表加载结构化数据

使用 Snowflake DataConnector 将 Snowflake 表和 Snowflake 数据集中的结构化数据加载到 Snowflake 笔记本或 Snowflake ML 作业中。DataConnector 通过在多个计算节点上并行读取来加速数据加载。

DataConnector 可以与 Snowpark DataFrames 或 Snowflake 数据集一起使用:

  • Snowpark DataFrames:提供对 Snowflake 表中数据的直接访问。开发阶段使用最佳。

  • Snowflake 数据集:版本化的架构级对象。最适合用于生产工作流程。有关更多信息,请参阅 Snowflake 数据集

并行读取后,DataConnector 可以将数据转换为以下数据结构之一:

  • pandas dataframe

  • PyTorch dataset

  • TensorFlow dataset

创建 DataConnector

您可以从 Snowpark DataFrame 或 Snowflake 数据集创建 DataConnector。

使用以下代码从 Snowpark DataFrame 创建 DataConnector:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
Copy

使用以下代码从 Snowflake 数据集创建 DataConnector:

from snowflake.ml.data.data_connector import DataConnector

# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
Copy

将 DataConnector 转换为其他格式

创建 DataConnector 后,可以将其转换为不同数据结构,以供不同 ML 框架使用。

您可以将 DataConnector 转换为 pandas dataframe,用于 scikit-learn 和其他兼容 pandas 的库。

以下将 Snowflake 表中的数据加载到 pandas dataframe 并训练 XGBoost 分类器:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"

# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))

# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()

# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
Copy

与 Snowflake 的分布式训练 APIs 搭配使用

为获得最佳性能,可以将 DataConnector 直接传递给 Snowflake 优化的分布式训练 APIs,而无需先转换为 pandas、PyTorch 或 TensorFlow 数据集。

以下示例使用 Snowflake 分布式 XGBoost 估算器训练 XGBoost 模型:

from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
    XGBEstimator,
    XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)

# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
    n_estimators=1,
    objective="reg:squarederror",
    scaling_config=XGBScalingConfig(use_gpu=False),
)

# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
    data_connector,
    input_cols=NUMERICAL_COLS,
    label_col=LABEL_COL
)
Copy

使用 PyTorch 分布器进行分片

您可以使用 ShardedDataConnector 将数据分片到多个节点,以便在 Snowflake PyTorch 分布器上进行分布式训练。

以下示例展示了如何在 digits 数据集上训练 PyTorch 模型,并将数据分片分配到多个进程中进行训练。

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
    PyTorchTrainer,
    ScalingConfig,
    WorkerResourceConfig,
    getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the PyTorch model
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Get context with data shards and model directory
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
            ).to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))

# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)

# Run distributed training
response = pytorch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

从 Snowflake 暂存区加载非结构化数据

使用 Snowflake DataSource APIs 从 Snowflake 暂存区读取非结构化数据。每种文件格式都有对应的数据源类,定义了如何读取数据。

以下显示了加载数据所用的文件格式及相应 APIs:

  • 二进制文件SFStageBinaryFileDataSource

  • 文本文件SFStageTextDataSource

  • CSV 文件SFStageCSVDataSource

  • Parquet 文件SFStageParquetDataSource

  • 图片文件SFStageImageDataSource

加载和处理数据

创建 Snowflake 数据源时,必须提供以下内容:

  • 要从中读取数据的暂存区的名称

  • 具有该暂存区的数据库(默认为当前会话)

  • 具有该暂存区的架构(默认为当前会话)

  • 用于过滤从数据源读取的文件的模式(可选)

Data API 或 Data Connector 会检索提供路径中匹配文件模式的所有文件。

定义 Snowflake 数据源后,可以将数据加载到 Ray 数据集。使用 Ray 数据集,您可以执行以下操作:

  • 将数据集与 Ray APIs 一起使用

  • 将数据集传递给 DataConnector

  • 如有需要,将其转换为 pandas 或 PyTorch 数据集。

以下示例执行以下操作:

  • 从 Snowflake 暂存区将 Parquet 文件读取到 Ray 数据集

  • 将数据集转换为 DataConnector

import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector

data_source = SFStageParquetDataSource(
    stage_location="@stage/path/",
    database="DB_NAME", # optional
    schema="SCHEMA_NAME", # optional
    file_pattern='*.parquet', # optional
)

# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)

dc = DataConnector.from_ray_dataset(ray_ds)
Copy

将结构化数据写回 Snowflake 表

使用 Snowflake DataSink API 将笔记本或 ML 作业中的结构化数据写回 Snowflake 表。您可以将转换或预测后的数据集写入 Snowflake 进行进一步分析或存储。

定义数据接收端时需提供以下内容:

  • 暂存区名称

  • 数据库名称(默认为当前会话)

  • 架构名称(默认为当前会话)

  • 匹配特定文件的文件模式(可选)

以下示例定义了数据接收端:

from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
    table_name="table_name",
    database="db_name",
    schema="schema_name",
    auto_create_table=True, # create table if not exists
    override=True # replace vs insert to table
)
Copy

定义数据接收端后,可以使用以下代码将 Ray 数据集写入 Snowflake 表。

import ray

# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)

# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)

# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
Copy

最佳实践和注意事项

为优化性能和提高资源利用率,请考虑以下最佳实践:

并行:设计数据源实现以利用 Ray 的分布式特性。根据用例自定义并行性和并发参数。您可以手动定义在每个步骤中为每个任务分配多少资源。

分区:默认情况下,Ray 内部逻辑会根据资源和数据大小对数据集进行分区。您可以根据用例使用 ray_ds.repartition(X) 自定义分区数量,以选择大量小任务或少量大任务。

最佳实践:请参阅 Ray Data 用户指南 (https://docs.ray.io/en/latest/data/user-guide.html) 获取更多指导。

Ray API 详情

  • Ray 数据源 (https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html)

  • Ray 批量映射(批量转换) (https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html)

后续步骤

加载数据后,您可以:

语言: 中文