精品课程

kinghh 作业三

由bq588ft6创建,最终由bq588ft6 被浏览 5 用户

from bigmodule import M

# <aistudiograph>

# @param(id="m5", name="initialize")
# 交易引擎:初始化函数,只执行一次
def m5_initialize_bigquant_run(context):
    from bigtrader.finance.commission import PerOrder

    # 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
    context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
    # 止盈阈值:比如 20%
    context.take_profit = 0.20
    context.entry_price = {}
    #print("INIT OK", context.take_profit, "entry_price inited")
# @param(id="m5", name="before_trading_start")
# 交易引擎:每个单位时间开盘前调用一次。
def m5_before_trading_start_bigquant_run(context, data):
    # 盘前处理,订阅行情等
    pass

# @param(id="m5", name="handle_tick")
# 交易引擎:tick数据处理函数,每个tick执行一次
def m5_handle_tick_bigquant_run(context, tick):
    pass

# @param(id="m5", name="handle_data")
def m5_handle_data_bigquant_run(context, data):
    import pandas as pd
    import pandas as pd

    # ====== [定位1] 每月1号打印:当前到底有没有真实持仓 ======
    if data.current_dt.day == 1:
        acc_pos = context.get_account_positions()
        print("\n[DBG1] date=", data.current_dt,
              "get_account_positions cnt=", len(acc_pos),
              "keys sample=", list(acc_pos.keys())[:10])

        # 同时看看 portfolio.positions 有没有东西(用来对比)
        try:
            pf_pos = context.portfolio.positions
            print("[DBG1] portfolio.positions cnt=", len(pf_pos),
                  "keys sample=", list(pf_pos.keys())[:10])
        except Exception as e:
            print("[DBG1] portfolio.positions access err:", e)
    if data.current_dt.day == 1:  # 少打点日志
        print("date=", data.current_dt, "context.data size=", len(context.data))
    # ========= 止盈(每天都执行)=========
    positions = context.get_account_positions()
    tp_sold = set()

     # ========= 止盈(每天都执行)=========
    positions = context.get_account_positions()
    tp_sold = set()

    for inst, pos in positions.items():
        amount = getattr(pos, "amount", None)
        if amount is None:
            amount = getattr(pos, "", 0)
        if not amount or amount <= 0:
            continue

        entry = context.entry_price.get(inst)
        if entry is None or entry <= 0:
            entry = getattr(pos, "avg_cost", None) or getattr(pos, "cost_basis", None)

        last = data.current(inst, "close")
        if entry is None or last is None:
            continue

        ret = last / entry - 1
        if data.current_dt.day == 1:
            if not hasattr(context, "_dbg2_cnt"):
                context._dbg2_cnt = 0
            if context._dbg2_cnt < 3:
                context._dbg2_cnt += 1
                print("[DBG2]", data.current_dt, inst, "amount=", amount, "entry=", entry, "last=", last, "ret=", ret)
                if ret >= context.take_profit:
                    print("TAKE_PROFIT_TRIGGER", data.current_dt, inst, "entry", entry, "last", last, "ret", ret)
            context.order_target(inst, 0)
            tp_sold.add(inst)
    # 下一个交易日不是调仓日,则不生成信号
    if not context.rebalance_period.is_signal_date(data.current_dt.date()):
        return

    # 从传入的数据 context.data 中读取今天的信号数据
    today_df = context.data[context.data["date"] == data.current_dt.strftime("%Y-%m-%d")]
    target_instruments = set(today_df["instrument"])

    # 获取当前已持有股票
    holding_instruments = set(context.get_account_positions().keys())

    # 卖出不在目标持有列表中的股票
    for instrument in holding_instruments - target_instruments:
        context.order_target_percent(instrument, 0)
    # 买入目标持有列表中的股票
    for i, x in today_df.iterrows():
        if x.instrument in tp_sold:
            continue
        # 处理 null 或者 decimal.Decimal 类型等
        position = 0.0 if pd.isnull(x.position) else float(x.position)
        context.order_target_percent(x.instrument, position)

# @param(id="m5", name="handle_trade")
# 交易引擎:成交回报处理函数,每个成交发生时执行一次
def m5_handle_order_m5_handle_trade_bigquant_run(context, order):
    if order is None:
        return
    print("ORDER",
          context.current_dt,
          order.instrument,
          "dir", order.direction,
          "status", order.order_status,
          "qty", order.order_qty,
          "filled", order.filled_qty,
          "avg", order.avg_price)

def m5_handle_trade_bigquant_run(context, trade):
    if trade is None:
        return

    # ====== [定位3] 打印 trade 有哪些字段(只打印前几次,避免爆日志)======
    if not hasattr(context, "_dbg_trade_cnt"):
        context._dbg_trade_cnt = 0
    if context._dbg_trade_cnt < 5:
        context._dbg_trade_cnt += 1
        try:
            print("[DBG3] trade dir(trade) sample:", dir(trade))
        except Exception as e:
            print("[DBG3] dir(trade) err:", e)

    # 兼容取成交价字段
    price = (getattr(trade, "filled_price", None)
             or getattr(trade, "price", None)
             or getattr(trade, "avg_price", None)
             or getattr(trade, "trade_price", None))

    direction = str(getattr(trade, "direction", ""))

    # 只要是买入成交就记录入场价
    if direction == "1" and price is not None:
        context.entry_price[trade.instrument] = float(price)
        print("[DBG3B] set entry_price", trade.instrument, context.entry_price[trade.instrument])

    print("TRADE", context.current_dt, trade.instrument,
          "dir", getattr(trade, "direction", None),
          "qty", getattr(trade, "filled_qty", None),
          "price_used", price)

# @param(id="m5", name="after_trading")
# 交易引擎:盘后处理函数,每日盘后执行一次
def m5_after_trading_bigquant_run(context, data):
    pass

# @module(position="-377,-876", comment="""选股""", comment_collapsed=True)
m1 = M.cn_stock_basic_selector.v8(
    exchanges=["""上交所""", """深交所""", """北交所"""],
    list_sectors=["""主板""", """创业板""", """科创板"""],
    indexes=["""中证500""", """上证指数""", """创业板指""", """深证成指""", """北证50""", """上证50""", """科创50""", """沪深300""", """中证1000""", """中证100""", """深证100"""],
    st_statuses=["""正常""", """ST""", """*ST"""],
    margin_tradings=["""两融标的""", """非两融标的"""],
    sw2021_industries=["""农林牧渔""", """采掘""", """基础化工""", """钢铁""", """有色金属""", """建筑建材""", """机械设备""", """电子""", """汽车""", """交运设备""", """信息设备""", """家用电器""", """食品饮料""", """纺织服饰""", """轻工制造""", """医药生物""", """公用事业""", """交通运输""", """房地产""", """金融服务""", """商贸零售""", """社会服务""", """信息服务""", """银行""", """非银金融""", """综合""", """建筑材料""", """建筑装饰""", """电力设备""", """国防军工""", """计算机""", """传媒""", """通信""", """煤炭""", """石油石化""", """环保""", """美容护理"""],
    drop_suspended=True,
    m_name="""m1"""
)

# @module(position="-383,-783", comment="""""", comment_collapsed=True)
m6 = M.input_features_dai.v30(
    input_1=m1.data,
    mode="""表达式""",
    expr="""total_market_cap as score""",
    expr_filters="""turn<0.05
""",
    expr_tables="""cn_stock_prefactors""",
    extra_fields="""date, instrument""",
    order_by="""date, score""",
    expr_drop_na=True,
    sql="""-- 使用DAI SQL获取数据, 构建因子等, 如下是一个例子作为参考
-- DAI SQL 语法: https://bigquant.com/wiki/doc/dai-PLSbc1SbZX#h-sql%E5%85%A5%E9%97%A8%E6%95%99%E7%A8%8B
-- 使用数据输入1/2/3里的字段: e.g. input_1.close, input_1.* EXCLUDE(date, instrument)

SELECT
    -- 在这里输入因子表达式
    -- DAI SQL 算子/函数: https://bigquant.com/wiki/doc/dai-PLSbc1SbZX#h-%E5%87%BD%E6%95%B0
    -- 数据&字段: 数据文档 https://bigquant.com/data/home

    m_lag(close, 90) / close AS return_90,
    m_lag(close, 30) / close AS return_30,
    -- 下划线开始命名的列是中间变量, 不会在最终结果输出 (e.g. _rank_return_90)
    c_pct_rank(-return_90) AS _rank_return_90,
    c_pct_rank(return_30) AS _rank_return_30,

    c_rank(volume) AS rank_volume,
    close / m_lag(close, 1) as return_0,

    -- 日期和股票代码
    date, instrument
FROM
    -- 预计算因子 cn_stock_bar1d https://bigquant.com/data/datasources/cn_stock_bar1d
    cn_stock_factors
    -- SQL 模式不会自动join输入数据源, 可以根据需要自由灵活的使用
    -- JOIN input_1 USING(date, instrument)
WHERE
    -- WHERE 过滤, 在窗口等计算算子之前执行
    -- 剔除ST股票
    st_status = 0
QUALIFY
    -- QUALIFY 过滤, 在窗口等计算算子之后执行, 比如 m_lag(close, 3) AS close_3, 对于 close_3 的过滤需要放到这里
    -- 去掉有空值的行
    COLUMNS(*) IS NOT NULL
    -- _rank_return_90 是窗口函数结果,需要放在 QUALIFY 里
    AND _rank_return_90 > 0.1
    AND _rank_return_30 < 0.1
-- 按日期和股票代码排序, 从小到大
ORDER BY date, instrument
""",
    extract_data=False,
    m_cached=False,
    m_name="""m6"""
)

# @module(position="-375,-686", comment="""抽取预测数据""")
m3 = M.extract_data_dai.v20(
    sql=m6.data,
    start_date="""2024-01-01""",
    start_date_bound_to_trading_date=True,
    end_date="""2025-12-29""",
    end_date_bound_to_trading_date=True,
    before_start_days=240,
    keep_before=False,
    debug=False,
    m_cached=False,
    m_name="""m3"""
)

# @module(position="-380,-565", comment="""持股数量、打分到仓位""")
m4 = M.score_to_position.v5(
    input_1=m3.data,
    score_field="""score""",
    hold_count=10,
    position_expr="""-- DAI SQL 算子/函数: https://bigquant.com/wiki/doc/dai-PLSbc1SbZX#h-%E5%87%BD%E6%95%B0
-- 在这里输入表达式, 每行一个表达式, 输出仓位字段必须命名为 position, 模块会进一步做归一化
-- 排序倒数: 1 / score_rank AS position
-- 对数下降: 1 / log2(score_rank + 1) AS position
-- TODO 拟合、最优化 ..

-- 等权重分配
1 AS position
""",
    total_position=1,
    extract_data=True,
    m_name="""m4"""
)

# @module(position="-389,-440", comment="""交易,日线,设置初始化函数和K线处理函数,以及初始资金、基准等""")
m5 = M.bigtrader.v52(
    data=m4.data,
    start_date="""2024-01-01""",
    end_date="""2025-12-29""",
    initialize=m5_initialize_bigquant_run,
    before_trading_start=m5_before_trading_start_bigquant_run,
    handle_tick=m5_handle_tick_bigquant_run,
    handle_data=m5_handle_data_bigquant_run,
    handle_trade=m5_handle_trade_bigquant_run,
    after_trading=m5_after_trading_bigquant_run,
    capital_base=1000000,
    frequency="""daily""",
    product_type="""股票""",
    rebalance_period_type="""交易日""",
    rebalance_period_days="""3""",
    rebalance_period_roll_forward=True,
    backtest_engine_mode="""标准模式""",
    before_start_days=0,
    volume_limit=1,
    order_price_field_buy="""open""",
    order_price_field_sell="""open""",
    benchmark="""000300.SH""",
    plot_charts=True,
    debug=False,
    backtest_only=False,
    m_name="""m5"""
)
# </aistudiograph>

标签

量化交易
{link}