"""Backtest externally generated trading signals with backtrader.

The strategy reads a CSV of pre-computed signals (columns used here: ``date``,
``signal``, ``close``, ``rate``) and, on every bar, places orders for the NEXT
bar according to the signal dated on that next bar.  The module-level script
below loads one daily stock CSV, runs the strategy and renders a pyfolio tear
sheet.
"""
import copy
import datetime
import math
import os, sys
import warnings

import backtrader as bt
import numpy as np
import pandas as pd
import talib

# Warnings are silenced before pyfolio is imported, as in the original script.
warnings.filterwarnings("ignore")
import pyfolio as pf


class TestSignalStrategy(bt.Strategy):
    """Trade ``self.datas[0]`` from signals stored in an external CSV file.

    A signal of ``1`` is treated as "go long" and ``-1`` as "go short".  Until
    the first long order has been issued only long signals are acted on.
    """

    params = (('period', 30),
              ('hold_percent', 0.02)
              )

    def log(self, txt, dt=None):
        """Print ``txt`` prefixed with ``dt`` (defaults to the current bar's date)."""
        dt = dt or self.datas[0].datetime.date(0)
        print('{}, {}'.format(dt.isoformat(), txt))

    def __init__(self):
        # Number of bars processed so far (incremented in ``next``).
        self.bar_num = 0
        # Load the trading signals and parse their dates once up front.
        self.signal_df = pd.read_csv("/home/yun/data/交易信号.csv")
        self.signal_df['date'] = pd.to_datetime(self.signal_df['date'])
        # True until the first long order has been placed.
        self.first_trade = True

    def prenext(self):
        # Run the same logic during the warm-up phase.
        self.next()

    def _target_size(self):
        """Return the order size computed from 45% of the current portfolio value.

        Uses the NEXT bar's open price (``open[1]``), so it must only be called
        when a next bar exists.
        """
        total_value = self.broker.get_value()
        next_open_price = self.datas[0].open[1]
        # NOTE(review): the 0.01 and 100 factors cancel out, so the result is
        # fractional. This looks like a lot-rounding formula that lost its
        # int(); the arithmetic is kept as-is pending confirmation.
        return (0.01 * 0.45 * total_value / next_open_price) * 100

    def next(self):
        """Place orders for the next bar based on the next bar's signal."""
        self.bar_num += 1
        data = self.datas[0]

        # The last bar of the history has no following bar; indexing one bar
        # ahead then fails with IndexError and there is nothing to do.
        try:
            next_date = data.datetime.date(1)
        except IndexError:
            return

        # Look up the signal row dated on the next trading day.
        signal_rows = self.signal_df[
            self.signal_df['date'] == pd.to_datetime(next_date)
        ]
        if signal_rows.empty:
            self.log("下个交易日的信号不存在")
            return

        # Read scalars from the first matching row; converting a one-row
        # Series with int()/float() directly is deprecated in recent pandas.
        row = signal_rows.iloc[0]
        signal = int(row['signal'])
        close_price = float(row['close'])
        rate = float(row['rate'])

        if self.first_trade:
            # No base position yet: short signals are ignored, only a long
            # signal opens the initial position.
            if signal == 1:
                self.buy(data, size=self._target_size())
                # From now on the regular long/short handling applies.
                self.first_trade = False
            return

        # Size of the position currently held.
        now_hold_size = self.getposition(data).size

        if signal == 1:
            # Long: open with a market order ...
            self.buy(data, size=self._target_size())
            # ... and simultaneously place a limit sell for the current holding.
            self.sell(data, size=now_hold_size, exectype=bt.Order.Limit,
                      price=close_price * (1 + rate))

        if signal == -1:
            # Short: sell at most what is currently held.
            target_size = self._target_size()
            if target_size > now_hold_size:
                target_size = now_hold_size
            self.sell(data, size=target_size)
            # Simultaneously place the limit buy that closes the short leg.
            # NOTE(review): this uses the same (1 + rate) price and the full
            # holding size as the long branch — confirm that is intended.
            self.buy(data, size=now_hold_size, exectype=bt.Order.Limit,
                     price=close_price * (1 + rate))

    def notify_order(self, order):
        """Log every order status change except Submitted/Accepted."""
        if order.status in [order.Submitted, order.Accepted]:
            return

        if order.status == order.Rejected:
            self.log(f"Rejected : order_ref:{order.ref} data_name:{order.p.data._name}")

        if order.status == order.Margin:
            self.log(f"Margin : order_ref:{order.ref} data_name:{order.p.data._name}")

        if order.status == order.Cancelled:
            self.log(f"Concelled : order_ref:{order.ref} data_name:{order.p.data._name}")

        if order.status == order.Partial:
            self.log(f"Partial : order_ref:{order.ref} data_name:{order.p.data._name}")

        if order.status == order.Completed:
            if order.isbuy():
                self.log(f" BUY : data_name:{order.p.data._name} price : {order.executed.price} , cost : {order.executed.value} , commission : {order.executed.comm}")
            else:  # Sell
                self.log(f" SELL : data_name:{order.p.data._name} price : {order.executed.price} , cost : {order.executed.value} , commission : {order.executed.comm}")

    def notify_trade(self, trade):
        """Log a trade when it closes (with PnL) and when it opens (with price)."""
        if trade.isclosed:
            self.log('closed symbol is : {} , total_profit : {} , net_profit : {}' .format(
                trade.getdataname(), trade.pnl, trade.pnlcomm))
            # self.trade_list.append([self.datas[0].datetime.date(0),trade.getdataname(),trade.pnl,trade.pnlcomm])

        if trade.isopen:
            self.log('open symbol is : {} , price : {} ' .format(
                trade.getdataname(), trade.price))

    def stop(self):
        pass


# Create the cerebro engine instance.
cerebro = bt.Cerebro()
# cerebro.broker = bt.brokers.BackBroker(shortcash=True)  # 0.5%
data_root = "/home/yun/data/stock/day/"
file_list = sorted(os.listdir(data_root))
params = dict(
    fromdate=datetime.datetime(2016, 1, 1),
    todate=datetime.datetime(2019, 12, 31),
    timeframe=bt.TimeFrame.Days,
    dtformat=("%Y-%m-%d"),
    # compression = 1,
    datetime=0,
    open=1,
    high=2,
    low=3,
    close=4,
    volume=5,
    openinterest=-1)

# Load the daily bars for 600000.XSHG (the original comment called this index
# data, but the file read is a single stock's CSV).
df = pd.read_csv("/home/yun/data/stock/day/600000.XSHG.csv")
df.columns = ['datetime', 'open', 'high', 'low', 'close', 'volume', 'openinterest']
df.index = pd.to_datetime(df['datetime'])
df = df[['open', 'high', 'low', 'close', 'volume', 'openinterest']]
# Keep only the bars inside the configured backtest window.
df = df[(df.index <= params['todate']) & (df.index >= params['fromdate'])]
# feed = bt.feeds.GenericCSVData(dataname = "/home/yun/data/stock/index.csv",**params)
feed = bt.feeds.PandasDirectData(dataname=df)
# Register the data feed with cerebro.
cerebro.adddata(feed, name='600000')
# Commission: 0.02% of traded value.
cerebro.broker.setcommission(commission=0.0002, stocklike=True)
# Initial cash.  NOTE(review): the original comment said 1 million, but the
# value set here is 100,000,000 — confirm which one is intended.
cerebro.broker.setcash(1_0000_0000)
# Add the strategy and the pyfolio analyzer.
cerebro.addstrategy(TestSignalStrategy)
cerebro.addanalyzer(bt.analyzers.PyFolio)
# Run the backtest.
results = cerebro.run()
# Build the pyfolio report from the analyzer output.
pyfoliozer = results[0].analyzers.getbyname('pyfolio')
returns, positions, transactions, gross_lev = pyfoliozer.get_pf_items()
pf.create_full_tear_sheet(
    returns,
    positions=positions,
    transactions=transactions,
    # gross_lev=gross_lev,
    live_start_date='2019-01-01',
    )
【答读者问15】backtrader如何使用其他软件产生的交易信号做回测?
作者:yunjinqi
类别:量化框架
日期:2021-12-23 17:52:45
阅读:2726 次
消耗积分:0 分
版权所有,转载本站文章请注明出处:云子量化, https://www.yunjinqi.top/article/44
最新文章
系统当前共有 404 篇文章