问答交流

【其他】冻结逻辑失效了,计划是止损10%冻结该股票14天,实际回测未冻结

由bqbkm8ac创建,最终由bqbkm8ac 被浏览 5 用户

from bigmodule import M
from datetime import date, timedelta

# <aistudiograph>

# @param(id="m5", name="initialize")
# 交易引擎:初始化函数,只执行一次
def m5_initialize_bigquant_run(context):
    from bigtrader.finance.commission import PerOrder
    context.data2 = context.options['data'].read()

    # 系统已经设置了默认的交易手续费和滑点,要修改手续费可使用如下函数
    context.set_commission(PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
    context.pause_trading_days = 0
    context.position_cost = {}  # {instrument: {'cost_price': float, 'last_price': float}}
    context.cooldown_pool = {}   # {instrument: 解冻日期}

def process_cooldown(context, current_date2):
        expired = [k for k, v in context.cooldown_pool.items() if v <= current_date2]
        for k in expired:
           del context.cooldown_pool[k]
           
# @param(id="m5", name="before_trading_start")
# 交易引擎:每个单位时间开盘前调用一次。
def m5_before_trading_start_bigquant_run(context, data):
    # 盘前处理,订阅行情等
    pass

# @param(id="m5", name="handle_tick")
# 交易引擎:tick数据处理函数,每个tick执行一次
def m5_handle_tick_bigquant_run(context, tick):
    pass

# @param(id="m5", name="handle_data")
def m5_handle_data_bigquant_run(context, data):
    import pandas as pd
    current_date = data.current_dt.strftime("%Y-%m-%d")
    current_day_data2 = context.data2[context.data2["date"] == current_date]
    current_hold_instruments = set(context.get_account_positions().keys())
    today_df = context.data[context.data["date"] == data.current_dt.strftime("%Y-%m-%d")]

    current_date2 = data.current_dt.date()  # 直接获取datetime.date类型

    process_cooldown(context, current_date2)


# 初始化或更新暂停交易的天数变量
    if not hasattr(context, "pause_trading_days"):
        context.pause_trading_days = 0

# 获取今天是否大盘风控的信号
    current_day_market_risk_indicator = current_day_data2["market_risk_indicator"].iloc[0]


# 检查当前是否在暂停交易天数内
    if context.pause_trading_days > 0:
    # 减少暂停交易天数
        context.pause_trading_days -= 1
    # 暂停交易期间不做任何操作
        return
    

# 如果今天大盘风控
    if current_day_market_risk_indicator != 0:
    # 则卖出仓内所有股票
        for ins in current_hold_instruments:
            context.order_target_percent(ins, 0)
    # 设置暂停交易两天
        context.pause_trading_days = 1
    # 且今天不交易了
        return

    
    # 从传入的数据 context.data 中读取今天的信号数据
    
    target_instruments = set(today_df["instrument"])
    
    
    # 获取持仓成本价(需在开仓时记录)
    
    for instrument in context.get_account_positions():
        current_price = data.current(instrument, 'close')  # 使用收盘价更准确
        cost_price = context.get_position(instrument).cost_price
        loss_ratio = (cost_price - current_price) / cost_price
        
        if loss_ratio > 0.1:
             context.order_target_percent(instrument, 0)
    # 冻结14个交易日(约2周)
             unfreeze_date = current_date2 + timedelta(days=14)  # 自然日近似替代交易日
             context.cooldown_pool[instrument] = unfreeze_date
    
    filtered_target_instruments = target_instruments - set(context.cooldown_pool.keys())

    filtered_today_df = today_df[today_df['instrument'].isin(filtered_target_instruments)]
    

    # 下一个交易日不是调仓日,则不生成信号
    if not context.rebalance_period.is_signal_date(data.current_dt.date()):
        return

    # 获取当前已持有股票
    holding_instruments = set(context.get_account_positions().keys())
    # 卖出不在目标持有列表中的股票
    for instrument in holding_instruments - filtered_target_instruments:
        context.order_target_percent(instrument, 0)
    # 买入目标持有列表中的股票
    for i, x in filtered_today_df.iterrows():
        # 处理 null 或者 decimal.Decimal 类型等
        position = 0.0 if pd.isnull(x.position) else float(x.position)
        context.order_target_percent(x.instrument, position)

\

标签

交易引擎
{link}