BigQuant使用文档

并行处理 深度学习等复杂计算任务

由bq1fuwkt创建,最终由bq1fuwkt 被浏览 3 用户

问题描述

我有大量数据需要处理(如批量计算因子、训练多个模型、参数调优等),单机执行太慢,如何使用 BigQuant SDK 进行分布式并行计算,加速处理过程?

详细解答

BigQuant SDK 提供了 fai 模块(FAI = Fast AI Computing),可以创建多节点集群,并行执行计算密集型任务。

1. 查看现有集群

首先查看是否已有可用的计算集群:

  import bigquant

  # 查看所有集群
  clusters = bigquant.fai.list_clusters()

  print(f"现有集群数量: {len(clusters)}")
  for cluster in clusters:
      print(f"集群ID: {cluster['id']}")
      print(f"集群名称: {cluster.get('fullname', 'N/A')}")
      print(f"状态: {cluster.get('status', 'N/A')}")
      print(f"Worker数量: {cluster.get('num_workers', 0)}")
      print(f"可用Worker: {cluster.get('available_workers', 0)}")
      print("-" * 50)
  • 关键说明
    • 集群状态包括:Running(运行中)、Stopped(已停止)、Pending(启动中)、Stopping(停止中)
    • num_workers 是集群总 Worker 数量
    • available_workers 是当前可用的 Worker 数量

2. 创建计算集群

如果没有集群或需要新建,可以创建指定配置的集群:

  # 创建集群
  cluster = bigquant.fai.create_cluster(
      cluster_name="my_compute_cluster",  # 集群名称
      num_workers=2,                       # Worker 数量
      worker_cpus=4,                       # 每个 Worker 的 CPU 核数
      worker_memory="8G",                  # 每个 Worker 的内存
      worker_gpus=0,                       # 每个 Worker 的 GPU 数量(0表示不使用GPU)
      max_no_fai_run_time=300              # 空闲超时时间(秒)
  )

  print(f"✓ 集群已创建")
  print(f"  集群ID: {cluster.cluster_info['id']}")
  print(f"  集群名称: {cluster.cluster_info['fullname']}")
  print(f"  Worker数量: {cluster.cluster_info['num_workers']}")
  • 关键说明
    • num_workers: Worker 节点数量,决定并行度
    • worker_cpus: 每个 Worker 的 CPU 核数
    • worker_memory: 支持字符串格式(如 "8G", "512M")或整数(字节数)
    • worker_gpus: GPU 数量,用于深度学习任务
    • max_no_fai_run_time: 空闲自动关闭时间(秒)

3. 启动集群并初始化连接

创建后需要启动集群并等待就绪:

  # 等待集群启动完成
  cluster.wait_cluster("Running")
  print("✓ 集群已启动")

  # 初始化 Ray 连接
  cluster.init()
  print("✓ Ray 连接已建立")
  • 关键说明
    • wait_cluster("Running") 会阻塞等待集群启动(默认超时120秒)
    • init() 会自动处理集群状态,如果是 Stopped 会先启动
    • init() 会关闭现有 Ray 连接并建立新连接

4. 定义远程执行函数

使用 @fai.remote 装饰器定义可并行执行的函数:

  # 定义远程函数
  @bigquant.fai.remote
  def compute_factor(stock_code, start_date, end_date):
      """在远程 Worker 上计算单只股票的因子"""
      import pandas as pd
      import numpy as np
      import time

      # 模拟数据查询
      time.sleep(0.5)  # 模拟查询延迟

      # 模拟因子计算
      np.random.seed(hash(stock_code) % 2**32)
      data = pd.DataFrame({
          'date': pd.date_range(start_date, end_date, freq='D'),
          'close': 10 + np.random.randn(10).cumsum()
      })

      # 计算技术指标
      data['ma5'] = data['close'].rolling(5).mean()
      data['ma20'] = data['close'].rolling(20).mean()
      data['volatility'] = data['close'].rolling(10).std()

      result = {
          'stock_code': stock_code,
          'data_rows': len(data),
          'mean_close': data['close'].mean(),
          'max_volatility': data['volatility'].max()
      }

      return result

  print("✓ 远程函数已定义")
  • 关键说明
    • @fai.remote 装饰器标记函数为远程执行
    • 函数会被序列化并发送到 Worker 节点执行
    • 函数内的 import 语句必须在函数内部,不能使用外部导入
    • 返回值会被序列化传回主节点

5. 提交并行任务

批量提交任务到集群执行:

  # 准备任务列表
  stock_codes = [f"00000{i}.SZ" for i in range(1, 11)]  # 10只股票
  start_date = "2024-01-01"
  end_date = "2024-01-10"

  # 提交并行任务(不会阻塞)
  print(f"提交 {len(stock_codes)} 个任务...")
  task_refs = [
      compute_factor.remote(code, start_date, end_date)
      for code in stock_codes
  ]

  print(f"✓ 已提交 {len(task_refs)} 个任务")
  print("  任务正在并行执行...")
  • 关键说明
    • .remote() 返回的是 ObjectRef(引用),不是实际结果
    • 任务立即提交,不会阻塞主程序
    • 多个任务会自动分配到不同 Worker 并行执行

6. 获取执行结果

等待任务完成并获取结果:

  # 获取所有结果(会阻塞等待)
  print("等待任务完成...")
  results = bigquant.fai.get(task_refs)

  print(f"\n✓ 所有任务已完成")
  print(f"  共处理 {len(results)} 只股票")

  # 查看结果
  for result in results:
      print(f"  {result['stock_code']}: {result['data_rows']} 行数据, "
            f"均价={result['mean_close']:.2f}, "
            f"最大波动={result['max_volatility']:.2f}")
  • 关键说明
    • fai.get() 会阻塞等待所有任务完成
    • 也可以传入单个 ObjectRef 获取单个结果
    • 结果顺序与提交顺序一致

7. 高级用法 - 使用 wait() 控制并发

对于大量任务,可以使用 wait() 控制并发度:

  # 批量提交1000个任务
  all_tasks = [compute_factor.remote(f"{i:06d}.SZ", "2024-01-01", "2024-01-10")
               for i in range(1, 1001)]

  # 每次等待100个任务完成
  batch_size = 100
  all_results = []

  print(f"处理 {len(all_tasks)} 个任务...")
  remaining = all_tasks

  while remaining:
      # 等待至少 batch_size 个任务完成(或全部完成)
      num_returns = min(batch_size, len(remaining))
      ready, remaining = bigquant.fai.wait(remaining, num_returns=num_returns)

      # 获取已完成任务的结果
      batch_results = bigquant.fai.get(ready)
      all_results.extend(batch_results)

      print(f"  已完成: {len(all_results)}/{len(all_tasks)}")

  print(f"✓ 全部完成,共 {len(all_results)} 个结果")
  • 关键说明
    • fai.wait() 返回 (ready, not_ready) 元组
    • num_returns 指定等待多少个任务完成
    • 适用于超大批量任务,避免内存占用过高

8. 使用共享数据 - put() 和 get()

对于大对象,可以放入对象存储,避免重复传输:

  import pandas as pd

  # 准备大对象(如配置、参数等)
  large_config = pd.DataFrame({
      'param': ['alpha', 'beta', 'gamma'],
      'value': [0.1, 0.2, 0.3]
  })

  # 放入对象存储
  config_ref = bigquant.fai.put(large_config)
  print("✓ 配置已放入对象存储")

  # 定义使用共享数据的函数
  @bigquant.fai.remote
  def process_with_config(stock_code, config_ref):
      """使用共享配置处理数据"""
      import ray

      # 从对象存储获取配置(只传输引用,不重复传输数据)
      config = ray.get(config_ref)

      # 使用配置进行计算
      alpha = config[config['param'] == 'alpha']['value'].values[0]

      result = {
          'stock_code': stock_code,
          'alpha': alpha,
          'computed_value': alpha * 100
      }

      return result

  # 提交任务(传递引用而非数据)
  tasks = [process_with_config.remote(f"{i:06d}.SZ", config_ref) for i in range(1, 11)]
  results = bigquant.fai.get(tasks)

  print(f"✓ 处理完成 {len(results)} 个任务")
  • 关键说明
    • fai.put() 将对象放入 Ray 对象存储
    • 返回的引用可以在多个任务间共享,避免重复传输
    • Worker 通过 ray.get() 获取实际数据

9. 指定资源需求

为任务指定 CPU、内存、GPU 需求:

  # 定义需要更多资源的函数
  @bigquant.fai.remote(num_cpus=2, memory="2G")
  def heavy_computation(data_size):
      """需要 2 个 CPU 和 2G 内存的任务"""
      import numpy as np

      # 大规模计算
      data = np.random.randn(data_size, data_size)
      result = np.linalg.svd(data)

      return {"size": data_size, "computed": True}

  # GPU 任务示例(需要 GPU 集群)
  @bigquant.fai.remote(num_gpus=1, memory="4G")
  def train_model(model_id):
      """需要 1 个 GPU 和 4G 内存的训练任务"""
      # 模拟模型训练
      import time
      time.sleep(2)

      return {"model_id": model_id, "accuracy": 0.95}
  • 关键说明
    • num_cpus: 需要的 CPU 核数
    • memory: 需要的内存(字符串格式,如 "2G", "512M")
    • num_gpus: 需要的 GPU 数量
    • Ray 会根据资源需求调度任务到合适的 Worker

10. 关闭集群

使用完毕后关闭连接并删除集群:

  # 关闭 Ray 连接
  cluster.shutdown()
  print("✓ Ray 连接已关闭")

  # 删除集群(释放云端资源)
  result = cluster.delete_cluster()
  print("✓ 集群已删除")
  • 关键说明
    • shutdown() 只关闭本地连接,集群仍在运行
    • delete_cluster() 会删除集群,释放所有资源
    • 建议使用完毕后及时删除,避免资源浪费

深度学习任务示例

使用 GPU 集群进行模型训练:

  import bigquant

  # 创建 GPU 集群
  cluster = bigquant.fai.create_cluster(
      cluster_name="dl_training",
      num_workers=2,
      worker_cpus=8,
      worker_memory="16G",
      worker_gpus=1  # 每个 Worker 1 个 GPU
  )

  cluster.wait_cluster("Running")
  cluster.init()

  # 定义训练函数
  @bigquant.fai.remote(num_gpus=1, memory="8G")
  def train_model(model_config):
      """训练深度学习模型"""
      import torch
      import time

      # 检查 GPU 可用性
      device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
      print(f"使用设备: {device}")

      # 模拟模型训练
      model_name = model_config['name']
      epochs = model_config['epochs']

      # 训练循环
      for epoch in range(epochs):
          time.sleep(0.5)  # 模拟训练时间

      result = {
          'model_name': model_name,
          'epochs': epochs,
          'final_loss': 0.05,
          'accuracy': 0.95,
          'device': str(device)
      }

      return result

  # 准备多个模型配置
  model_configs = [
      {'name': 'LSTM_model_1', 'epochs': 10},
      {'name': 'LSTM_model_2', 'epochs': 15},
      {'name': 'Transformer_model', 'epochs': 20},
      {'name': 'CNN_model', 'epochs': 12}
  ]

  # 并行训练
  print(f"开始训练 {len(model_configs)} 个模型...")
  tasks = [train_model.remote(config) for config in model_configs]
  results = bigquant.fai.get(tasks)

  print("\n训练结果:")
  for result in results:
      print(f"  {result['model_name']}: "
            f"Accuracy={result['accuracy']:.2%}, "
            f"Loss={result['final_loss']:.4f}, "
            f"Device={result['device']}")

  # 清理
  cluster.shutdown()
  cluster.delete_cluster()

关键概念解释

FAI (Fast AI Computing)

BigQuant 提供的分布式计算服务,基于 Ray 框架,支持 CPU 和 GPU 集群。

Cluster(集群)

一组计算节点的集合,包含:

  • Head 节点: 调度和管理节点
  • Worker 节点: 实际执行计算的节点

@fai.remote 装饰器

标记函数为远程执行函数,被装饰的函数会:

  • 被序列化并发送到 Worker
  • 在 Worker 上执行
  • 返回结果的引用(ObjectRef)

ObjectRef

Ray 对象引用,指向存储在 Ray 对象存储中的数据。通过 fai.get() 获取实际数据。

资源需求

  • num_cpus: CPU 核数需求
  • memory: 内存需求(如 "2G", "512M")
  • num_gpus: GPU 数量需求

对象存储

Ray 的共享内存系统,用于:

  • 存储任务参数和返回值
  • 在节点间共享大对象
  • 避免重复序列化/反序列化

适用场景

  • 批量因子计算

并行计算数千只股票的技术指标、基本面因子等。

  • 参数调优

并行测试不同参数组合,快速找到最优参数。

  • 模型训练

使用 GPU 集群并行训练多个模型或进行超参数搜索。

  • 回测优化

并行执行多个策略的回测,加速策略开发。

  • 数据处理

并行处理大规模数据清洗、特征工程等任务。

注意事项

  • 函数独立性: 远程函数必须包含所有依赖,不能使用外部变量
  • Import 位置: 所有 import 必须在函数内部
  • 数据大小: 避免传递超大对象,使用 fai.put() 共享
  • 资源管理: 及时删除集群,避免资源浪费
  • 错误处理: 任务失败会抛出异常,需要捕获处理
  • 版本一致: Worker 和主节点的 Python 环境应保持一致

最佳实践

  • 合理设置 Worker 数量: 根据任务数量和单任务耗时选择
  • 批量提交: 一次提交多个任务,减少调度开销
  • 使用 wait(): 大批量任务使用 wait() 控制内存
  • 复用集群: 多次计算复用同一集群,避免重复启动

\

{link}