《使用CoWork,进行端到端因子投研》
由yangduoduo05创建,最终由small_q 被浏览 84 用户
直播回放:https://bigquant.com/college/3bced27c-db84-4c6a-83b9-7f1fe35d7180
AI对我们生活的改变已不言而喻。
理论上来说,任何人能做的事其实AI都可以做。早晨有私享会会员在群里问我,如果AI可以量化投研,那未来私募做什么。
其实类似的问题在各行各业都被频频提及,如果未来美团的自动外卖车可以送到家门口,那外卖小哥做什么?如果未来自动驾驶可以做到极致,那还需不需要考驾照?
其实很多问题没有答案,就好像我们去问一个古人你不需要种地就有饭吃的话你会做什么一样,他只会不理解,茫然。
今天同样的,我不知道未来AI替代后他们该去做什么,我只知道AI替代传统工作是一个必然的趋势。
因子投研对普通人来说或许是一件很神秘的事情,对有经验的量化人来说又变成了每日繁琐的工作,业内不断有声音在讨论
AI能做量化投研么?
对于这一点,我们很期待,也一直在努力,直到今天,我似乎看到了未来的影子。
今天分享的是通过AI完全从0进行高频因子的研究及魔改,虽然不是完全从0代码开始端到端的交付,但是相信这些baseline对于大多数人来说都是非常简单的。
开始之前,这边准备了2个东西。
1.方正因子《滴水穿石》pdf文件 ,https://mp.weixin.qq.com/s/NieCDvUmNcy0SJCbqN4dnA
2.bigquant,sql复现的baseline
https://bigquant.com/codesharev3/77fe275b-0b7a-49fd-ae84-f22b904f74a0
大家如需获取,可通过链接获取
\
1.首先告诉CoWork我要做什么
提示词:“这是篇高频因子低频化的研报,通过傅立叶变换寻找主力的痕迹。 我复现的代码在这里:/home/aiuser/work/AIQuANT/私享会/DEMO/因子更新/因子2历史计算 copy.ipynb 。帮我看一下这个研报有什么魔改的地方,可以从什么其他角度改进,帮我用我的例子(sql)去完成因子魔改。“
在这过程中,CoWork提出了魔改思路,并给出了如下输出。
📊 研报核心思路总结
"滴水穿石"因子通过傅立叶变换识别日内成交量的周期性节奏:
- 核心逻辑: 机构分批买入形成2-5分钟周期性脉冲 → 预示持续吸筹 → 未来表现更好
- 计算步骤: IQR去脉冲 → 去均值 → Hann窗 → FFT → 提取2-5分钟频带能量占比
🔧 魔改方向建议(8个角度)
1. 多周期频带组合
原研报局限: 只用了2-5分钟单一频带\n改进方向:
- 同时计算多个频带: 超短期(1-2分钟)、短期(2-5分钟)、中期(5-10分钟)、长期(10-20分钟)
- 构建频带能量向量,用PCA降维或加权合成
def volume_fft_multi_band(x_array, bands=[(1,2), (2,5), (5,10), (10,20)]):
"""计算多个频带的能量占比"""
# ... FFT计算同前 ...
band_powers = []
for low, high in bands:
freq_low = 1.0 / high
freq_high = 1.0 / low
mask = (freq >= freq_low) & (freq <= freq_high) & (freq != 0)
band_powers.append(np.sum(P[mask]) / total_power)
# 方案1: 加权组合
weights = [0.1, 0.5, 0.3, 0.1] # 根据回测优化权重
return np.dot(band_powers, weights)
# 方案2: 返回数组(需改return_type)
return band_powers
\
2. 频谱形态特征
原研报局限: 只用能量占比,丢失频谱形状信息\n改进方向:
- 计算频谱的峰度(尖锐程度)、偏度(对称性)
- 识别主导频率的稳定性(是否集中在某一周期)
def volume_fft_shape(x_array):
# ... 前置处理同前 ...
# 特征1: 频谱峰度(越大说明能量越集中)
kurtosis = np.sum((P - P.mean())**4) / (np.sum((P - P.mean())**2)**2)
# 特征2: 主频稳定性(最大峰值/次大峰值)
sorted_P = np.sort(P)[::-1]
peak_stability = sorted_P[0] / (sorted_P[1] + 1e-8)
# 特征3: 目标频带内峰值个数
band_mask = (freq >= 1/5) & (freq <= 1/2)
band_P = P[band_mask]
threshold = band_P.mean() + 2*band_P.std()
num_peaks = np.sum(band_P > threshold)
return (kurtosis, peak_stability, num_peaks)
\
3. 时变频谱分析
原研报局限: 单日静态频谱,无法刻画演化趋势\n改进方向:
-
用短时傅立叶变换(STFT)小波变换,观察盘中周期性如何变化
-
计算滚动窗口(如30/60/120分钟)内的频谱,构建趋势因子
-- SQL层面:滚动窗口计算 WITH rolling_fft AS ( SELECT date, instrument, -- 计算上午/下午分段频谱 volume_fft_ratio( array_agg(volume) FILTER (WHERE date::TIME BETWEEN '09:31' AND '11:30'), (2,5) ) AS morning_band_power, volume_fft_ratio( array_agg(volume) FILTER (WHERE date::TIME BETWEEN '13:01' AND '14:57'), (2,5) ) AS afternoon_band_power FROM ... ) SELECT *, -- 特征:上下午周期性差异 afternoon_band_power - morning_band_power AS periodicity_shift FROM rolling_fft
4. 去脉冲方法优化
原研报局限: 简单IQR裁剪可能过度平滑\n改进方向:
-
用**中位数绝对偏差(MAD)**代替IQR(对极端值更稳健)
-
用自适应阈值(根据波动率动态调整裁剪范围)
def advanced_despike(x_array): x = np.array(x_array) # 方案1: MAD去脉冲 median = np.median(x) mad = np.median(np.abs(x - median)) lower = median - 3 * 1.4826 * mad # 1.4826使MAD等价于标准差 upper = median + 3 * 1.4826 * mad # 方案2: 滚动标准差自适应 rolling_std = pd.Series(x).rolling(window=30, min_periods=1).std() lower = x.mean() - 3 * rolling_std upper = x.mean() + 3 * rolling_std return np.clip(x, lower, upper)
5. 买卖方向分离
原研报局限: 成交量无方向,无法区分买入/卖出\n改进方向:
- 用主动买入量和主动卖出量分别计算频谱
- 对比买卖双方的周期性差异(买方周期性强→机构吸筹)
WITH directional_volume AS (
SELECT
date,
instrument,
CASE
WHEN close > open THEN volume * (close - open) / (high - low + 0.001)
ELSE 0
END AS buy_volume,
CASE
WHEN close < open THEN volume * (open - close) / (high - low + 0.001)
ELSE 0
END AS sell_volume
FROM cn_stock_bar1m_c
)
SELECT
date::DATE,
instrument,
volume_fft_ratio(array_agg(buy_volume), (2,5)) AS buy_periodicity,
volume_fft_ratio(array_agg(sell_volume), (2,5)) AS sell_periodicity,
-- 核心因子:买方周期性占优势
volume_fft_ratio(array_agg(buy_volume), (2,5)) -
volume_fft_ratio(array_agg(sell_volume), (2,5)) AS net_buy_periodicity
FROM directional_volume
GROUP BY date::DATE, instrument
6. 与价格形态结合
原研报局限: 纯量因子,未结合价格信息\n改进方向:
-
计算成交量-价格协同周期性(量价同步脉动更有效)
-
在价格上涨时的周期性 vs 下跌时的周期性
def volume_price_sync_fft(volume_array, return_array, period=(2,5)): """计算量价协同的周期性""" vol = np.array(volume_array) - np.mean(volume_array) ret = np.array(return_array) - np.mean(return_array) # 方案1: 量价乘积的FFT sync_signal = vol * np.sign(ret) # 上涨时量为正,下跌时为负 # 方案2: 相干性分析 from scipy.signal import coherence freq, coh = coherence(vol, ret, fs=1.0, nperseg=min(len(vol), 256)) # 提取目标频段的相干性 band_mask = (freq >= 1/period[1]) & (freq <= 1/period[0]) return np.mean(coh[band_mask])
7. 跨股票相对排名
原研报局限: 绝对值因子,未考虑截面分布\n改进方向:
- 计算每日横截面上的周期性强度排名
- 构建行业内相对周期性因子
WITH daily_factor AS (
SELECT
date,
instrument,
industry, -- 假设有行业字段
volume_fft_ratio(x_array, (2,5)) AS raw_factor
FROM ...
),
ranked_factor AS (
SELECT
*,
-- 全市场排名
percent_rank() OVER (PARTITION BY date ORDER BY raw_factor) AS market_rank,
-- 行业内排名
percent_rank() OVER (PARTITION BY date, industry ORDER BY raw_factor) AS industry_rank,
-- 行业中性化
raw_factor - avg(raw_factor) OVER (PARTITION BY date, industry) AS industry_neutral
FROM daily_factor
)
SELECT * FROM ranked_factor
8. 因子稳定性筛选
原研报局限: 单日因子噪音大\n改进方向:
-
计算5/10/20日的周期性稳定性(标准差/均值)
-
只选择"持续显示周期性"的股票
WITH rolling_stability AS ( SELECT date, instrument, drip_stone, -- 过去20日周期性的稳定性 stddev(drip_stone) OVER w20 / (avg(drip_stone) OVER w20 + 1e-6) AS cv_20d, -- 过去20日周期性均值 avg(drip_stone) OVER w20 AS ma_20d FROM sxh_test002 WINDOW w20 AS (PARTITION BY instrument ORDER BY date ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) ) SELECT *, -- 综合因子:高均值+低变异系数 CASE WHEN cv_20d < 0.5 AND ma_20d > 0.4 THEN ma_20d / (cv_20d + 0.1) ELSE 0 END AS stable_periodicity FROM rolling_stability完整SQL
# ========== UDF定义 ==========
def volume_directional_fft(
buy_vol_array: Iterable[float],
sell_vol_array: Iterable[float],
period: Tuple[int, int] = (2, 5)
) -> Tuple[float, float, float]:
"""
计算买卖双方的周期性强度
返回: (买方周期性, 卖方周期性, 净买入周期性)
"""
def calc_band_power(x_array, period):
x = np.asarray(list(x_array), dtype=np.float64)
if x.size == 0:
return 0.0
# MAD去脉冲
median = np.median(x)
mad = np.median(np.abs(x - median))
x = np.clip(x, median - 3*1.4826*mad, median + 3*1.4826*mad)
# FFT流程
x = x - x.mean()
window = np.hanning(x.size)
xw = x * window
if xw.size < 256:
xw = np.pad(xw, (0, 256 - xw.size), 'constant')
FFTc = np.fft.rfft(xw)
P = np.abs(FFTc) ** 2
freq = np.fft.rfftfreq(xw.size, d=1.0)
freq_low = 1.0 / period[1]
freq_high = 1.0 / period[0]
band_mask = (freq >= freq_low) & (freq <= freq_high) & (freq != 0.0)
band_power = float(np.sum(P[band_mask]))
total_power = float(np.sum(P[freq != 0.0]))
return band_power / (total_power + 1e-8)
buy_power = calc_band_power(buy_vol_array, period)
sell_power = calc_band_power(sell_vol_array, period)
net_power = buy_power - sell_power
return (buy_power, sell_power, net_power)
# 注册UDF
udf_dir_fft = dai.DaiUDF(
name="volume_directional_fft",
function=volume_directional_fft,
return_type="struct<buy_power:double, sell_power:double, net_power:double>"
)
\
修改BUG
在实际代码运行中,虽然会有一些小报错,但是直接丢给cowork都可以做到完全自动修改,第一次因为udf注册问题失败,修改后可以成功注册。第二次因为nan值过多,通过修改代码最终获得了可以正常跑出来的代码。
\
import dai
import numpy as np
import pandas as pd
from calendar import monthrange
# ========== UDF(同前) ==========
def volume_net_buy_safe(buy_vol_array, sell_vol_array, period=None) -> float:
try:
if period is None:
period = (2, 5)
period_low, period_high = period
def calc_band_power(x_array):
if x_array is None:
return 0.0
x = np.asarray(list(x_array), dtype=np.float64)
x = x[np.isfinite(x)]
if x.size < 10:
return 0.0
median = np.median(x)
mad = np.median(np.abs(x - median))
if mad > 1e-8:
x = np.clip(x, median - 3*1.4826*mad, median + 3*1.4826*mad)
x = x - x.mean()
window = np.hanning(x.size) if x.size > 1 else np.ones(x.size)
xw = x * window
if xw.size < 256:
xw = np.pad(xw, (0, 256 - xw.size), 'constant')
FFTc = np.fft.rfft(xw)
P = np.abs(FFTc) ** 2
freq = np.fft.rfftfreq(xw.size, d=1.0)
freq_low = 1.0 / period_high
freq_high = 1.0 / period_low
band_mask = (freq >= freq_low) & (freq <= freq_high) & (freq != 0.0)
if not band_mask.any():
return 0.0
band_power = float(np.sum(P[band_mask]))
total_power = float(np.sum(P[freq != 0.0]))
if total_power < 1e-8:
return 0.0
result = band_power / total_power
return 0.0 if (np.isnan(result) or np.isinf(result)) else float(result)
buy_power = calc_band_power(buy_vol_array)
sell_power = calc_band_power(sell_vol_array)
net = buy_power - sell_power
return 0.0 if (np.isnan(net) or np.isinf(net)) else float(net)
except:
return 0.0
udf_net_buy_safe = dai.DaiUDF(
name="volume_net_buy_safe",
function=volume_net_buy_safe,
return_type="double"
)
# ========== SQL(精简版) ==========
sql_simple = """
WITH price_direction AS (
SELECT
date, instrument, volume, close, open, high, low,
COALESCE(
CASE WHEN close >= open AND (high - low) > 0.0001
THEN volume * (close - low) / (high - low)
ELSE volume * 0.3 END, volume * 0.3) AS buy_volume,
COALESCE(
CASE WHEN close < open AND (high - low) > 0.0001
THEN volume * (high - close) / (high - low)
ELSE volume * 0.3 END, volume * 0.3) AS sell_volume
FROM cn_stock_bar1m_c
WHERE date::TIME > '09:30' AND date::TIME <= '14:57'
),
valid_stocks AS (
SELECT date::DATE as trade_date, instrument
FROM price_direction
GROUP BY date::DATE, instrument
HAVING count(*) >= 200
),
agg AS (
SELECT
p.date::DATE::DATETIME AS date, p.instrument,
array_agg(p.buy_volume ORDER BY p.date::TIME) AS buy_array,
array_agg(p.sell_volume ORDER BY p.date::TIME) AS sell_array
FROM price_direction p
INNER JOIN valid_stocks v ON p.date::DATE = v.trade_date AND p.instrument = v.instrument
GROUP BY p.date::DATE, p.instrument
)
SELECT date, instrument,
volume_net_buy_safe(buy_array, sell_array, (2, 5)) AS drip_stone_buysel
FROM agg ORDER BY date, instrument
"""
# ========== 按月计算函数 ==========
def calc_factor_by_month(year, month):
start_date = f"{year}-{month:02d}-01"
last_day = monthrange(year, month)[1]
end_date = f"{year}-{month:02d}-{last_day}"
print(f"计算 {year}-{month:02d} ({start_date} ~ {end_date})...")
df = dai.query(
sql_simple,
filters={'date': [start_date, end_date]},
udf_list=[udf_net_buy_safe]
).df()
df['date'] = pd.to_datetime(df['date'])
print(f" 完成: {len(df):,} 条, NaN: {df['drip_stone_buysel'].isna().sum()}")
return df
# ========== 执行:先测试1月 ==========
df_jan = calc_factor_by_month(2024, 1)
print("\n样本数据:")
print(df_jan.head())
print(f"\n因子统计:\n{df_jan['drip_stone_buysel'].describe()}")
# ========== 保存到CSV(推荐) ==========
import os
output_dir = '/home/aiuser/work/codev_userdata/3211f197-bbf2-4a41-8bff-9d06f6e99a6d/factors'
os.makedirs(output_dir, exist_ok=True)
df_jan.to_csv(f"{output_dir}/drip_stone_buysel_202401.csv", index=False)
print(f"\n已保存到 {output_dir}/drip_stone_buysel_202401.csv")
\
因子测试
虽然因为工程化的问题目前ai给的代码还无法做到自动计算历史,自动因子分析及自动因子更新,不过相信在不远的未来这一些都可以端到端实现。
目前是通过纯手工的方式迁移到私享会的因子自动计算历史框架内,已经在私享会中分享过了,在此不做过多赘述。
因为时间关系,因子历史仅计算24 25年2年全年,并且和原因子进行对比看绩效是否有提升。
值的一提的是,ai给出的因子共3个,分别为买力中的滴水穿石 卖力中的滴水穿石,并且相减计算净值。考虑到需要对细分因子分别测试,这里将相减部分代码删除,仅计算买力滴水穿石作为测试。
使用私享会分享的因子自动计算历史的框架,计算2年高频因子数据仅耗时15分钟。环境规格为K3
为了方便对比,大家可参考股海泛舟同学之前分享的原版因子绩效。链接如下:
研报复现:【方正金工】个股日内成交量周期性节奏刻画与“滴水穿石”因子构建——多因子选股系列研究之二十四
\
改进后的因子表现
未平滑前因子换手率高达0.8 IC0.05
平滑改进后因子表现
\
总结
从pormt到获得结果,总共用了不到20分钟的时间,进行4轮对话,代码改了不到10行。
对一个从事多年因子研究的我来说,除了震撼,还是震撼。
或许这就是未来量化投研的新的范式,一人即私募的时代,也许离我们很近。
\