模型本地化训练指南

由hxgre创建,最终由hxgre 被浏览 16 用户

背景

端到端(End-to-End, E2E)模型直接以分钟级行情和盘口快照为输入,自行学习特征表达与预测目标,相比传统因子方式对算力的要求高出很多。鉴于 BigQuant 云端 GPU 资源有限,端到端模型在云端完整训练成本高、排队久,主办方决定开放一条本地化训练通道:把端到端模型所需的训练集数据通过 BigQuant 特有的 SDK 下载到本地,在本地完成模型训练,再把训练好的模型与训练代码一并提交到云端进行预测评测。

采用这一方案需要特别注意以下四点:

  1. 数据做了压缩存储。由于分钟数据量较大,本地数据表把绝大多数价格/金额字段 ×100 后转换为整数类型存储(单位由"元"变为"分"),instrument(字符串股票代码)被映射为整数 instrument_id,盘口数据从原始 5 档裁剪为 3 档。读取本地数据时务必按约定还原。
  2. SDK 下载有 quota 限制。BigQuant SDK 下载数据存在配额上限,建议一次性把数据下载并保存到本地(如 parquet / bdb),后续所有处理都基于本地文件,不要每次都通过 SDK 重新拉取
  3. 训练代码必须随模型一起提交。本地训练出的模型,需要把训练代码一并提交,便于复现与审核。
  4. 本地与云端数据格式不一致,需在特征构建时对齐。代码提交到云端后,预测阶段使用的是云端未经压缩处理的原始数据。这意味着本地训练时的特征构建逻辑(针对"分"为单位的 int 数据、3 档盘口、instrument_id)与云端预测时(针对"元"为单位的 float 数据、5 档盘口、字符串 instrument)面对的输入并不相同。务必在特征构建层做好两边的一致性处理,否则会出现"本地训练分数高、云端预测对不上"的问题。

BigQuant SDK 官方文档:https://bigquant.com/wiki/doc/vac4qwmQr4


BigQuant SDK 使用说明

1. 安装

SDK 要求 Python 3.11。仅下载数据可使用精简安装:

# 精简安装(仅数据相关功能)
pip install bigquant -i https://pypi.bigquant.com/simple/

# 完整安装(含建模等全部功能)
pip install 'bigquant[all]' -i https://pypi.bigquant.com/simple/

# 验证安装
bq --version

2. 认证

在本地用 BigQuant 账号的 AK/SK 进行认证(AK/SK 可在 BigQuant 个人中心获取):

bq auth --apikey <AK.SK>
# 或交互式配置
bq auth configure

使用 AK/SK 认证可启用 Arrow Flight 高速传输通道,下载大表更快;Token 认证不支持该通道。

3. 核心数据接口(dai 模块)

接口 用途
dai.query(sql, filters=, full_db_scan=) 对云端表执行 SQL 查询
dai.search_datasources(keyword) 搜索可用数据表
dai.get_datasource_schema(table) 查看表结构 / 字段
dai.DataSource(id).read_bdb(...) 读取已落地的 DataSource
dai.DataSource.write_bdb(df, id=, ...) 把自定义数据写入本地 bdb

4. 查询并下载数据

务必通过 filters 传入日期区间,避免全表扫描(官方文档明确指出此举可大幅提升查询性能)。如确需全表扫描,再显式传 full_db_scan=True

from bigquant import dai

sql = """
SELECT date, instrument_id, open, high, low, close, volume, amount
FROM bigalpha_2026_e2e_bar5m
"""
result = dai.query(
    sql,
    filters={"date": ["2024-01-01", "2024-12-31"]},
)

df = result.df()        # -> pandas DataFrame
# 其他输出格式:
# result.arrow()        # -> pyarrow.Table
# result.pl()           # -> polars DataFrame
# result.fetchall()     # -> list[dict]

数据量大时用流式读取,避免一次性占满内存:

reader = result.fetch_arrow_reader(batch_size=10000)
for batch in reader:
    process(batch)

5. 落地到本地(规避 quota)⚠️

下载存在配额限制,强烈建议下载一次后落盘,后续处理只读本地文件:

import os
from bigquant import dai

LOCAL_DIR = "./local_data"
os.makedirs(LOCAL_DIR, exist_ok=True)

def download_once(table: str, start: str, end: str) -> str:
    """按表名下载一次并保存为 parquet;已存在则直接复用,不再调用 SDK。"""
    path = os.path.join(LOCAL_DIR, f"{table}_{start}_{end}.parquet")
    if os.path.exists(path):
        print(f"复用本地文件: {path}")
        return path

    df = dai.query(
        f"SELECT * FROM {table}",
        filters={"date": [start, end]},
    ).df()
    df.to_parquet(path, index=False)
    print(f"已下载并保存: {path}, 行数: {len(df)}")
    return path

# 训练 / 调试阶段统一从本地读取
import pandas as pd
path = download_once("bigalpha_2026_e2e_bar5m", "2024-01-01", "2024-12-31")
df = pd.read_parquet(path)

数据表对比说明(云端 vs 本地)

表清单对应关系

端到端模型的本地表与云端表一一对应,仅频率不同:

频率 本地表(训练用,已压缩) 远端表(云端预测用,原始)
1 分钟 bigalpha_2026_e2e_bar1m bigalpha_2026_stock_bar1m
5 分钟 bigalpha_2026_e2e_bar5m bigalpha_2026_stock_bar5m
15 分钟 bigalpha_2026_e2e_bar15m bigalpha_2026_stock_bar15m
30 分钟 bigalpha_2026_e2e_bar30m bigalpha_2026_stock_bar30m

关键差异

维度 本地表(e2e 远端表(stock 读取/对齐注意
股票代码 instrument_idint16 instrument(字符串,如 600000.SH 本地需用 ID↔代码映射表还原
价格字段(OHLC、盘口价) int32,单位"" = 元 × 100 float32,单位"" 本地读取后需 / 100 还原为元
成交金额 amount int64,单位"分" = 元 × 100 float32,单位"元" 本地读取后需 / 100 还原
盘口档位 3 档*1 ~ *3 5 档*1 ~ *5 云端预测时只取前 3 档,或特征只依赖 3 档
OHLC 缺失值 -1("分"下不可能为负,可区分真实价) NaN 本地需把 -1 视为缺失
盘口价缺失值 0 0 一致
委托笔数 *_num_orders* int16 int32 本地单档通常 < 1000,注意上限 32767
date 含义 K 分钟 bar 的结束时刻(收盘时刻)。如 5 分钟早盘首 bar 标 09:35,早盘收盘 bar 标 11:30,尾盘收盘 bar 标 15:00 同上 一致

为什么这样压缩

  • 价格/金额 ×100 转 int:整数的 delta 压缩远优于 float,且无浮点误差;amountint64 是为了避免 float32 累计到亿元量级后精度退化(float32 仅约 7 位有效数字)。
  • instrument 映射为 int16:字符串列存储与比较成本高,整数 ID 更省空间、更快。
  • 盘口裁剪到 3 档:在体积与信息量之间取平衡。

数据一致性处理建议

把"原始字段 → 模型输入特征"的逻辑收敛到同一个特征构建函数,并在函数入口先把两边的数据归一到同一表示,再做后续特征:

import numpy as np
import pandas as pd

def to_canonical(df: pd.DataFrame, *, is_local: bool, id_to_code: dict | None = None) -> pd.DataFrame:
    """把本地表 / 云端表统一成同一份 canonical 表示,供特征构建复用。"""
    df = df.copy()
    price_cols = [
        "open", "high", "low", "close", "amount",
        "ask_price1", "ask_price2", "ask_price3",
        "bid_price1", "bid_price2", "bid_price3",
    ]
    if is_local:
        # 1) -1 视为 OHLC 缺失
        for c in ["open", "high", "low", "close"]:
            df.loc[df[c] == -1, c] = np.nan
        # 2) "分" -> "元"
        for c in price_cols:
            if c in df.columns:
                df[c] = df[c].astype("float64") / 100.0
        # 3) instrument_id -> instrument 字符串
        if id_to_code is not None:
            df["instrument"] = df["instrument_id"].map(id_to_code)
    else:
        # 云端为原始数据:只取前 3 档盘口,丢弃 4/5 档,使两边列对齐
        drop_cols = [c for c in df.columns
                     if any(c.startswith(p) and c[-1] in "45"
                            for p in ("ask_price", "bid_price",
                                      "ask_volume", "bid_volume",
                                      "ask_num_orders", "bid_num_orders"))]
        df = df.drop(columns=drop_cols, errors="ignore")
    return df

\
def build_features(df_canonical: pd.DataFrame) -> pd.DataFrame:
    """两边都先转成 canonical,再走同一套特征逻辑,避免训练/预测口径不一致。"""
    ...

模版 demo

待补充。

后续将提供本地训练 → 模型保存 → 云端 main() 加载模型并预测的完整可运行示例, 涵盖:数据下载落盘、特征构建(本地/云端对齐)、模型训练与保存、提交目录结构(模型权重 + 训练代码)等。

\

{link}