BigQuant使用文档

日频回测demos

由qxiao创建,最终由qxiao 被浏览 4 用户

股票日频策略一:基本面选股策略

非ST、非科创板、非退市、非停牌,上市天数大于365天,流通市值大于5亿元,净利润(TTM)同比>30%,营业收入(TTM)同比>30%,0<市盈率(TTM)<50,0<市净率<5,按总市值从小到大排列,每5个交易日调仓,等权持有前20只。

from bigquant import bigtrader, dai

def initialize(context: bigtrader.IContext):
    """Build the target-weight table for the fundamental stock screen.

    Configures the commission model, runs the screening query against
    ``cn_stock_prefactors`` through DAI, keeps at most ``holding_count`` rows
    per date (the SQL orders each date by ascending ``total_market_cap``),
    passes the table through ``TradingDaysRebalance.select_rebalance_data``
    and stores the result on ``context.data``.

    Args:
        context: Strategy context supplied by bigtrader.
    """
    context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))

    holding_count = 20
    rebalance_interval = 5

    # The growth columns use m_lag(..., 250), which is why the query window
    # below is opened 250 trading days before the backtest start date.
    query = """
    SELECT
        date,
        instrument,
        total_market_cap,
        float_market_cap,
        pe_ttm,
        pb,
        net_profit_growth_rate,
        revenue_growth_rate,
        1.0/$stock_num AS weight
    FROM (
        SELECT
            date,
            instrument,
            total_market_cap,
            float_market_cap,
            pe_ttm,
            pb,
            net_profit_ttm,
            operating_revenue_ttm,
            st_status,
            suspended,
            list_sector,
            list_days,
            (net_profit_ttm / m_lag(net_profit_ttm, 250) - 1) * 100 AS net_profit_growth_rate,
            (operating_revenue_ttm / m_lag(operating_revenue_ttm, 250) - 1) * 100 AS revenue_growth_rate
        FROM cn_stock_prefactors
    ) AS t
    WHERE
        st_status = 0
        AND suspended = 0
        AND list_sector IN (1, 2)
        AND list_days > 365
        AND float_market_cap > 500000000
        AND pe_ttm > 0 AND pe_ttm < 50
        AND pb > 0 AND pb < 5
        AND net_profit_growth_rate > 30
        AND revenue_growth_rate > 30
    ORDER BY date, total_market_cap ASC
    """

    window_start = context.add_trading_days(context.start_date, -250)
    screened = dai.query(
        query,
        filters={"date": [window_start, context.end_date]},
        params={"stock_num": holding_count},
    ).df()

    # Rows arrive sorted by (date, total_market_cap), so head() keeps the
    # smallest-cap names of each date.
    top_per_day = screened.groupby('date').head(holding_count)
    scheduler = bigtrader.TradingDaysRebalance(rebalance_interval, context=context)

    context.data = scheduler.select_rebalance_data(top_per_day)

# Run the daily-frequency stock backtest: `initialize` fills context.data and
# bigtrader's built-in weight-based handler is used as handle_data.
performance = bigtrader.run(
    initialize=initialize,
    handle_data=bigtrader.HandleDataLib.handle_data_weight_based,
    market=bigtrader.Market.CN_STOCK,
    frequency=bigtrader.Frequency.DAILY,
    capital_base=1_000_000,
    start_date="2024-01-01",
    end_date="2026-01-21",
)

股票日频策略二:KDJ 指标策略(农业银行)

601288.SH,计算 KDJ(9,3,3) 指标,KDJ_K < 20 为买入信号(加仓至100%),KDJ_K > 80 为卖出信号(减仓至80%),首次建仓 80% 仓位,每1个交易日调仓。

from bigquant import bigtrader, dai
from datetime import datetime

def initialize(context: bigtrader.IContext):
    """Set strategy parameters and preload the KDJ series for one stock.

    Stores the instrument, KDJ periods, thresholds and position targets on
    ``context``, queries close and KDJ K/D/J values from
    ``cn_stock_prefactors`` (window opened 60 trading days before the start
    date), drops rows dated before ``context.start_date`` and keeps the rest
    on ``context.data``.

    Args:
        context: Strategy context supplied by bigtrader.
    """
    context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))

    # Traded instrument and KDJ periods.
    context.stock = '601288.SH'
    context.kdj_n, context.kdj_m1, context.kdj_m2 = 9, 3, 3

    # K-value thresholds that trigger the buy / sell branches in handle_data.
    context.buy_threshold = 20
    context.sell_threshold = 80

    # Position targets as fractions of portfolio value.
    context.base_position = 0.8
    context.full_position = 1.0
    context.current_position_target = context.base_position
    context.is_first_trade = True

    query = """
    SELECT
        date,
        instrument,
        close,
        m_ta_kdj_k(high, low, close, fastk_period := $kdj_n, slowk_period := $kdj_m1, slowd_period := $kdj_m2) AS kdj_k,
        m_ta_kdj_d(high, low, close, fastk_period := $kdj_n, slowk_period := $kdj_m1, slowd_period := $kdj_m2) AS kdj_d,
        m_ta_kdj_j(high, low, close, fastk_period := $kdj_n, slowk_period := $kdj_m1, slowd_period := $kdj_m2) AS kdj_j
    FROM cn_stock_prefactors
    WHERE instrument = $stock AND suspended = 0
    ORDER BY date
    """

    query_params = {
        "stock": context.stock,
        "kdj_n": context.kdj_n,
        "kdj_m1": context.kdj_m1,
        "kdj_m2": context.kdj_m2,
    }
    warmup_start = context.add_trading_days(context.start_date, -60)
    indicators = dai.query(
        query,
        filters={"date": [warmup_start, context.end_date]},
        params=query_params,
    ).df()

    # The extra leading rows were only fetched to warm up the indicator.
    in_backtest = indicators['date'] >= context.start_date
    context.data = indicators[in_backtest]

def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
    """Adjust the single-stock position from today's KDJ K value.

    Looks up today's row in ``context.data`` and returns early when there is
    none or when K is missing. Otherwise it picks a target weight: the base
    position on the first trade while nothing is held, the full position when
    K is below the buy threshold, the base position when K is above the sell
    threshold, and the previous target in all other cases. An order is placed
    and logged only when target and current weight differ by more than 0.01.

    Args:
        context: Strategy context carrying parameters, data and portfolio.
        data: Bar data for the current bar; only ``current_dt`` is read.
    """
    today = data.current_dt.strftime('%Y-%m-%d')
    rows = context.data[context.data['date'] == today]
    if rows.empty:
        return

    bar = rows.iloc[0]
    k_value, d_value, j_value = bar['kdj_k'], bar['kdj_d'], bar['kdj_j']

    # `x != x` is true only for NaN, so this skips missing indicator values.
    if k_value is None or k_value != k_value:
        return

    holding = context.portfolio.positions.get(context.stock)
    held_value = holding.market_value if holding else 0
    equity = context.portfolio.portfolio_value
    held_weight = held_value / equity if equity > 0 else 0

    if context.is_first_trade and held_weight == 0:
        context.is_first_trade = False
        goal_weight, label = context.base_position, "首次建仓"
    elif k_value < context.buy_threshold:
        goal_weight, label = context.full_position, "买入信号"
    elif k_value > context.sell_threshold:
        goal_weight, label = context.base_position, "卖出信号"
    else:
        # No signal: keep whatever target the last signal established.
        goal_weight, label = context.current_position_target, "持有"

    context.current_position_target = goal_weight

    if abs(goal_weight - held_weight) > 0.01:
        context.order_target_percent(context.stock, goal_weight)
        context.logger.info(
            f"{today}: {label} | KDJ(K={k_value:.2f}, D={d_value:.2f}, J={j_value:.2f}) | "
            f"当前仓位: {held_weight*100:.1f}% -> 目标仓位: {goal_weight*100:.1f}%"
        )

# Run the KDJ strategy on daily bars from 2020-01-01 up to the day the script
# is executed.
performance = bigtrader.run(
    initialize=initialize,
    handle_data=handle_data,
    market=bigtrader.Market.CN_STOCK,
    frequency=bigtrader.Frequency.DAILY,
    capital_base=1_000_000,
    start_date="2020-01-01",
    end_date=datetime.now().strftime("%Y-%m-%d"),
)

ETF 日频策略一:宽基 ETF 低波动策略

标的池:沪深300ETF / 中证500ETF / 科创50ETF,因子:近30个交易日日收益率标准差(波动率),每次调仓选波动率最低的2只ETF,等权配置(各50%),每10个交易日调仓一次。

from datetime import datetime
from bigquant import bigtrader, dai

def initialize(context: bigtrader.IContext):
    """Build the target-weight table for the low-volatility ETF strategy.

    Queries ``cn_fund_bar1d`` for the three ETFs in the pool, scoring each row
    with the negated ``m_stddev`` of the one-bar close-to-close return over
    ``vol_window``. For every date the two highest scores (i.e. the two lowest
    standard deviations) are kept at weight 0.5 each; the table is then passed
    through ``TradingDaysRebalance(10).select_rebalance_data`` and stored on
    ``context.data``.

    Args:
        context: Strategy context supplied by bigtrader.
    """
    context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))

    universe = ["510300.SH", "510500.SH", "588000.SH"]
    window = 30

    query = """
    SELECT
        date,
        instrument,
        -m_stddev(close / m_lag(close, 1) - 1, $vol_window) AS score,
        0.5 AS weight
    FROM cn_fund_bar1d
    WHERE instrument IN $fund_pool
    QUALIFY score IS NOT NULL
    ORDER BY date, instrument
    """

    # Open the query window 60 trading days early so the rolling statistic
    # has history before the backtest begins.
    warmup_start = context.add_trading_days(context.start_date, -60)
    scores = dai.query(
        query,
        filters={"date": [warmup_start, context.end_date]},
        params={"fund_pool": universe, "vol_window": window},
    ).df()

    # Highest score first within each date, then keep the top two.
    ranked = scores.sort_values(["date", "score"], ascending=[True, False])
    picks = ranked.groupby("date").head(2)
    picks["weight"] = 0.5

    scheduler = bigtrader.TradingDaysRebalance(10, context=context)
    context.data = scheduler.select_rebalance_data(picks)

# Run the low-volatility ETF strategy on daily fund bars; both buy and sell
# orders use the "open" price field.
performance = bigtrader.run(
    initialize=initialize,
    handle_data=bigtrader.HandleDataLib.handle_data_weight_based,
    market=bigtrader.Market.CN_FUND,
    frequency=bigtrader.Frequency.DAILY,
    capital_base=1_000_000,
    start_date="2020-01-01",
    end_date=datetime.now().strftime("%Y-%m-%d"),
    order_price_field_buy="open",
    order_price_field_sell="open",
)

ETF 日频策略二:宽基 ETF 动量轮动

标的池:沪深300ETF / 中证500ETF / 科创50ETF,因子:近20个交易日收益率(动量因子),每次调仓选动量最强的1只ETF全仓持有,每5个交易日调仓一次。

from datetime import datetime
from bigquant import bigtrader, dai

# ETF instruments the rotation chooses between.
FUND_POOL = [
    "510300.SH",
    "510500.SH",
    "588000.SH",
]
# Look-back, in bars, used for the momentum return in the signal query.
MOMENTUM_DAYS = 20
# Number of handle_data calls between two rebalances.
REBALANCE_DAYS = 5

def initialize(context: bigtrader.IContext):
    """Precompute the daily top-momentum ETF and reset the rebalance counter.

    Queries ``cn_fund_bar1d`` for every instrument in ``FUND_POOL`` and
    computes ``close / m_lag(close, MOMENTUM_DAYS) - 1`` as ``momentum``.
    For each date the single row with the highest momentum is kept; the
    result, indexed by date, is stored on ``context.signal_df``.
    ``context.rebalance_counter`` starts at 0 and is advanced by
    ``handle_data``.

    Args:
        context: Strategy context supplied by bigtrader.
    """
    context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))

    sql = """
    SELECT date, instrument, close / m_lag(close, $momentum_days) - 1 AS momentum
    FROM cn_fund_bar1d
    WHERE instrument IN $fund_pool
    QUALIFY momentum IS NOT NULL
    ORDER BY date, instrument
    """

    # Derive the warm-up window from the look-back instead of hard-coding it,
    # so a larger MOMENTUM_DAYS still has signals from the first backtest day.
    # With MOMENTUM_DAYS = 20 this is the original 30 trading days.
    warmup_days = MOMENTUM_DAYS + 10

    df = dai.query(
        sql,
        filters={"date": [
            context.add_trading_days(context.start_date, -warmup_days),
            context.end_date,
        ]},
        params={"fund_pool": FUND_POOL, "momentum_days": MOMENTUM_DAYS},
    ).df()

    # Strongest momentum first within each date, then keep one row per date.
    df = df.sort_values(["date", "momentum"], ascending=[True, False])
    df = df.groupby("date").head(1).set_index("date")

    context.signal_df         = df
    context.rebalance_counter = 0

def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
    """Rotate the whole portfolio into the current top-momentum ETF.

    Every ``REBALANCE_DAYS``-th call, starting with the first, looks up
    today's row in ``context.signal_df``, targets 0% for every held
    instrument other than the selected one and 100% for the selected one.
    Calls in between, and rebalance calls whose date has no signal row, place
    no orders.

    Args:
        context: Strategy context carrying ``signal_df``, the counter and the
            portfolio.
        data: Bar data for the current bar; only ``current_dt`` is read.
    """
    context.rebalance_counter += 1
    # Rebalance on calls 1, 1 + N, 1 + 2N, ...  Expressed as (counter - 1) % N
    # because `counter % N != 1` is always true for N == 1 and would then
    # never rebalance; for N > 1 both forms select the same calls.
    if (context.rebalance_counter - 1) % REBALANCE_DAYS != 0:
        return

    today = data.current_dt.strftime("%Y-%m-%d")
    if today not in context.signal_df.index:
        return

    target_instrument = context.signal_df.loc[today, "instrument"]

    # Iterate over a snapshot so that placing orders cannot disturb the loop
    # should the engine update the positions mapping while orders are sent.
    for instrument, position in list(context.portfolio.positions.items()):
        if position.amount > 0 and instrument != target_instrument:
            context.order_target_percent(instrument, 0)

    context.order_target_percent(target_instrument, 1.0)

# Run the ETF momentum rotation on daily fund bars; both buy and sell orders
# use the "open" price field.
performance = bigtrader.run(
    initialize=initialize,
    handle_data=handle_data,
    market=bigtrader.Market.CN_FUND,
    frequency=bigtrader.Frequency.DAILY,
    capital_base=1_000_000,
    start_date="2020-01-01",
    end_date=datetime.now().strftime("%Y-%m-%d"),
    order_price_field_buy="open",
    order_price_field_sell="open",
)

可转债日频策略一:低溢价率 + 低价格周度轮动

筛选条件:转股溢价率 < 10%,转债价格 < 110 元,剩余余额 > 1 亿元,剩余天数 > 120 天。按转债价格升序排列,持有价格最低的 8 只,等权持仓(各 12.5%),每 5 个交易日调仓一次。

from bigquant import bigtrader, dai
from datetime import datetime

def initialize(context: bigtrader.IContext):
    """Precompute daily convertible-bond candidates and the rebalance schedule.

    Joins ``cn_cbond_analyze_metric`` with ``cn_cbond_bar1d``, applies the
    premium / price / balance / remaining-days filters, and keeps rows whose
    ``c_rank`` on close (ascending) is at most ``context.bond_num``. The
    result is turned into ``context.daily_candidates``, a dict mapping a
    ``YYYY-MM-DD`` string to the list of instruments for that date.
    ``context.rebalance_period`` holds a 5-trading-day ``TradingDaysRebalance``.

    Args:
        context: Strategy context supplied by bigtrader.
    """
    context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))

    context.bond_num = 8

    query = """
    SELECT
        m.date,
        m.instrument,
        b.close,
        m.conversion_premium_rate,
        m.bond_balance,
        m.remaining_days
    FROM cn_cbond_analyze_metric AS m
    INNER JOIN cn_cbond_bar1d AS b USING (date, instrument)
    WHERE
        m.conversion_premium_rate < 10
        AND b.close < 110
        AND m.bond_balance > 1
        AND m.remaining_days > 120
    QUALIFY c_rank(b.close, ascending := true) <= $bond_num
    ORDER BY m.date, b.close ASC
    """

    selected = dai.query(
        query,
        filters={"date": [context.start_date, context.end_date]},
        params={"bond_num": context.bond_num},
    ).df()

    # Normalise dates to "YYYY-MM-DD" strings so handle_data can look them up
    # with a formatted date.
    selected["date"] = selected["date"].astype(str).str[:10]

    candidates_by_day = {}
    for day, rows in selected.groupby("date"):
        candidates_by_day[day] = rows["instrument"].tolist()
    context.daily_candidates = candidates_by_day

    context.rebalance_period = bigtrader.TradingDaysRebalance(5, context=context)

def handle_data(context: bigtrader.IContext, data: bigtrader.IBarData):
    """Rebalance into today's candidate bonds at a fixed per-bond weight.

    Does nothing unless ``context.rebalance_period`` reports the current date
    as a signal date. On a signal date it targets 0% for every held
    instrument that is not among today's candidates, then targets
    ``1 / context.bond_num`` for each candidate. When there are no candidates
    only the sells are issued.

    Args:
        context: Strategy context carrying candidates, schedule and portfolio.
        data: Bar data for the current bar; only ``current_dt`` is read.
    """
    if not context.rebalance_period.is_signal_date(data.current_dt.date()):
        return

    day_key = data.current_dt.strftime("%Y-%m-%d")
    wanted = context.daily_candidates.get(day_key, [])
    per_bond_weight = 1.0 / context.bond_num
    wanted_set = set(wanted)

    held_set = {
        pos.instrument
        for pos in context.portfolio.positions.values()
        if pos.amount > 0
    }

    # First sell holdings that are no longer in the target pool.
    for instrument in held_set - wanted_set:
        context.order_target_percent(instrument, 0)

    if not wanted:
        return

    # Buy / adjust every target bond to the fixed equal weight.
    for instrument in wanted:
        context.order_target_percent(instrument, per_bond_weight)

# Run the low-premium / low-price convertible-bond rotation on daily bars;
# both buy and sell orders use the 'open' price field.
performance = bigtrader.run(
    initialize=initialize,
    handle_data=handle_data,
    market=bigtrader.Market.CN_CBOND,
    frequency=bigtrader.Frequency.DAILY,
    capital_base=500_000,
    start_date="2021-01-01",
    end_date=datetime.now().strftime("%Y-%m-%d"),
    order_price_field_buy='open',
    order_price_field_sell='open',
)

可转债日频策略二:双低策略

双低值 = 转股溢价率(%)+ 转债价格(元),筛选双低值 < 120 的转债,剔除剩余余额不高于1亿、剩余天数不超过120天以及转股溢价率不高于 -10% 的转债,按双低值升序排列取前10只等权持有,每5个交易日调仓一次。

from bigquant import bigtrader, dai
from datetime import datetime

def initialize(context: bigtrader.IContext):
    """Build the target-weight table for the double-low convertible-bond strategy.

    Joins ``cn_cbond_analyze_metric`` with ``cn_cbond_bar1d`` and computes
    ``dual_low = conversion_premium_rate + close``. Rows passing the filters
    whose ``c_rank`` on that sum (ascending) is at most ``bond_count`` are
    kept with weight ``1 / bond_count``. The table is passed through
    ``TradingDaysRebalance.select_rebalance_data`` and stored on
    ``context.data``.

    Args:
        context: Strategy context supplied by bigtrader.
    """
    context.set_commission(bigtrader.PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))

    bond_count = 10
    rebalance_interval = 5

    query = """
    SELECT
        date,
        instrument,
        close,
        conversion_premium_rate,
        (conversion_premium_rate + close) AS dual_low,
        1.0 / $bond_num AS weight
    FROM cn_cbond_analyze_metric
    INNER JOIN cn_cbond_bar1d USING (date, instrument)
    WHERE
        (conversion_premium_rate + close) < 120
        AND bond_balance > 1
        AND remaining_days > 120
        AND conversion_premium_rate > -10
    QUALIFY c_rank(conversion_premium_rate + close, ascending := true) <= $bond_num
    ORDER BY date, dual_low ASC
    """

    # The query window is opened 30 trading days before the backtest start.
    window_start = context.add_trading_days(context.start_date, -30)
    ranked = dai.query(
        query,
        filters={"date": [window_start, context.end_date]},
        params={"bond_num": bond_count},
    ).df()

    scheduler = bigtrader.TradingDaysRebalance(rebalance_interval, context=context)
    context.data = scheduler.select_rebalance_data(ranked)

# Run the double-low convertible-bond strategy on daily bars with bigtrader's
# built-in weight-based handler; both order price fields are 'open'.
performance = bigtrader.run(
    initialize=initialize,
    handle_data=bigtrader.HandleDataLib.handle_data_weight_based,
    market=bigtrader.Market.CN_CBOND,
    frequency=bigtrader.Frequency.DAILY,
    capital_base=500_000,
    start_date="2021-01-01",
    end_date=datetime.now().strftime("%Y-%m-%d"),
    order_price_field_buy='open',
    order_price_field_sell='open',
)

\

{link}