Container Runtime on multi-node clusters¶
In this preview, Container Runtime allows you to run
ML workloads on multi-node clusters in Snowflake Notebooks. The snowflake-ml-python library includes APIs to set the
number of nodes in the compute pool available for ML workloads, allowing the resources available to a workload to be
scaled without resizing the compute pool. Another API retrieves a list of active nodes.
多节点群集将一个节点指定为 头 节点。其他节点被称为 工作线程 节点。头节点协调集群中的并行操作,还为运行工作负载贡献其计算资源。具有一个活动节点的多节点集群只有一个头节点。具有三个活动节点的多节点集群具有一个头节点和两个工作线程节点,所有三个节点都参与运行您的工作负载。
先决条件¶
要使用多节点集群运行 ML 工作负载,您需要:
一个可以访问笔记本的活动的 Snowflake 账户。请参阅 :doc:` Snowflake 笔记本 </user-guide/ui-snowsight/notebooks>`。
Privileges to create and manage notebooks that use the container runtime. See Notebooks on Container Runtime.
配置计算池¶
要使用多节点设置,您需要一个至少包含两个节点的计算池。您可以 创建新计算池 或 更改现有计算池。在任一命令中,传递一个 MAX_NODES 实参来设置池的最大容量。最好预置一个或多个额外节点,这样您就可以轻松地向上或向下扩展以应对更大或更小的工作负载。
要查看计算池的容量,请使用 DESCRIBE COMPUTE POOL 命令。容量在返回表的 MAX_NODES 列中。
DESCRIBE COMPUTE POOL my_pool;
要设置计算池的容量,请使用 ALTER COMPUTE POOL 命令。
ALTER COMPUTE POOL <compute_pool_name>
SET MAX_NODES = <total_capacity>;
在多节点集群上运行工作负载¶
为笔记本电脑选择多节点计算池是使用计算池中的多个节点运行 ML 工作负载所需的唯一操作。
在笔记本中,使用 snowflake.ml.runtime_cluster.scale_cluster Python API 设置活动节点的数量。计算池中活动节点的数量是可用于运行工作负载的节点数量,最高可达该池的 MAX_NODES。该方法将所需的活动节点总数(包括头节点和所有工作线程节点)作为其主要参数。
备注
该函数默认是阻塞式的(即会等待扩展操作完成),并设有 12 分钟的超时限制。如果操作超时,它将自动回滚到以前的状态。
扩展操作不会跨会话持续进行。也就是说,如果笔记本以非零的工作线程节点结尾,则它不会在下次启动笔记本时自动向上扩展。您必须再次调用扩展 API 来设置工作线程节点的数量。
语法¶
snowflake.ml.runtime_cluster.scale_cluster(
expected_cluster_size: int,
*,
notebook_name: Optional[str] = None,
is_async: bool = False,
options: Optional[Dict[str, Any]] = None
) -> bool
实参¶
expected_cluster_size(int):计算池中活动节点的数量,直至该池的 MAX_NODES。这包括头节点和所有工作线程节点。notebook_name(Optional[str]):运行工作负载的笔记本的名称。要扩展的计算池是运行指定笔记本电脑的池。如果未提供,则将根据当前上下文自动确定。如果使用了错误的笔记本名称,则会引发异常。is_async(bool):控制函数是否阻塞等待扩展:如果为 False(默认):在集群完全准备就绪或操作超时之前,该函数会一直处于阻塞状态。
如果为 True:该函数在确认扩展请求已被接受后立即返回。
options(Optional[Dict[str, Any]]):高级配置选项:rollback_after_seconds(int):如果扩展未完成,则为自动回滚之前的最大时间。默认值为 720 秒。block_until_min_cluster_size(int):函数返回之前必须准备就绪的最小节点数。
返回¶
若计算池成功扩展到指定数量的活动节点,则为 True。否则会引发异常。
示例¶
from snowflake.ml.runtime_cluster import scale_cluster
# Example 1: Scale up the cluster
scale_cluster(3) # Scales the cluster to 3 total nodes (1 head + 2 workers)
# Example 2: Scale down the cluster
scale_cluster(1) # Scales the cluster to 1 head + 0 workers
# Example 3: Asynchronous scaling - function returns immediately after request is accepted
scale_cluster(5, is_async=True)
# Example 4: Scaling with custom options - wait for at least 2 nodes to be ready
scale_cluster(5, options={"block_until_min_cluster_size": 2})
获取可用节点数¶
使用 get_nodes API 获取有关集群中活动节点的信息。该函数不带任何实参。
语法¶
get_nodes() -> list
返回¶
包含集群中活动节点详细信息的列表。列表中的每个元素都是一个字典,其键值如下:
name(str):应用程序的名称。cpus(int):节点上的 CPUs 数量。gpus(int):节点上的 GPUs 数量。
示例¶
from snowflake.ml.runtime_cluster import get_nodes
# Example: Get the active nodes in the cluster
nodes = get_nodes()
print(len(nodes), nodes)
示例代码的输出如下所示:
2 [{'name': "IP1", 'cpus': 4, 'gpus': 0}, {'name': "IP2", 'cpus': 8, 'gpus': 1}]
多节点集群上的分布式训练¶
The Container Runtime supports distributed training of LightGBM, XGBoost, and PyTorch models. The distributed training APIs for LightGBMEstimator, XGBEstimator, and PyTorch are documented in detail in the API Reference.
扩展配置¶
所有模型都提供可选的扩展配置参数,允许您为训练任务指定资源。扩展配置是特定模型类的实例:LightGBMScalingConfig、XGBScalingConfig 或 PyTorchScalingConfig 取决于模型类型。
LightGBM 和 XGBoost 扩展配置对象具有以下属性:
num_workers:用于训练的工作进程的数量。默认为 -1,自动设置工作进程的数量。num_cpu_per_worker:每个工作进程分配的 CPUs 数量。默认为 -1,自动设置每个工作进程的 CPUs 数量。use_gpu:是否使用 GPU 进行训练。默认值为“无”,允许估算器根据环境进行选择。使用 GPU 时,请务必同时配置模型参数以使用 GPU。
备注
Generally, leave num_workers and num_cpu_per_worker at their default values, so Container Services
determines the best way to distribute these resources. The runtime assigns a worker for each node in the compute pool,
and the necessary CPUs or GPUs for each worker to complete the task.
PyTorch 扩展配置对象具有以下属性:
num_cpus:为每个工作线程预留的 CPU 内核数量。num_gpus:要为每个工作线程预留的 GPUs 数量。默认值为 0,表示无保留 GPUs。
LightGBM/XGBoost 模型的分布式训练¶
- 内存使用量
通常,含 n GB RAM 的节点可以在不耗尽内存的情况下使用 n/4 到 n/3 的数据训练模型。最大数据集大小取决于工作线程的数量和使用的训练算法。
- 计算性能
多节点训练的性能取决于模型参数,包括树深度、树数量以及最大数据箱数等。增加这些参数值可以增加数据集的总训练时间。
示例¶
以下示例显示如何在多节点集群上训练 XGBoost 模型。LightGBM 模型训练是相似的。
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector
from implementations.ray_data_ingester import RayDataIngester
table_name = "MULTINODE_SAMPLE_TRAIN_DS"
# Use code like the following to generate example data
"""
# Create a table in current database/schema and store data there
def generate_dataset_sql(db, schema, table_name) -> str:
sql_script = f"CREATE TABLE IF NOT EXISTS {db}.{schema}.{table_name} AS \n"
sql_script += f"select \n"
for i in range(1, 10):
sql_script += f"uniform(0::float, 10::float, random()) AS FT_{i}, \n"
sql_script += f"FT_1 + FT_2 AS TARGET, \n"
sql_script += f"from TABLE(generator(rowcount=>({10000})));"
return sql_script
session.sql(generate_dataset_sql(session.get_current_database(), session.get_current_schema(), table_name)).collect()
"""
sample_train_df = session.table(table_name)
INPUT_COLS = list(sample_train_df.columns)
LABEL_COL = "TARGET"
INPUT_COLS.remove(LABEL_COL)
params = {
"eta": 0.1,
"max_depth": 8,
"min_child_weight": 100,
"tree_method": "hist",
}
scaling_config = XGBScalingConfig(
use_gpu=False
)
estimator = XGBEstimator(
n_estimators=50,
objective="reg:squarederror",
params=params,
scaling_config=scaling_config,
)
data_connector = DataConnector.from_dataframe(
sample_train_df, ingestor_class=RayDataIngester
)
xgb_model = estimator.fit(
data_connector, input_cols=INPUT_COLS, label_col=LABEL_COL
)
PyTorch 模型的分布式训练¶
PyTorch 模型使用在每个工作进程中调用的训练函数 (train_func) 进行训练。
使用上下文 APIs¶
在执行训练函数期间,您可以使用上下文 APIs 来访问有关训练环境的基本元数据,并将参数从调用者转发至训练函数。有关 PyTorch 上下文类的文档,请参见 相关类。
上下文对象公开了运行时元数据,您可以使用这些元数据来自定义训练函数的行为。您可以使用提供的方法 get_node_rank、get_local_rank、get_world_size 和其他方法检索这些信息。
以下代码是从上下文对象中检索值 test 和 train 的示例;这些值通过名为 dataset_map 的键传递(您可参阅本主题后面的训练函数示例)。这些值用于创建 PyTorch 数据集对象,然后将其传递给模型。
dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())
hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
指标报告¶
使用上下文对象的
metrics_reporter方法将指标从训练函数发送到控制代码。这样可以实时监控和调试训练过程,如以下示例所示。context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
示例¶
以下示例是 PyTorch 模型的训练函数。
def train_func():
import io
import base64
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torchvision import transforms
from torch.utils.data import IterableDataset
from torch.optim.lr_scheduler import StepLR
from PIL import Image
from snowflake.ml.modeling.distributors.pytorch import get_context
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
class DecodedDataset(IterableDataset):
def __init__(self, source_dataset):
self.source_dataset = source_dataset
self.transforms = transforms.ToTensor() # Ensure we apply ToTensor transform
def __iter__(self):
for row in self.source_dataset:
base64_image = row['IMAGE']
image = Image.open(io.BytesIO(base64.b64decode(base64_image)))
# Convert the image to a tensor
image = self.transforms(image) # Converts PIL image to tensor
labels = row['LABEL']
yield image, int(labels)
def train(model, device, train_loader, optimizer, epoch):
model.train()
batch_idx = 1
for data, target in train_loader:
# print(f"data : {data} \n target: {target}")
# raise RuntimeError("test")
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % 100 == 0:
print('Train Epoch: {} [Processed {} images]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), loss.item()))
batch_idx += 1
context = get_context()
rank = context.get_local_rank()
device = f"cuda:{rank}"
is_distributed = context.get_world_size() > 1
if is_distributed:
dist.init_process_group(backend="nccl")
print(f"Worker Rank : {context.get_rank()}, world_size: {context.get_world_size()}")
dataset_map = context.get_dataset_map()
train_dataset = DecodedDataset(dataset_map["train"].get_shard().to_torch_dataset())
test_dataset = DecodedDataset(dataset_map["test"].to_torch_dataset())
batch_size = 64
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=batch_size,
pin_memory=True,
pin_memory_device=f"cuda:{rank}"
)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=batch_size,
pin_memory=True,
pin_memory_device=f"cuda:{rank}"
)
model = Net().to(device)
if is_distributed:
model = DDP(model)
optimizer = optim.Adadelta(model.parameters())
scheduler = StepLR(optimizer, step_size=1)
hyper_parms = context.get_hyper_params()
num_epochs = int(hyper_parms['num_epochs'])
start_time = time.time()
for epoch in range(num_epochs):
train(model, device, train_loader, optimizer, epoch+1)
scheduler.step()
now = time.time()
context.get_metrics_reporter().log_metrics({"train_func_train_time": int(now-start_time)})
test(model, device, test_loader, context)
以下代码说明了如何根据前面的训练函数启动分布式训练。该示例创建了一个 PyTorch 分布器对象以在多个节点上运行训练,通过上下文对象将训练和测试数据连接到训练函数,并在运行训练器之前建立扩展配置。
# Set up PyTorchDistributor
from snowflake.ml.modeling.distributors.pytorch import PyTorchDistributor, PyTorchScalingConfig, WorkerResourceConfig
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.data.data_connector import DataConnector
df = session.table("MNIST_60K")
train_df, test_df = df.random_split([0.99, 0.01], 0)
# Create data connectors for training and test data
train_data = ShardedDataConnector.from_dataframe(train_df)
test_data = DataConnector.from_dataframe(test_df)
pytorch_trainer = PyTorchDistributor(
train_func=train_func,
scaling_config=PyTorchScalingConfig( # scaling configuration
num_nodes=2,
num_workers_per_node=1,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=0, num_gpus=1),
)
)
# Run the trainer.
results = pytorch_trainer.run( # accepts context values as parameters
dataset_map={"train": train_data, "test": test_data},
hyper_params={"num_epochs": "1"}
)
已知限制和常见问题¶
These limitations and issues are likely to be addressed before multi-node training on Container Runtime is generally available.
扩展操作超时¶
扩展操作可能会失败,因为新节点在 12 分钟超时内尚未准备就绪。可能的原因包括:
池容量不足。 您请求的节点数超过了池的 MAX_NODES。增加池的 MAX_NODES。
资源竞争。 12 分钟可能不足以预热添加的节点。将连接池的 MIN_NODES 参数设置为较大数值以保持部分节点处于热备状态,或通过多次调用
scale_cluster并采用较小增量值的方式来增加活动节点数量。另一种选择是使用异步模式跳过等待所有节点准备就绪的过程:使用异步模式进行非阻塞操作:
scale_cluster(3, is_async=True)
增加超时阈值:
scale_cluster(3, options={"rollback_after_seconds": 1200})
笔记本名称错误¶
如果您看到诸如“Notebook <name> does not exist or not authorized”之类的错误消息,则表示自动检测到的笔记本名称与当前笔记本不匹配。这可能发生在以下情况下:
您的笔记本名称包含点和空格等特殊字符
自动笔记本名称检测无法正常运作
解决方案:显式提供笔记本名称参数。请注意,笔记本名称需要双引号才能被视为 标识符:
# Explicitly specifying the notebook name if naming auto detection doesn't work
try:
scale_cluster(2)
except Exception as e:
print(e) # Output: "Notebook "WRONG_NOTEBOOK" does not exist or not authorized"
scale_cluster(2, notebook_name='"MY_NOTEBOOK"')
扩展操作失败后 SPCS 服务未被清除¶
当扩展操作失败时,系统应清理操作中创建的所有资源。但是,如果失败,则可能会有一项或多项 SPCS 服务处于 PENDING 或 FAILED 状态。处于 PENDING 状态的服务可能会稍后变 ACTIVE,或者如果计算池中没有容量,则会永久处于 PENDING 状态。
要移除处于 PENDING 或 FAILED 状态的服务,请将集群扩展到只有一个节点(零个工作线程节点)。要清理所有启动的服务,请在笔记本界面中点击“End Session”来结束当前的笔记本会话。