Ray Runtime:构建分布式应用工具

发布时间: 更新时间: 总字数:1856 阅读时间:4m 作者: IP上海 分享 网址

Ray 是一个开源的统一框架,用于扩展 AI 和 Python 应用程序。它提供了一个简单、通用的 API,用于构建分布式应用程序,这些应用程序可以从单机扩展到整个集群。

Ray Runtime 介绍

Ray 的核心思想是让你能够以与编写单机 Python 代码几乎相同的方式编写分布式代码。它通过提供以下核心组件来实现这一点:

  • Ray Core: 这是 Ray 的基础,它允许你将普通的 Python 函数转化为 Ray 任务 (tasks) 进行并行执行,或者将 Python 类转化为 Ray Actor 进行分布式有状态计算。Ray Core 还提供了分布式对象存储和调度器,用于管理任务和 Actor 的执行。
  • Ray Libraries: 在 Ray Core 的基础上,Ray 提供了多个高级库,用于简化特定领域的分布式计算:
    • Ray Data: 用于构建可扩展的数据管道,支持大规模数据的读取、转换和处理。
    • Ray Train: 用于简化分布式模型训练,支持 PyTorch, TensorFlow 等主流深度学习框架。
    • Ray Tune: 用于超参数调优,可以在任何规模下高效地搜索最佳模型超参数。
    • Ray Serve: 用于部署机器学习模型和其他业务逻辑,支持灵活的模型部署模式和在线推理。
    • RLlib: 用于强化学习。

Ray 的工作原理

Ray 的主要抽象是 任务 (Tasks)Actor

  • 任务 (Tasks):

    • 你可以通过 @ray.remote 装饰器将任何 Python 函数标记为 Ray 任务。
    • 当调用 .remote() 方法时,Ray 任务会被异步执行,并立即返回一个 对象引用 (ObjectRef)
    • 对象引用是对任务计算结果的未来引用,你可以使用 ray.get() 来获取实际的结果。
    • Ray 会自动将任务调度到可用的 Ray Worker 上执行,从而实现并行化。
  • Actor:

    • 你可以通过 @ray.remote 装饰器将任何 Python 类标记为 Ray Actor。
    • Actor 是有状态的分布式对象。当你实例化一个 Actor 时(例如 MyActor.remote()),Ray 会在一个 Worker 进程中启动一个 Actor 实例。
    • 你可以通过 Actor 的对象引用来调用其方法,这些方法也会作为 Ray 任务异步执行。
    • Actor 维护自己的状态,并且其方法调用可以改变其内部状态。

安装

节点

# For machine learning applications
pip install -U "ray[data,train,tune,serve]"

# For general Python applications
pip install -U "ray[default]"

k8s

  • kuberay 是一个在 Kubernetes 上运行分布式应用的工具包

常见的使用示例

1. 并行化普通函数 (Ray Tasks)

这是一个将一个简单的 Python 函数并行化的例子。

import ray
import time

# 初始化 Ray,如果 Ray 尚未初始化
if ray.is_initialized():
    ray.shutdown()
ray.init()

@ray.remote
def slow_function(i):
    time.sleep(1) # 模拟耗时操作
    return i * 2

start_time = time.time()

# 提交多个任务并行执行
results_refs = [slow_function.remote(i) for i in range(5)]

# 获取所有任务的结果
results = ray.get(results_refs)

end_time = time.time()

print(f"Parallel results: {results}")
print(f"Total time taken (parallel): {end_time - start_time:.2f} seconds")

# 比较:顺序执行
start_time_sequential = time.time()
sequential_results = [slow_function(i) for i in range(5)]
end_time_sequential = time.time()
print(f"Sequential results: {sequential_results}")
print(f"Total time taken (sequential): {end_time_sequential - start_time_sequential:.2f} seconds")

ray.shutdown() # 关闭 Ray 运行时

解释:

  • @ray.remoteslow_function 转换为一个 Ray 任务。
  • slow_function.remote(i) 提交任务并在后台执行,立即返回一个 ObjectRef
  • ray.get(results_refs) 会阻塞直到所有引用的任务完成,并返回它们的结果。
  • 可以看到,并行执行的时间显著少于顺序执行。

2. 使用 Ray Actors 进行有状态计算

import ray
import time

if ray.is_initialized():
    ray.shutdown()
ray.init()

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_value(self):
        return self.value

# 创建一个 Counter Actor 实例
counter_actor = Counter.remote()

# 调用 Actor 的方法
print(f"Initial value: {ray.get(counter_actor.get_value.remote())}")

# 异步调用 increment 方法多次
increment_results_refs = [counter_actor.increment.remote() for _ in range(5)]

# 获取每次 increment 的结果
increment_results = ray.get(increment_results_refs)
print(f"Increment results: {increment_results}")

# 获取最终的值
print(f"Final value: {ray.get(counter_actor.get_value.remote())}")

ray.shutdown()

解释:

  • @ray.remoteCounter 类转换为一个 Ray Actor。
  • Counter.remote() 在 Ray 集群中创建并启动一个 Counter 实例。
  • counter_actor.increment.remote() 调用 Actor 实例的方法,并异步执行。
  • Actor 维护其内部状态 self.value,即使是分布式调用也能正确更新。

3. 使用 Ray Data 进行数据处理

假设我们有一个大型数据集需要进行 ETL (Extract, Transform, Load) 操作。

import ray
import pandas as pd

if ray.is_initialized():
    ray.shutdown()
ray.init()

# 模拟一个大型数据集
data = [{"id": i, "value": i * 10} for i in range(1000)]
df = pd.DataFrame(data)

# 将 Pandas DataFrame 转换为 Ray Dataset
ds = ray.data.from_pandas(df)

# 对数据进行并行转换
# 例如,添加一个新列 'squared_value'
transformed_ds = ds.map(lambda row: {"id": row["id"], "value": row["value"], "squared_value": row["value"]**2})

# 聚合操作 (例如,计算平均值)
mean_value = transformed_ds.mean(on="value")
print(f"Mean of 'value': {mean_value}")

# 将结果收集到 Pandas DataFrame (通常在结果集较小时使用)
result_df = transformed_ds.to_pandas()
print("\nFirst 5 rows of transformed data:")
print(result_df.head())

ray.shutdown()

解释:

  • ray.data.from_pandas(df) 将 Pandas DataFrame 转换为 Ray Dataset。
  • ds.map(...) 会将 map 函数并行应用于数据集中的每个记录,Ray 会自动处理数据的分区和任务调度。
  • transformed_ds.mean(on="value") 对特定列进行分布式聚合。
  • to_pandas() 用于将分布式数据集收集回一个 Pandas DataFrame。

4. 使用 Ray Train 进行分布式模型训练

这个示例展示了如何使用 Ray Train 进行简单的分布式 PyTorch 模型训练。

import ray
import ray.train as train
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

if ray.is_initialized():
    ray.shutdown()
ray.init()

# 定义一个简单的神经网络
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(10, 1)

    def forward(self, x):
        return self.fc(x)

# 定义训练函数
def train_func(config):
    # Prepare model and data for distributed training
    model = SimpleModel()
    model = train.torch.prepare_model(model)

    # 模拟数据
    X = torch.randn(100, 10)
    y = torch.randn(100, 1)
    dataset = TensorDataset(X, y)
    dataloader = DataLoader(dataset, batch_size=config["batch_size"])
    dataloader = train.torch.prepare_data_loader(dataloader)

    criterion = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])

    for epoch in range(config["epochs"]):
        for batch_idx, (data, target) in enumerate(dataloader):
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

        # 报告训练指标
        train.report({"loss": loss.item()})

# 配置训练器
trainer = TorchTrainer(
    train_func,
    scaling_config=train.ScalingConfig(num_workers=2, use_gpu=False), # 使用2个worker,不使用GPU
    train_loop_config={"lr": 0.01, "batch_size": 16, "epochs": 5},
)

# 运行训练
result = trainer.fit()

print("\nTraining completed.")
print(f"Final training metrics: {result.metrics}")

ray.shutdown()

解释:

  • TorchTrainer 是 Ray Train 提供的用于 PyTorch 分布式训练的类。
  • train.ScalingConfig 配置了分布式训练的资源,例如 num_workers
  • train.torch.prepare_modeltrain.torch.prepare_data_loader 帮助 PyTorch 模型和数据加载器适应分布式环境。
  • train.report() 用于向 Ray 报告训练指标,这对于监控和超参数调优很有用。

总结

Ray 提供了一套强大且易于使用的 API,用于构建各种规模的分布式应用程序,特别是在 AI 和机器学习领域。通过理解 Ray Core 的任务和 Actor 概念,并利用其高级库,你可以高效地扩展你的 Python 代码,从单机扩展到大规模集群。

参考

  1. https://docs.ray.io/en/latest/ray-core/starting-ray.html
  2. https://docs.ray.io/en/latest/ray-overview/installation.html
  3. https://docs.ray.io/en/latest/cluster/kubernetes/index.html#kuberay-index
本文总阅读量 次 本站总访问量 次 本站总访客数
Home Archives Categories Tags Statistics