Founder Securities (方正) "Undercurrent" (暗流涌动) Factor Construction - Basics
Research report: https://mp.weixin.qq.com/s/h9S6UCvV0Ffa9taHI8qjrA
Factor analysis: computed from 2021 to the present. The long-side predictive power is not particularly strong; a modified version is still in progress…
-- Factor analysis code (below the chart)
-- Factor calculation code (at the very bottom)
Switching to 5-day returns gives IC = 0.0328.
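A minimal sketch of how such a per-day rank IC can be computed (hypothetical column name ret_5d for the 5-day forward return; this is illustrative, not the code that produced the number above):

import pandas as pd

def rank_ic(panel: pd.DataFrame) -> pd.Series:
    """Per-day Spearman rank IC between the factor and the 5-day forward return.

    Expects columns: date, instrument, undercurrent, ret_5d (hypothetical names).
    """
    return panel.groupby('date').apply(
        lambda g: g['undercurrent'].corr(g['ret_5d'], method='spearman')
    )

# ic = rank_ic(panel)
# print(ic.mean(), ic.mean() / ic.std())  # mean IC and ICIR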
Original factor analysis write-up:
https://bigquant.com/codesharev3/215ccfb3-3bfb-49ce-af79-27c328013d05
Factor construction, already smoothed over 20 days (it seems .py files can't be shared here, so the code is pasted below):
""" 暗流涌动因子 - 严格原文版 完全按照方正金工原文逻辑实现
原文核心逻辑: 1. 成交量熵值因子:
- 240分钟划分为48个5分钟区间
- 计算相对成交量(个股/全市场)
- 计算每个5分钟区间的相对成交量占比
- 香农熵测量分布集中度
- 均值距离化处理
- 20日低频化(均值+标准差)
2. 流动性弹性因子:
- 激增定义:超过过去5分钟均量1倍
- 价格波动:(最高-最低)/开盘
- 敏感系数 = 激增时刻波动/普通时刻波动
- 弹性 = 1 - 敏感系数
- 均值距离化处理
- 20日低频化(均值+标准差)
3. 合成:等权合成 """
import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import dai

warnings.filterwarnings('ignore')
class UndercurrentFactorOriginal:
    """Undercurrent factor - strict original-report version."""

    def __init__(self):
        """Parameters taken directly from the report."""
        self.lookback_days = 20  # window for the 20-day smoothing step
    def fetch_minute_data(self, start_date, end_date):
        """Fetch 1-minute bars."""
        print(f" Fetching minute data {start_date} ~ {end_date}...", end='')
        sql = f"""
        WITH minute_bars AS (
            SELECT
                instrument,
                date,
                close,
                high,
                low,
                open,
                volume,
                CAST(date AS DATE) AS trade_date,
                EXTRACT(HOUR FROM date) * 60 + EXTRACT(MINUTE FROM date) AS minute_of_day
            FROM cn_stock_bar1m_c
            WHERE date >= '{start_date} 09:00:00'
              AND date <= '{end_date} 15:30:00'
              AND volume > 0
              AND close > 0
              AND high > 0
              AND low > 0
              AND open > 0
        ),
        valid_stocks AS (
            SELECT DISTINCT
                instrument,
                CAST(date AS DATE) AS trade_date
            FROM cn_stock_prefactors
            WHERE date >= '{start_date}'
              AND date <= '{end_date}'
              AND st_status = 0  -- exclude ST stocks
              AND DATE_DIFF('day', list_date, date) >= 60  -- listed for at least 60 days
        )
        SELECT
            m.instrument,
            m.trade_date,
            m.date,
            m.close,
            m.high,
            m.low,
            m.open,
            m.volume,
            m.minute_of_day
        FROM minute_bars m
        INNER JOIN valid_stocks v
            ON m.instrument = v.instrument
            AND m.trade_date = v.trade_date
        WHERE m.minute_of_day >= 570  -- 09:30
          AND m.minute_of_day <= 900  -- 15:00
        ORDER BY m.instrument, m.trade_date, m.date
        """
        df = dai.query(sql).df()
        print(f" ✓ {len(df):,} rows")
        return df
    def calculate_volume_entropy_factor(self, df_minute):
        """
        Volume entropy factor - strictly following the report.

        Steps:
        1. Split the 240 trading minutes into 48 five-minute intervals
        2. Compute relative volume (stock volume / whole-market volume)
        3. Compute the Shannon entropy of the interval distribution
        4. Mean-distance transform
        5. 20-day smoothing
        """
        print(" Computing volume entropy factor...", end='')
        # Step 1: build the 5-minute interval index (0-47)
        # 09:30-11:30 = 120 minutes = 24 intervals
        # 13:00-15:00 = 120 minutes = 24 intervals
        df = df_minute.copy()

        def get_interval_index(minute_of_day):
            """Map a minute-of-day value to an interval index in [0, 47]."""
            if minute_of_day <= 690:  # morning session 09:30-11:30
                # clamp so an 11:30-stamped bar falls into the last morning bucket
                return min((minute_of_day - 570) // 5, 23)
            else:  # afternoon session 13:00-15:00
                # clamp so a 15:00-stamped bar falls into the last afternoon bucket
                return min(24 + (minute_of_day - 780) // 5, 47)
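        # Example mappings:
        #   570 (09:30) -> 0      574 (09:34) -> 0     575 (09:35) -> 1
        #   689 (11:29) -> 23     690 (11:30) -> 23 (clamped)
        #   780 (13:00) -> 24     899 (14:59) -> 47    900 (15:00) -> 47 (clamped)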
        df['interval'] = df['minute_of_day'].apply(get_interval_index)
        # Step 2: relative volume
        # first compute the whole-market total volume for each minute
        market_volume = df.groupby(['trade_date', 'date'])['volume'].sum().reset_index()
        market_volume.columns = ['trade_date', 'date', 'market_volume']
        df = df.merge(market_volume, on=['trade_date', 'date'], how='left')
        df['relative_volume'] = df['volume'] / df['market_volume']
        # Step 3: each 5-minute interval's share of relative volume
        interval_stats = df.groupby(['instrument', 'trade_date', 'interval'])['relative_volume'].sum().reset_index()
        interval_stats.columns = ['instrument', 'trade_date', 'interval', 'interval_volume']
        # total relative volume per stock per day
        daily_total = interval_stats.groupby(['instrument', 'trade_date'])['interval_volume'].sum().reset_index()
        daily_total.columns = ['instrument', 'trade_date', 'total_volume']
        interval_stats = interval_stats.merge(daily_total, on=['instrument', 'trade_date'], how='left')
        interval_stats['prob'] = interval_stats['interval_volume'] / interval_stats['total_volume']

        # Shannon entropy of the interval distribution
        def calc_entropy(group):
            probs = group['prob'].values
            probs = probs[probs > 0]  # keep non-zero probabilities only
            if len(probs) == 0:
                return 0
            entropy = -np.sum(probs * np.log(probs))
            return entropy
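        # Worked values: a perfectly even spread over the 48 buckets gives the
        # maximum entropy ln(48) ≈ 3.871 nats; all volume packed into a single
        # bucket gives 0. Lower entropy = more concentrated intraday volume.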
        entropy_daily = interval_stats.groupby(['instrument', 'trade_date']).apply(calc_entropy).reset_index()
        entropy_daily.columns = ['instrument', 'date', 'entropy']
        # Step 4: mean-distance transform (absolute distance from the daily cross-sectional mean)
        entropy_daily['entropy_mean'] = entropy_daily.groupby('date')['entropy'].transform('mean')
        entropy_daily['entropy_distance'] = abs(entropy_daily['entropy'] - entropy_daily['entropy_mean'])
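        # Example: if the cross-sectional mean entropy on a day is 3.5, a stock
        # at 3.8 and a stock at 3.2 both get distance 0.3 - the transform rewards
        # deviating from the crowd in either direction, not a particular sign.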
        # Step 5: 20-day smoothing (rolling mean + rolling std)
        entropy_daily = entropy_daily.sort_values(['instrument', 'date'])
        entropy_daily['entropy_20d_mean'] = entropy_daily.groupby('instrument')['entropy_distance'].transform(
            lambda x: x.rolling(self.lookback_days, min_periods=1).mean()
        )
        entropy_daily['entropy_20d_std'] = entropy_daily.groupby('instrument')['entropy_distance'].transform(
            lambda x: x.rolling(self.lookback_days, min_periods=1).std()
        )
        # combine: mean + std
        entropy_daily['volume_entropy_factor'] = entropy_daily['entropy_20d_mean'] + entropy_daily['entropy_20d_std'].fillna(0)
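        # e.g. a 20-day mean distance of 0.25 with a 20-day std of 0.10 gives a
        # factor value of 0.35: persistent and unstable deviation both raise it.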
print(f" ✓")
return entropy_daily\[\['instrument', 'date', 'volume_entropy_factor'\]\]
    def calculate_liquidity_elasticity_factor(self, df_minute):
        """
        Liquidity elasticity factor - strictly following the report.

        Steps:
        1. Define surge minutes: volume more than 100% above the past 5-minute average
        2. Compute price volatility: (high - low) / open
        3. Compute average volatility at surge minutes and at normal minutes
        4. Sensitivity = surge volatility / normal volatility
        5. Elasticity = 1 - sensitivity
        6. Mean-distance transform
        7. 20-day smoothing
        """
        print(" Computing liquidity elasticity factor...", end='')
        df = df_minute.copy()
        df = df.sort_values(['instrument', 'trade_date', 'date']).reset_index(drop=True)
        # Step 1: rolling average volume of the past 5 minutes
        # (shift(1) so the current minute is not part of its own baseline, and
        # group by trade_date as well so the window never crosses a day boundary)
        df['volume_5min_avg'] = df.groupby(['instrument', 'trade_date'])['volume'].transform(
            lambda x: x.shift(1).rolling(5, min_periods=1).mean()
        )
        # surge minute: more than 100% above the 5-minute average volume (i.e. > 2x)
        df['is_surge'] = df['volume'] > (df['volume_5min_avg'] * 2)
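        # Example: the past-5-minute average volume is 400 lots and the current
        # minute trades 1,000 lots; 1,000 > 2 * 400, so the minute is a surge.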
        # Step 2: price volatility per minute
        df['price_volatility'] = (df['high'] - df['low']) / df['open']

        # Step 3: average volatility at surge minutes vs. normal minutes
        def calc_elasticity(group):
            surge_data = group[group['is_surge']]
            normal_data = group[~group['is_surge']]
            if len(surge_data) < 3 or len(normal_data) < 3:
                return pd.Series({
                    'surge_volatility': np.nan,
                    'normal_volatility': np.nan,
                    'elasticity_raw': np.nan
                })
            surge_vol = surge_data['price_volatility'].mean()
            normal_vol = normal_data['price_volatility'].mean()
            if normal_vol == 0 or np.isnan(normal_vol):
                return pd.Series({
                    'surge_volatility': surge_vol,
                    'normal_volatility': normal_vol,
                    'elasticity_raw': np.nan
                })
            # Step 4: sensitivity = surge volatility / normal volatility
            sensitivity = surge_vol / normal_vol
            # Step 5: elasticity = 1 - sensitivity
            elasticity = 1 - sensitivity
            return pd.Series({
                'surge_volatility': surge_vol,
                'normal_volatility': normal_vol,
                'elasticity_raw': elasticity
            })
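        # Worked example: surge minutes average a 0.9% intraminute range while
        # normal minutes average 0.3%; sensitivity = 3, elasticity = 1 - 3 = -2.
        # A stock whose price barely reacts to volume surges has sensitivity
        # near 1 and elasticity near 0.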
        elasticity_daily = df.groupby(['instrument', 'trade_date']).apply(calc_elasticity).reset_index()
        elasticity_daily.columns = ['instrument', 'date', 'surge_volatility', 'normal_volatility', 'elasticity_raw']
        # Step 6: mean-distance transform
        elasticity_daily['elasticity_mean'] = elasticity_daily.groupby('date')['elasticity_raw'].transform('mean')
        elasticity_daily['elasticity_distance'] = abs(elasticity_daily['elasticity_raw'] - elasticity_daily['elasticity_mean'])
        # Step 7: 20-day smoothing
        elasticity_daily = elasticity_daily.sort_values(['instrument', 'date'])
        elasticity_daily['elasticity_20d_mean'] = elasticity_daily.groupby('instrument')['elasticity_distance'].transform(
            lambda x: x.rolling(self.lookback_days, min_periods=1).mean()
        )
        elasticity_daily['elasticity_20d_std'] = elasticity_daily.groupby('instrument')['elasticity_distance'].transform(
            lambda x: x.rolling(self.lookback_days, min_periods=1).std()
        )
        # combine: mean + std
        elasticity_daily['liquidity_elasticity_factor'] = elasticity_daily['elasticity_20d_mean'] + elasticity_daily['elasticity_20d_std'].fillna(0)
        print(" ✓")
        return elasticity_daily[['instrument', 'date', 'liquidity_elasticity_factor']]
    def combine_factors(self, df_entropy, df_elasticity):
        """Equal-weight combination of the two sub-factors."""
        print(" Combining factors...", end='')
        df = pd.merge(df_entropy, df_elasticity, on=['instrument', 'date'], how='inner')
        # equal-weight combination (un-normalized sum)
        df['undercurrent'] = df['volume_entropy_factor'] + df['liquidity_elasticity_factor']
        print(" ✓")
        return df[['instrument', 'date', 'volume_entropy_factor', 'liquidity_elasticity_factor', 'undercurrent']]
    def calculate_batch(self, start_date, end_date):
        """Compute a single batch."""
        # fetch minute data
        df_minute = self.fetch_minute_data(start_date, end_date)
        if len(df_minute) == 0:
            return pd.DataFrame()
        # compute the two sub-factors
        df_entropy = self.calculate_volume_entropy_factor(df_minute)
        df_elasticity = self.calculate_liquidity_elasticity_factor(df_minute)
        # combine
        df_factor = self.combine_factors(df_entropy, df_elasticity)
        return df_factor
    def save_to_bdb(self, df_factor, table_name='factor_undercurrent_original'):
        """Persist the factor to the database."""
        if len(df_factor) == 0:
            return
        df_save = df_factor.copy()
        df_save['date'] = pd.to_datetime(df_save['date'])
        df_save.columns = ['instrument', 'date', 'entropy', 'elasticity', 'undercurrent']
        try:
            dai.DataSource.write_bdb(
                data=df_save,
                id=table_name,
                unique_together=['instrument', 'date'],
                indexes=['date']
            )
            print(f" saved ✓ {len(df_save)} rows")
        except Exception as e:
            print(f" save ✗ {str(e)[:100]}")
    def calculate_range(self, start_date, end_date, batch_days=5,
                        save_to_db=True, table_name='factor_undercurrent_original'):
        """Batch computation over a date range."""
        print("=" * 80)
        print(" " * 25 + "Undercurrent factor - strict original-report version")
        print("=" * 80)
        print("Core logic:")
        print(" 1. Volume entropy: 48 five-minute intervals + relative volume + mean-distance + 20-day smoothing")
        print(" 2. Liquidity elasticity: surge (> 2x 5-min avg volume) + sensitivity + elasticity = 1 - sensitivity + 20-day smoothing")
        print(" 3. Equal-weight combination")
        print("=" * 80)
        start = datetime.strptime(start_date, '%Y-%m-%d')
        end = datetime.strptime(end_date, '%Y-%m-%d')
        # split the range into batches of batch_days calendar days
        batches = []
        current = start
        while current <= end:
            batch_end = min(current + timedelta(days=batch_days - 1), end)
            batches.append((current.strftime('%Y-%m-%d'), batch_end.strftime('%Y-%m-%d')))
            current = batch_end + timedelta(days=1)
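        # Example: 2022-01-01 ~ 2022-01-12 with batch_days=5 yields three batches:
        # (01-01, 01-05), (01-06, 01-10), (01-11, 01-12). Note that the 20-day
        # rolling windows in the sub-factor methods only see data within a single
        # batch, so short batches truncate the smoothing window.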
print(f"\\n总共 {len(batches)} 个批次\\n")
all_factors = \[\]
success_count = 0
for i, (batch_start, batch_end) in enumerate(batches, 1):
print(f"\[批次 {i}/{len(batches)}\] {batch_start} \~ {batch_end}")
try:
df_batch = self.calculate_batch(batch_start, batch_end)
if len(df_batch) > 0:
all_factors.append(df_batch)
success_count += 1
if save_to_db:
self.save_to_bdb(df_batch, table_name)
except Exception as e:
print(f" 错误: {str(e)\[:100\]}")
import traceback
traceback.print_exc()
continue
print("\\n" + "=" \* 80)
print(f"完成!成功 {success_count}/{len(batches)} 个批次")
print("=" \* 80)
if all_factors:
df_all = pd.concat(all_factors, ignore_index=True)
return df_all
else:
return pd.DataFrame()
def main():
    """Entry point."""
    calculator = UndercurrentFactorOriginal()
    # compute 2022 through early 2026
    df = calculator.calculate_range(
        start_date='2022-01-01',
        end_date='2026-01-28',
        batch_days=5,
        save_to_db=True,
        table_name='factor_undercurrent_original'
    )
    if len(df) > 0:
        print("\nFactor preview:")
        print(df.head(20))
        print("\nFactor statistics:")
        # note: calculate_range returns the combine_factors column names,
        # not the renamed columns used in save_to_bdb
        print(df[['volume_entropy_factor', 'liquidity_elasticity_factor', 'undercurrent']].describe())


if __name__ == "__main__":
    main()
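To sanity-check what was written, a minimal read-back sketch - this assumes a bdb table written via dai.DataSource.write_bdb can be queried by its id through dai.query, the same way the platform tables above are:

import dai

# Per-day coverage and mean of the saved factor (table id as written above).
df_check = dai.query("""
    SELECT date, COUNT(*) AS n_stocks, AVG(undercurrent) AS mean_factor
    FROM factor_undercurrent_original
    GROUP BY date
    ORDER BY date
""").df()
print(df_check.tail(10))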