多节点集群 ML 上的容器运行时

在本预览版中,ML 容器运行时 允许您在 Snowflake 笔记本中的多节点集群上运行 ML 工作负载。snowflake-ml-python 库包括设置计算池中可用于 ML 工作负载的节点数量的 APIs,允许在不调整计算池大小的情况下扩展工作负载的可用资源。另一个 API 检索活动节点列表。

多节点群集将一个节点指定为 节点。其他节点被称为 工作线程 节点。头节点协调集群中的并行操作,还为运行工作负载贡献其计算资源。具有一个活动节点的多节点集群只有一个头节点。具有三个活动节点的多节点集群具有一个头节点和两个工作线程节点,所有三个节点都参与运行您的工作负载。

先决条件

要使用多节点集群运行 ML 工作负载,您需要:

  • 一个可以访问笔记本的活动的 Snowflake 账户。请参阅 :doc:` Snowflake 笔记本 </user-guide/ui-snowsight/notebooks>`。

  • 创建和管理使用容器运行时的笔记本的权限。请参阅 :doc:` ML 容器运行时笔记本的信息 </developer-guide/snowflake-ml/notebooks-on-spcs>`。

配置计算池

要使用多节点设置,您需要一个至少包含两个节点的计算池。您可以 创建新计算池更改现有计算池。在任一命令中,传递一个 MAX_NODES 实参来设置池的最大容量。最好预置一个或多个额外节点,这样您就可以轻松地向上或向下扩展以应对更大或更小的工作负载。

要查看计算池的容量,请使用 DESCRIBE COMPUTE POOL 命令。容量在返回表的 MAX_NODES 列中。

DESCRIBE COMPUTE POOL my_pool;
Copy

要设置计算池的容量,请使用 ALTER COMPUTE POOL 命令。

ALTER COMPUTE POOL <compute_pool_name>
    SET MAX_NODES = <total_capacity>;
Copy

在多节点集群上运行工作负载

为笔记本电脑选择多节点计算池是使用计算池中的多个节点运行 ML 工作负载所需的唯一操作。

在笔记本中,使用 snowflake.ml.runtime_cluster.scale_cluster Python API 设置活动节点的数量。计算池中活动节点的数量是可用于运行工作负载的节点数量,最高可达该池的 MAX_NODES。该方法将所需的活动节点总数(包括头节点和所有工作线程节点)作为其主要参数。

备注

该函数默认是阻塞式的(即会等待扩展操作完成),并设有 12 分钟的超时限制。如果操作超时,它将自动回滚到以前的状态。

扩展操作不会跨会话持续进行。也就是说,如果笔记本以非零的工作线程节点结尾,则它不会在下次启动笔记本时自动向上扩展。您必须再次调用扩展 API 来设置工作线程节点的数量。

语法

snowflake.ml.runtime_cluster.scale_cluster(
    expected_cluster_size: int,
    *,
    notebook_name: Optional[str] = None,
    is_async: bool = False,
    options: Optional[Dict[str, Any]] = None
) -> bool
Copy

实参

  • expected_cluster_size (int):计算池中活动节点的数量,直至该池的 MAX_NODES。这包括头节点和所有工作线程节点。

  • notebook_name (Optional[str]):运行工作负载的笔记本的名称。要扩展的计算池是运行指定笔记本电脑的池。如果未提供,则将根据当前上下文自动确定。如果使用了错误的笔记本名称,则会引发异常。

  • is_async (bool):控制函数是否阻塞等待扩展:

    • 如果为 False(默认):在集群完全准备就绪或操作超时之前,该函数会一直处于阻塞状态。

    • 如果为 True:该函数在确认扩展请求已被接受后立即返回。

  • options (Optional[Dict[str, Any]]):高级配置选项:

    • rollback_after_seconds (int):如果扩展未完成,则为自动回滚之前的最大时间。默认值为 720 秒。

    • block_until_min_cluster_size (int):函数返回之前必须准备就绪的最小节点数。

返回

若计算池成功扩展到指定数量的活动节点,则为 True。否则会引发异常。

示例

from snowflake.ml.runtime_cluster import scale_cluster

# Example 1: Scale up the cluster
scale_cluster(3) # Scales the cluster to 3 total nodes (1 head + 2 workers)

# Example 2: Scale down the cluster
scale_cluster(1) # Scales the cluster to 1 head + 0 workers

# Example 3: Asynchronous scaling - function returns immediately after request is accepted
scale_cluster(5, is_async=True)

# Example 4: Scaling with custom options - wait for at least 2 nodes to be ready
scale_cluster(5, options={"block_until_min_cluster_size": 2})
Copy

获取可用节点数

使用 get_nodes API 获取有关集群中活动节点的信息。该函数不带任何实参。

语法

get_nodes() -> list
Copy

返回

包含集群中活动节点详细信息的列表。列表中的每个元素都是一个字典,其键值如下:

  • name (str):应用程序的名称。

  • cpus (int):节点上的 CPUs 数量。

  • gpus (int):节点上的 GPUs 数量。

示例

from snowflake.ml.runtime_cluster import get_nodes

# Example: Get the active nodes in the cluster
nodes = get_nodes()
print(len(nodes), nodes)
Copy

示例代码的输出如下所示:

2 [{'name': "IP1", 'cpus': 4, 'gpus': 0}, {'name': "IP2", 'cpus': 8, 'gpus': 1}]

多节点集群上的分布式训练

ML 的容器运行时支持 LightGBM、XGBoost 和 PyTorch 模型的分布式训练。LightGBMEstimator、XGBEstimator 和 PyTorch 分布式训练 APIs 详细记录在 `API 参考<https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/distributors>`_ 中。

扩展配置

所有模型都提供可选的扩展配置参数,允许您为训练任务指定资源。扩展配置是特定模型类的实例:LightGBMScalingConfigXGBScalingConfigPyTorchScalingConfig 取决于模型类型。

LightGBM 和 XGBoost 扩展配置对象具有以下属性:

  • num_workers:用于训练的工作进程的数量。默认为 -1,自动设置工作进程的数量。

  • num_cpu_per_worker:每个工作进程分配的 CPUs 数量。默认为 -1,自动设置每个工作进程的 CPUs 数量。

  • use_gpu:是否使用 GPU 进行训练。默认值为“无”,允许估算器根据环境进行选择。使用 GPU 时,请务必同时配置模型参数以使用 GPU。

备注

通常,将 num_workersnum_cpu_per_worker 保留为其默认值,让 ML 容器服务决定分配这些资源的最佳方式。运行时为计算池中的每个节点分配一个工作线程,并为每个工作线程分配完成任务所需的 CPUs 或 GPUs。

PyTorch 扩展配置对象具有以下属性:

  • num_cpus:为每个工作线程预留的 CPU 内核数量。

  • num_gpus:要为每个工作线程预留的 GPUs 数量。默认值为 0,表示无保留 GPUs。

LightGBM/XGBoost 模型的分布式训练

内存使用量

通常,含 n GB RAM 的节点可以在不耗尽内存的情况下使用 n/4n/3 的数据训练模型。最大数据集大小取决于工作线程的数量和使用的训练算法。

计算性能

多节点训练的性能取决于模型参数,包括树深度、树数量以及最大数据箱数等。增加这些参数值可以增加数据集的总训练时间。

示例

以下示例显示如何在多节点集群上训练 XGBoost 模型。LightGBM 模型训练是相似的。

from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from implementations.ray_data_ingester import RayDataIngester
table_name = "MULTINODE_SAMPLE_TRAIN_DS"

# Use code like the following to generate example data
"""
# Create a table in current database/schema and store data there
def generate_dataset_sql(db, schema, table_name) -> str:
    sql_script = f"CREATE TABLE IF NOT EXISTS {db}.{schema}.{table_name} AS \n"
    sql_script += f"select \n"
    for i in range(1, 10):
        sql_script += f"uniform(0::float, 10::float, random()) AS FT_{i}, \n"
    sql_script += f"FT_1 + FT_2 AS TARGET, \n"
    sql_script += f"from TABLE(generator(rowcount=>({10000})));"
    return sql_script
session.sql(generate_dataset_sql(session.get_current_database(), session.get_current_schema(), table_name)).collect()
"""

sample_train_df = session.table(table_name)
INPUT_COLS = list(sample_train_df.columns)
LABEL_COL = "TARGET"
INPUT_COLS.remove(LABEL_COL)

params = {
    "eta": 0.1,
    "max_depth": 8,
    "min_child_weight": 100,
    "tree_method": "hist",
}

scaling_config = XGBScalingConfig(
    use_gpu=False
)

estimator = XGBEstimator(
    n_estimators=50,
    objective="reg:squarederror",
    params=params,
    scaling_config=scaling_config,
)
data_connector = DataConnector.from_dataframe(
    sample_train_df, ingestor_class=RayDataIngester
)

xgb_model = estimator.fit(
    data_connector, input_cols=INPUT_COLS, label_col=LABEL_COL
)
Copy

PyTorch 模型的分布式训练

PyTorch 模型使用在每个工作进程中调用的训练函数 (train_func) 进行训练。

使用上下文 APIs

在执行训练函数期间,您可以使用上下文 APIs 来访问有关训练环境的基本元数据,并将参数从调用者转发至训练函数。有关 PyTorch 上下文类的文档,请参见 相关类

上下文对象公开了运行时元数据,您可以使用这些元数据来自定义训练函数的行为。您可以使用提供的方法 get_node_rankget_local_rankget_world_size 和其他方法检索这些信息。

以下代码是从上下文对象中检索值 testtrain 的示例;这些值通过名为 dataset_map 的键传递(您可参阅本主题后面的训练函数示例)。这些值用于创建 PyTorch 数据集对象,然后将其传递给模型。

dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
Copy

指标报告

使用上下文对象的 metrics_reporter 方法将指标从训练函数发送到控制代码。这样可以实时监控和调试训练过程,如以下示例所示。

context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
Copy

示例

以下示例是 PyTorch 模型的训练函数。

def train_func():
    import io
    import base64
    import time
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torchvision import transforms
    from torch.utils.data import IterableDataset
    from torch.optim.lr_scheduler import StepLR
    from PIL import Image
    from snowflake.ml.modeling.distributors.pytorch import get_context

    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 32, 3, 1)
            self.conv2 = nn.Conv2d(32, 64, 3, 1)
            self.dropout1 = nn.Dropout(0.25)
            self.dropout2 = nn.Dropout(0.5)
            self.fc1 = nn.Linear(9216, 128)
            self.fc2 = nn.Linear(128, 10)

        def forward(self, x):
            x = self.conv1(x)
            x = F.relu(x)
            x = self.conv2(x)
            x = F.relu(x)
            x = F.max_pool2d(x, 2)
            x = self.dropout1(x)
            x = torch.flatten(x, 1)
            x = self.fc1(x)
            x = F.relu(x)
            x = self.dropout2(x)
            x = self.fc2(x)
            output = F.log_softmax(x, dim=1)
            return output

    class DecodedDataset(IterableDataset):
        def __init__(self, source_dataset):
            self.source_dataset = source_dataset
            self.transforms = transforms.ToTensor()  # Ensure we apply ToTensor transform

        def __iter__(self):
            for row in self.source_dataset:
                base64_image = row['IMAGE']
                image = Image.open(io.BytesIO(base64.b64decode(base64_image)))
                # Convert the image to a tensor
                image = self.transforms(image)  # Converts PIL image to tensor

                labels = row['LABEL']
                yield image, int(labels)

    def train(model, device, train_loader, optimizer, epoch):
        model.train()
        batch_idx = 1
        for data, target in train_loader:
            # print(f"data : {data} \n target: {target}")
            # raise RuntimeError("test")
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 100 == 0:
                print('Train Epoch: {} [Processed {} images]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), loss.item()))
            batch_idx += 1

    context = get_context()
    rank = context.get_local_rank()
    device = f"cuda:{rank}"
    is_distributed = context.get_world_size() > 1
    if is_distributed:
        dist.init_process_group(backend="nccl")
    print(f"Worker Rank : {context.get_rank()}, world_size: {context.get_world_size()}")

    dataset_map = context.get_dataset_map()
    train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
    test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())

    batch_size = 64
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        pin_memory=True,
        pin_memory_device=f"cuda:{rank}"
    )

    model = Net().to(device)
    if is_distributed:
        model = DDP(model)
    optimizer = optim.Adadelta(model.parameters())
    scheduler = StepLR(optimizer, step_size=1)

    hyper_parms = context.get_hyper_params()
    num_epochs = int(hyper_parms['num_epochs'])
    start_time = time.time()
    for epoch in range(num_epochs):
        train(model, device, train_loader, optimizer, epoch+1)
        scheduler.step()
    now = time.time()
    context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
    test(model, device, test_loader, context)
Copy

以下代码说明了如何根据前面的训练函数启动分布式训练。该示例创建了一个 PyTorch 分布器对象以在多个节点上运行训练,通过上下文对象将训练和测试数据连接到训练函数,并在运行训练器之前建立扩展配置。

# Set up PyTorchDistributor
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.data.data_connector import DataConnector

df = session.table("MNIST_60K")

train_df, test_df = df.random_split([0.99, 0.01], 0)

# Create data connectors for training and test data
train_data = ShardedDataConnector.from_dataframe(train_df)
test_data = DataConnector.from_dataframe(test_df)

pytorch_trainer = PyTorchDistributor(
    train_func=train_func,
    scaling_config=PyTorchScalingConfig(  # scaling configuration
        num_nodes=2,
        num_workers_per_node=1,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
    )
)

# Run the trainer.
results = pytorch_trainer.run(  # accepts context values as parameters
    dataset_map={"train": train_data, "test": test_data},
    hyper_params={"num_epochs": "1"}
)
Copy

已知限制和常见问题

这些限制和问题很可能会在 ML 容器运行时的多节点训练正式发布之前得到解决。

扩展操作超时

扩展操作可能会失败,因为新节点在 12 分钟超时内尚未准备就绪。可能的原因包括:

  • 池容量不足。 您请求的节点数超过了池的 MAX_NODES。增加池的 MAX_NODES。

  • 资源竞争。 12 分钟可能不足以预热添加的节点。将连接池的 MIN_NODES 参数设置为较大数值以保持部分节点处于热备状态,或通过多次调用 scale_cluster 并采用较小增量值的方式来增加活动节点数量。另一种选择是使用异步模式跳过等待所有节点准备就绪的过程:

    • 使用异步模式进行非阻塞操作:

    scale_cluster(3, is_async=True)
    
    Copy
    • 增加超时阈值:

    scale_cluster(3, options={"rollback_after_seconds": 1200})
    
    Copy

笔记本名称错误

如果您看到诸如“Notebook <name> does not exist or not authorized”之类的错误消息,则表示自动检测到的笔记本名称与当前笔记本不匹配。这可能发生在以下情况下:

  • 您的笔记本名称包含点和空格等特殊字符

  • 自动笔记本名称检测无法正常运作

解决方案:显式提供笔记本名称参数。请注意,笔记本名称需要双引号才能被视为 标识符

# Explicitly specifying the notebook name if naming auto detection doesn't work
try:
    scale_cluster(2)
except Exception as e:
    print(e)  # Output: "Notebook "WRONG_NOTEBOOK" does not exist or not authorized"
    scale_cluster(2, notebook_name='"MY_NOTEBOOK"')
Copy

扩展操作失败后 SPCS 服务未被清除

当扩展操作失败时,系统应清理操作中创建的所有资源。但是,如果失败,则可能会有一项或多项 SPCS 服务处于 PENDING 或 FAILED 状态。处于 PENDING 状态的服务可能会稍后变 ACTIVE,或者如果计算池中没有容量,则会永久处于 PENDING 状态。

要移除处于 PENDING 或 FAILED 状态的服务,请将集群扩展到只有一个节点(零个工作线程节点)。要清理所有启动的服务,请在笔记本界面中点击“End Session”来结束当前的笔记本会话。

语言: 中文