本地生成因子 并提供给线上策略
由bq1fuwkt创建,最终由bq1fuwkt 被浏览 4 用户
问题描述
我想在本地计算自定义因子(如技术指标、基本面因子等),然后上传到 BigQuant 平台,供线上模拟交易策略使用,应该如何实现?
详细解答
BigQuant SDK 提供了 DataSource 类,可以将本地计算的因子数据上传为数据源,然后在线上策略中通过 SQL 查询使用。
方法一:本地计算因子并上传(推荐)
- 查询基础数据
首先从平台获取用于计算因子的基础数据,例如 从平安银行、万科A、浦发银行获取 2024 年的历史数据
import bigquant
import pandas as pd
import numpy as np
# 查询股票历史数据
result = bigquant.dai.query("""
SELECT date, instrument, open, high, low, close, volume
FROM cn_stock_bar1d
WHERE date >= '2024-01-01' AND date <= '2024-12-31'
AND instrument IN ('000001.SZ', '000002.SZ', '600000.SH')
ORDER BY instrument, date
""")
df = result.df()
print(f"查询到 {len(df)} 条数据")
print(df.head())
- 关键说明
- 建议按 instrument, date 排序,方便后续按股票分组计算
- 本地计算因子
在本地计算各种技术指标或自定义因子
# 按股票分组计算移动平均线
def calculate_factors(group):
"""计算技术指标因子"""
# 5日均线
group['ma5'] = group['close'].rolling(window=5).mean()
# 20日均线
group['ma20'] = group['close'].rolling(window=20).mean()
# 5日涨跌幅
group['return_5d'] = group['close'].pct_change(periods=5)
# 波动率(20日标准差)
group['volatility_20d'] = group['close'].rolling(window=20).std()
# 相对强弱指标 RSI (简化版)
delta = group['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
group['rsi_14'] = 100 - (100 / (1 + rs))
return group
# 按股票分组计算
df_factors = df.groupby('instrument', group_keys=False).apply(calculate_factors)
# 删除因滚动窗口产生的 NaN 值
df_factors = df_factors.dropna()
print(f"\n计算得到 {len(df_factors)} 条因子数据")
print(df_factors.head())
- 关键说明
- 使用 groupby().apply() 对每只股票单独计算指标
- 移动平均、标准差等需要足够的历史数据,会产生 NaN
- 可以根据需求计算任意自定义因子
- 上传因子数据到平台
将计算好的因子数据上传为 DataSource:
# 选择需要上传的列
factor_cols = ['date', 'instrument', 'ma5', 'ma20', 'return_5d', 'volatility_20d', 'rsi_14']
df_upload = df_factors[factor_cols].copy()
# 写入数据源
ds = bigquant.dai.DataSource.write_bdb(
data=df_upload,
id="my_custom_factors_2024", # 数据源 ID(不指定则创建临时数据源)
partitioning=["date"], # 按日期分区(提升查询性能)
indexes=["instrument"], # 为 instrument 创建索引
unique_together=["date", "instrument"], # 复合唯一约束
on_duplicates="last", # 重复数据保留最新的
sort_by=[("date", "ascending"), ("instrument", "ascending")], # 排序
docs={"description": "自定义技术指标因子", "version": "v1.0"}, # 文档说明
overwrite=True # 如果已存在则覆盖
)
print(f"\n✓ 因子数据已上传")
print(f" 数据源 ID: {ds.id}")
print(f" 数据行数: {len(df_upload)}")
- 关键说明
- id: 指定数据源 ID,便于在策略中引用;如果不指定则创建临时数据源
- partitioning: 分区字段,通常使用日期分区以提升查询性能
- indexes: 索引字段,用于加速查询
- unique_together: 唯一约束,防止重复数据
- on_duplicates: 处理重复数据的策略("last" 保留最新,"first" 保留最旧,"error" 报错)
- overwrite=True: 如果数据源已存在,则覆盖旧数据
方法二:在查询中计算因子并上传
如果因子计算逻辑简单,可以在 SQL 查询时直接计算:
# 使用 SQL 窗口函数计算移动平均
result = bigquant.dai.query("""
SELECT
date,
instrument,
close,
AVG(close) OVER (
PARTITION BY instrument
ORDER BY date
ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
) as ma5,
AVG(close) OVER (
PARTITION BY instrument
ORDER BY date
ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
) as ma20
FROM cn_stock_bar1d
WHERE date >= '2024-01-01' AND date <= '2024-12-31'
AND instrument IN ('000001.SZ', '000002.SZ', '600000.SH')
ORDER BY instrument, date
""")
df_factors = result.df()
# 上传计算结果
ds = bigquant.dai.DataSource.write_bdb(
data=df_factors[['date', 'instrument', 'ma5', 'ma20']],
id="my_sql_factors_2024",
partitioning=["date"],
indexes=["instrument"],
unique_together=["date", "instrument"],
overwrite=True
)
print(f"✓ 因子数据已上传: {ds.id}")
- 关键说明
- 使用 SQL 窗口函数 AVG() OVER() 计算移动平均
- 适用于简单的聚合计算,复杂逻辑建议在本地计算
方法三:使用 UDF 在查询中计算因子
对于复杂的因子计算逻辑,可以定义 UDF(用户自定义函数):
from bigquant.dai import DaiUDF
# 定义因子计算函数
def calculate_rsi(prices):
"""计算 RSI 指标"""
# 注意:这是简化示例,实际 UDF 需要更复杂的实现
return prices * 1.5 # 示例计算
# 在查询中使用 UDF
result = bigquant.dai.query(
sql="""
SELECT
date,
instrument,
close,
calculate_rsi(close) as custom_rsi
FROM cn_stock_bar1d
WHERE date >= '2024-12-01' AND date <= '2024-12-31'
AND instrument = '000001.SZ'
""",
udf_list=[
DaiUDF(
name="calculate_rsi",
function=calculate_rsi,
return_type="DOUBLE"
)
]
)
df = result.df()
print(df.head())
- 关键说明
- UDF 在云端执行,可以在 SQL 中直接调用
- 适用于需要在查询时动态计算的场景
- UDF 函数会被序列化并上传到云端执行
关键概念解释
DataSource
表示 BigQuant 平台上的一个数据源。可以是平台内置的数据(如 cn_stock_bar1d),也可以是用户上传的自定义数据。
DataSource.write_bdb()
将本地数据上传到平台,创建或更新数据源。关键参数:
- data: pandas DataFrame 或 PyArrow Table
- id: 数据源 ID(不指定则创建临时数据源)
- partitioning: 分区字段(通常用日期)
- indexes: 索引字段(用于加速查询)
- unique_together: 复合唯一约束
- on_duplicates: 重复数据处理策略
- overwrite: 是否覆盖已有数据
DataSource.read_bdb()
从平台读取数据源。关键参数:
- as_type: 返回类型(pd.DataFrame 或 pa.Table)
- partition_filter: 分区过滤(元组表示范围,集合表示精确值)
- columns: 指定读取的列
分区 (Partitioning)
将数据按某个字段(通常是日期)划分存储,可以大幅提升查询性能。查询时只读取相关分区的数据。
索引 (Indexes)
为某个字段创建索引,加速基于该字段的查询和关联操作。
唯一约束 (Unique Together)
确保指定字段的组合在数据中是唯一的,避免重复数据。
临时数据源 vs 持久数据源
- 临时数据源: id=None,系统自动分配 ID(如 cache_xxx),一段时间后自动清理
- 持久数据源: 指定 id,永久保存,可在策略中重复使用
数据更新策略
增量更新(推荐)
# 只计算和上传新增的数据
latest_date = "2024-12-31" # 数据源中最新日期
# 查询新数据
result = bigquant.dai.query(f"""
SELECT date, instrument, close, volume
FROM cn_stock_bar1d
WHERE date > '{latest_date}'
ORDER BY instrument, date
""")
df_new = result.df()
# 计算因子
df_new_factors = df_new.groupby('instrument', group_keys=False).apply(calculate_factors)
df_new_factors = df_new_factors.dropna()
# 追加到数据源(on_duplicates="last" 确保更新覆盖)
ds = bigquant.dai.DataSource.write_bdb(
data=df_new_factors,
id="my_custom_factors_2024",
partitioning=["date"],
indexes=["instrument"],
unique_together=["date", "instrument"],
on_duplicates="last", # 如果日期+股票重复,保留新数据
overwrite=False # 不覆盖整个数据源,只追加/更新
)
print(f"✓ 增量更新完成,新增 {len(df_new_factors)} 条数据")
全量覆盖
# 重新计算所有数据并覆盖
ds = bigquant.dai.DataSource.write_bdb(
data=df_all_factors,
id="my_custom_factors_2024",
partitioning=["date"],
indexes=["instrument"],
unique_together=["date", "instrument"],
overwrite=True # 完全覆盖旧数据
)
print(f"✓ 全量更新完成")
性能优化建议
- 使用分区: 按日期分区可以大幅提升查询性能
- 创建索引: 为常用查询字段(如 instrument)创建索引
- 减少列数: 只上传策略中需要使用的因子列
- 数据类型优化: 使用合适的数据类型,避免使用 object 类型
- 批量上传: 一次上传多个因子,减少网络往返
注意事项
- 数据源 ID 命名: 使用有意义的 ID,便于在策略中引用(如 my_factors_2024 而非 temp_123)
- 权限管理: 上传的数据源默认只有自己可以访问
\