【答读者问23】计算指标的时候是直接使用pandas计算好指标加载进去速度快,还是在backtrader中计算指标速度快?(2021-11-17更新,修复pandas增加列添加问题)
作者:yunjinqi   类别:    日期:2021-12-23 18:06:37    阅读:1686 次   消耗积分:0 分    

如题所示:想要验证下,是直接通过backtrader计算所需要的指标速度快,还是通过pandas计算好指标,通过扩展数据加载进去速度快,废话不多说,直接说下验证的思路,罗列验证的代码,给出测试的结果。

详细的验证步骤

  1. 生成一个1千,1万,10万,100万的随机数,保存本地,形成csv文件。

  2. 然后分别使用两种方法读取,计算指标

    • 方法一:使用pandas读取数据,并计算10周期和20周期的均线,分别保存为两个新的列:“short_ma"和”long_ma",然后加载到cerebro中,写策略的时候,就不再计算均线

    • 方法二:使用pandas数据直接加载到cerebro中,然后在策略里面计算10周期和20周期的均线,分别命名为“short_ma"和"long_ma"

  3. 测试在不同的数据量级别下使用的时间,然后画图。

测试代码

import numpy as np
import pandas as pd
import random 
import datetime
import backtrader as bt 
def generate_random_n_bar_df(n):
    start_datetime = datetime.datetime(1990,1,1,9,0,0)
    # bar的数据和时间都是乱生成的,估计没有那种行情是这种,但是应该是不影响测试结果的可靠性
    result=[[random.random(),random.random(),random.random(),random.random(),random.random(),random.random()] for i in range(n)]
    result_df = pd.DataFrame(result,columns=['open',"high","low","close","volume","openinterest"])
    result_df.index=pd.to_datetime([start_datetime+datetime.timedelta(seconds=i) for i in list(range(n))])
    return result_df
# 从1000到100万的bar的数目模拟生成
for n in [1000,10000,100000,1000000]:
    data = generate_random_n_bar_df(n)
    data.to_csv(f"data_{n}.csv")
    print(f"{n}个bar的模拟数据成功保存到工作目录")


class DirectStrategy(bt.Strategy):
    
    # params = (('short_window',10),('long_window',60))
    params = {"short_window":10,"long_window":20}
 
    def log(self, txt, dt=None):
        ''' log信息的功能'''
        dt = dt or bt.num2date(self.datas[0].datetime[0])
        print('%s, %s' % (dt.isoformat(), txt))
 
    def __init__(self):
        # 一般用于计算指标或者预先加载数据,定义变量使用
        self.short_ma = bt.indicators.SMA(self.datas[0].close,period=self.p.short_window)
        self.long_ma = bt.indicators.SMA(self.datas[0].close,period=self.p.long_window)
 
    def next(self):
        # Simply log the closing price of the series from the reference
        # self.log(f"工商银行,{self.datas[0].datetime.date(0)},收盘价为:{self.datas[0].close[0]}")
        # self.log(f"short_ma:{self.short_ma[0]},long_ma:{self.long_ma[0]}")
        # 得到当前的size
        data = self.datas[0]
        # self.log(f"close:{self.datas[0].close[0]},short_ma:{self.short_ma[0]},long_ma:{self.long_ma[0]}")
            
#     def notify_order(self, order):
#         if order.status in [order.Submitted, order.Accepted]:
#             # order被提交和接受
#             return
#         if order.status == order.Rejected:
#             self.log(f"order is rejected : order_ref:{order.ref}  order_info:{order.info}")
#         if order.status == order.Margin:
#             self.log(f"order need more margin : order_ref:{order.ref}  order_info:{order.info}")
#         if order.status == order.Cancelled:
#             self.log(f"order is concelled : order_ref:{order.ref}  order_info:{order.info}")
#         if order.status == order.Partial:
#             self.log(f"order is partial : order_ref:{order.ref}  order_info:{order.info}")
#         # Check if an order has been completed
#         # Attention: broker could reject order if not enougth cash
#         if order.status == order.Completed:
#             if order.isbuy():
#                 self.log("buy result : buy_price : {} , buy_cost : {} , commission : {}".format(
#                             order.executed.price,order.executed.value,order.executed.comm))
                
#             else:  # Sell
#                 self.log("sell result : sell_price : {} , sell_cost : {} , commission : {}".format(
#                             order.executed.price,order.executed.value,order.executed.comm))
    
#     def notify_trade(self, trade):
#         # 一个trade结束的时候输出信息
#         if trade.isclosed:
#             self.log('closed symbol is : {} , total_profit : {} , net_profit : {}' .format(
#                             trade.getdataname(),trade.pnl, trade.pnlcomm))
#         if trade.isopen:
#             self.log('open symbol is : {} , price : {} ' .format(
#                             trade.getdataname(),trade.price))
 
    
class NotDirectStrategy(bt.Strategy):
    
    # params = (('short_window',10),('long_window',60))
    params = {"short_window":10,"long_window":20}
 
    def log(self, txt, dt=None):
        ''' log信息的功能'''
        dt = dt or bt.num2date(self.datas[0].datetime[0])
        print('%s, %s' % (dt.isoformat(), txt))
 
    def __init__(self):
        # 一般用于计算指标或者预先加载数据,定义变量使用
        pass
 
    def next(self):
        # Simply log the closing price of the series from the reference
        # self.log(f"工商银行,{self.datas[0].datetime.date(0)},收盘价为:{self.datas[0].close[0]}")
        # self.log(f"short_ma:{self.short_ma[0]},long_ma:{self.long_ma[0]}")
        # 得到当前的size
        data = self.datas[0]
        # self.log(f"close:{data.close[0]},short_ma:{data.short_ma[0]},long_ma:{data.long_ma[0]}")
            
#     def notify_order(self, order):
#         if order.status in [order.Submitted, order.Accepted]:
#             # order被提交和接受
#             return
#         if order.status == order.Rejected:
#             self.log(f"order is rejected : order_ref:{order.ref}  order_info:{order.info}")
#         if order.status == order.Margin:
#             self.log(f"order need more margin : order_ref:{order.ref}  order_info:{order.info}")
#         if order.status == order.Cancelled:
#             self.log(f"order is concelled : order_ref:{order.ref}  order_info:{order.info}")
#         if order.status == order.Partial:
#             self.log(f"order is partial : order_ref:{order.ref}  order_info:{order.info}")
#         # Check if an order has been completed
#         # Attention: broker could reject order if not enougth cash
#         if order.status == order.Completed:
#             if order.isbuy():
#                 self.log("buy result : buy_price : {} , buy_cost : {} , commission : {}".format(
#                             order.executed.price,order.executed.value,order.executed.comm))
                
#             else:  # Sell
#                 self.log("sell result : sell_price : {} , sell_cost : {} , commission : {}".format(
#                             order.executed.price,order.executed.value,order.executed.comm))
    
#     def notify_trade(self, trade):
#         # 一个trade结束的时候输出信息
#         if trade.isclosed:
#             self.log('closed symbol is : {} , total_profit : {} , net_profit : {}' .format(
#                             trade.getdataname(),trade.pnl, trade.pnlcomm))
#         if trade.isopen:
#             self.log('open symbol is : {} , price : {} ' .format(
#                             trade.getdataname(),trade.price))

def run_direct_data(n):
    data_name = f"data_{n}.csv"  
    df = pd.read_csv(data_name,index_col=0)
    df.index = pd.to_datetime(df.index)
    datetime_list = list(df.index)
    # 添加cerebro
    cerebro = bt.Cerebro()
    # 添加策略
    cerebro.addstrategy(DirectStrategy)
    # 准备数据        
    params = dict(
                    fromdate = datetime_list[0],
                    todate = datetime_list[-1],
                    timeframe = bt.TimeFrame.Minutes,
                    compression = 1,
                    dtformat=('%Y-%m-%d %H:%M:%S'), # 日期和时间格式
                    tmformat=('%H:%M:%S'), # 时间格式
                    )
   
    feed =  bt.feeds.PandasDirectData(dataname=df,**params)
    # 添加合约数据
    cerebro.adddata(feed, name = "gsyh")
    cerebro.broker.setcommission(commission=0.0005)

    # 添加资金
    cerebro.broker.setcash(100000.0)

    # 开始运行
    cerebro.run()
    


class ExtendPandasFeed(bt.feeds.PandasDirectData):

    params = (
        ('datetime', 0),
        ('open', 1),
        ('high', 2),
        ('low', 3),
        ('close', 4),
        ('volume', 5),
        ('openinterest', 6),
        ("short_ma",7),
        ("long_ma",8)
        )
	lines = ("short_ma","long_ma",)
    datafields = [
        'datetime', 'open', 'high', 'low', 'close', 'volume', 'openinterest',"short_ma","long_ma"
    ]

def run_not_direct_data(n):
    data_name = f"data_{n}.csv"  
    df = pd.read_csv(data_name,index_col=0)
    df.index = pd.to_datetime(df.index)
    # 计算指标
    df['short_ma']=df['close'].rolling(10).mean()
    df['long_ma']=df['close'].rolling(20).mean()
    datetime_list = list(df.index)
    # 添加cerebro
    cerebro = bt.Cerebro()
    # 添加策略
    cerebro.addstrategy(NotDirectStrategy)
    # 准备数据        
    params = dict(
                    fromdate = datetime_list[0],
                    todate = datetime_list[-1],
                    timeframe = bt.TimeFrame.Minutes,
                    compression = 1,
                    
                    )
   
    feed =  ExtendPandasFeed(dataname=df,**params)
    # 添加合约数据
    cerebro.adddata(feed, name = "gsyh")
    cerebro.broker.setcommission(commission=0.0005)

    # 添加资金
    cerebro.broker.setcash(100000.0)

    # 开始运行
    cerebro.run()
# 对比了一下,两个输出的结果是一样的。
# run_not_direct_data(1000)   
# run_direct_data(1000)
# 统计两种方法需要的时间
direct_time_list =[]
not_direct_time_list =[]
bar_num_list = [1000,10000,100000,1000000]
for bar_num in bar_num_list:
    begin_time = datetime.datetime.now()
    run_direct_data(bar_num)
    end_time = datetime.datetime.now()
    consume_time = (end_time-begin_time).seconds
    direct_time_list.append(consume_time)
    begin_time = datetime.datetime.now()
    run_not_direct_data(bar_num)
    end_time = datetime.datetime.now()
    consume_time = (end_time-begin_time).seconds
    not_direct_time_list.append(consume_time)
# 画出相关的图
data = [
    
    go.Scatter(
        x=bar_num_list,
        y=not_direct_time_list,
        name = '提前使用pandas计算指标消耗的时间'
    ),
     go.Scatter(
        x=bar_num_list,
        y=direct_time_list,
        name = '在backtrader中计算指标消耗的时间'
    )
]
 
layout = go.Layout(
    title = '随着K线数目增加,两种计算指标的方式消耗的时间'
)
 
fig = go.Figure(data = data)
# 步骤四
fig.update_layout(
  title= '随着K线数目增加,两种计算指标的方式消耗的时间',
  xaxis_title="bar_num",
  yaxis_title="消耗时间(s)",
  xaxis = {"type":"log"}
)
fig.show()

测试结果

在这里插入图片描述

结果分析

使用pandas计算完指标,加载到cerebro之中,居然比在backtrader中直接计算指标消耗的时间多,我推测的要不然是一样,要不然就是使用pandas计算更优,结果和我想的不一样。可能原因在于我们扩展数据的时候是直接继承的PandasDirectData,没有做特别的优化。

理论上来说,如果backtrader计算指标的时候采用的是向量式的,那么两者的时间应该比较接近。如果backtrader使用的是事件驱动式来计算指标,那么backtrader应该消耗的时间更多一些。

backtrader计算指标即可以采用向量式的,也可以采用事件驱动式的,所以具体情形具体分析吧。

如果有很多指标需要计算,但是在回测的时候,这些指标又可以保持不变,那么使用pandas计算之后,使用pickle等序列化的工具保存到本地,然后每次需要的时候从pickle直接读取,会比从pandas读取重新计算指标快很多。


智慧、心灵、财富,总要有一个在路上,愿我们能在人生的道路上,不断成长、不断成熟~~~

感兴趣可以关注我的专栏:

my_quant_study_note:分享一些关于量化投资、量化交易相关的思考

backtrader量化投资回测与交易:本专栏免费,分享backtrader相关的内容。

量化投资神器-backtrader源码解析-从入门到精通:本专栏目前收费299元,预计更新100篇策略(更新中)+36篇backtrader讲解(已完成)+backtrader源码分析。


版权所有,转载本站文章请注明出处:云子量化, http://www.woniunote.com/article/52
上一篇:【答读者问22】如何用python爬取期货合约的保证金及手续费【网络爬虫】
下一篇:【答读者问24】作为一个大一新生,如果我想要成为一个quant,我需要做些什么呢?