
量化投資與機器學習微信公眾號,是業內垂直於量化投資、對沖基金、Fintech、人工智慧、大資料等領域的主流自媒體。公眾號擁有來自公募、私募、券商、期貨、銀行、保險、高校等行業30W+關注者,曾榮獲AMMA優秀品牌力、優秀洞察力大獎,連續4年被騰訊雲+社群評選為“年度最佳作者”。
預定系列(點選可閱讀)
-
-
-
-
-
-
-
-
-
Backtrader 常見問題彙總(今日)
同時,我們對每段程式碼都做了解讀說明,願你在Quant的道路上學有所獲!
QIML官方Github也已上線
相關資料、程式碼一併同步
https://github.com/QuantWorld2022/backtrader
希望大家多Follow,多給星★

import
datetime
as
dt
import
backtrader
as
bt
from
backtrader
import
DataBase, date2num
classPsqlDatabase(DataBase):
'''
預設資料庫表格欄位如下:
ticker char(5),
date date,
high numeric(10,4),
low numeric(10,4),
open numeric(10,4),
close numeric(10,4),
volume integer,
unique (ticker, date)
'''
params = (
# 資料庫連線資訊
(
'user'
,
None
),
(
'password'
,
None
),
(
'host'
,
None
),
(
'port'
,
None
),
(
'dbname'
,
None
),
(
'table'
,
None
),
# 證券資訊
(
'ticker'
,
None
),
# 要提取的證券程式碼
(
'fromdate'
,
None
),
# 提取資料的起始時間(包含)
(
'todate'
,
None
),
# 提取資料的截止時間(包含)
# 每條線對應的提取出來的資料的列索引
(
'datetime'
,
0
),
(
'high'
,
1
),
(
'low'
,
2
),
(
'open'
,
3
),
(
'close'
,
4
),
(
'volume'
,
5
),
(
'openinterest'
,
-1
),
# -1 表示不存在該列資料
)
defstart(self):
conn = self._connect_db()
query = (
"""SELECT date, high, low, open, close, volume """
"""FROM {table} """
"""WHERE ticker = '{ticker}' """
.format(table=self.p.table,
ticker=self.p.ticker))
if
self.p.fromdate
isnotNone
:
query +=
" AND date >= '{fromdate}' "
.format(fromdate=dt.datetime.strftime(self.p.fromdate,
'%Y-%m-%d'
))
if
self.p.todate
isnotNone
:
query +=
" AND date <= '{todate}' "
.format(todate=dt.datetime.strftime(self.p.fromdate,
'%Y-%m-%d'
))
query +=
"""ORDER BY date asc"""
self.result = conn.execute(query)
self.price_rows = self.result.fetchall()
self.result.close()
self.price_i =
0
super(PsqlDatabase, self).start()
def_load(self):
if
self.price_i >= len(self.price_rows):
returnFalse
# 每迴圈一次_load(),填充一個 bar 的資料
row = self.price_rows[self.price_i]
self.price_i +=
1
for
datafield
in
self.getlinealiases():
# 檢視 Data Feeds 包含哪些線
if
datafield ==
'datetime'
:
self.lines.datetime[
0
] = date2num(row[self.p.datetime])
elif
datafield ==
'volume'
:
self.lines.volume[
0
] = row[self.p.volume]
else
:
colidx = getattr(self.params, datafield)
# 獲取列索引
if
colidx <
0
:
# 列索引小於0,表示不存在該列
continue
line = getattr(self.lines, datafield)
# 將資料賦值給對應的線
line[
0
] = float(row[colidx])
returnTrue
# 設定資料庫連線邏輯
def_connect_db(self):
from
sqlalchemy
import
create_engine
url =
'mysql+mysqldb://{user}:{password}@{host}:{port}/{dbname}'
.format(user=self.p.user,
password=self.p.password,
host=self.p.host,
port = self.p.port,
dbname=self.p.dbname)
engine = create_engine(url, echo=
False
)
conn = engine.connect()
return
conn
defpreload(self):
# 負責迴圈呼叫load()(_load()是被 load() 呼叫的)
super(PsqlDatabase, self).preload()
# self.price_rows 的資料都存入lines後,清除 self.price_rows 中的資料,釋放資源
self.price_rows =
None
cerebro = bt.Cerebro()
# 呼叫 MysqlData 類,得到例項
data = PsqlDatabase(user=
'xxxxx'
,
password=
'xxxx'
,
host=
'xxx'
,
port=
'xxxx'
,
dbname=
'xxxx'
,
table=
'xxxx'
,
ticker=
'xxxxx'
,
fromdate=
'xxxxx'
,
todate=
'xxxxx'
)
cerebro.adddata(data, name=
'xxxx'
)
# 將資料傳給大腦
-
params 屬性對應的是載入資料時涉及的各種引數,主要是新增了一部分和資料庫有關的資訊,7 條基礎 lines 的索引需要與 sql 語句中欄位的順序相一致;
-
start() 方法用於啟動資料載入,連線資料庫、從資料庫中讀取資料等操作邏輯會寫在該方法中;
-
stop() 方法用於關閉資料載入,斷開資料庫連線的操作邏輯可以寫在該方法中(上例未涉及stop());
-
_load() 方法負責將載入的資料,一個個賦值給 7 條基礎 lines,直到所有資料都已填充進 lines 為止(返回 False);
-
preload() 方法負責不斷的迴圈呼叫 load()(_load()是被 load() 呼叫的)直到下載完所有資料;
-
上面這些方法都是底層 DataBase 類中的方法,想要具體瞭解可以看底層程式碼 backtrader/feed.py at master · mementum/backtrader (github.com);
-
上面這個案例參考的 Github 中的 PSQL feed implementation by dolanwill · Pull Request #393 · mementum/backtrader (github.com),以及 Backtrader 社群中的討論 SQLite example | Backtrader Community;
-
Backtrader 的 DataFeeds 資料模組提供的 InfluxDB 類也是類似的實現邏輯:backtrader/influxfeed.py at master · mementum/backtrader (github.com);
-
如果想連線不同的資料庫,只需修改資料庫連線方法 _connect_db()、start() 中的查詢語句等邏輯即可。
.
..
params = (
# Possible values for datetime (must always be present)
# None : datetime is the "index" in the Pandas Dataframe
# -1 : autodetect position or case-wise equal name
# >= 0 : numeric index to the colum in the pandas dataframe
# string : column name (as index) in the pandas dataframe
(
'datetime'
,
None
),
...
# PandasData 預設是將 DataFrame 的索引作為 datetime
# 如果你已經將 datetime 設定為 index ,可以直接用下面的語句匯入資料:
data = bt.feeds.PandasData(dataname=price)
# 如果 datetime 只是 DataFrame 中的一列,且列名稱也一致(不區分大小寫),則需要設定引數:
data = bt.feeds.PandasData(dataname=price, datetime=
-1
)
# 或是指定 datetime 在第幾列,比如在 DataFrame 的第 7 列,則令 datetime=6
data = bt.feeds.PandasData(dataname=price, datetime=
6
)
...
# 新增 PyFolio 分析器
cerebro.addanalyzer(bt.analyzers.PyFolio, _name=
'pyfolio'
)
...
results = cerebro.run()
strat = results[
0
]
# 一次性獲取 4 個子分析器的計算結果
pyfoliozer = strat.analyzers.getbyname(
'pyfolio'
)
returns, positions, transactions, gross_lev = pyfoliozer.get_pf_items()
...
...
# 利用 Quantoption 的 Pyfolio 模組來繪製圖形
# 需要提前安裝好該模組 pip install pyfolio==0.5.1
import
pyfolio
as
pf
pf.create_full_tear_sheet(
returns,
positions=positions,
transactions=transactions,
gross_lev=gross_lev,
live_start_date=
'2005-05-01'
,
# This date is sample specific
round_trips=
True
)
As of (at least) 2017-07-25 the pyfolio APIs have changed and create_full_tear_sheet no longer has a gross_lev as a named argument.
import
pyfolio
as
pf
fig = pf.create_full_tear_sheet(
returns,
positions=positions,
transactions=transactions,
# gross_lev=gross_lev,
live_start_date=
'2020-05-01'
,
round_trips=
False
,
return_fig =
True# 後期用於儲存
)
# fig.savefig('returns_tear_sheet.pdf')
pip uninstall pyfolio
pip install git+https://github.com/quantopian/pyfolio
-
一種是透過 bt.analyzers.TimeReturn 返回業績基準的收益率,在此之前,需要確保已經將業績基準的行情資料adddata給大腦,還要給 bt.analyzers.TimeReturn 指定 data 引數;
-
另一種是透過 bt.observers.Benchmark 新增業績基準的觀測器,plot繪圖時展示的收益率曲線就是 bt.analyzers.TimeReturn 返回的收益率。
# 例項化大腦
cerebro = bt.Cerebro()
# 初始資金 1,000,000
cerebro.broker.setcash(
1000000.0
)
# 讀取行情資料
daily_price = pd.read_csv(
"./data/daily_price.csv"
, parse_dates=[
'datetime'
])
stock_price = daily_price.query(
f"sec_code=='600718.SH'"
).set_index(
'datetime'
)
datafeed1 = bt.feeds.PandasData(dataname=stock_price,
fromdate=pd.to_datetime(
'2019-01-02'
),
todate=pd.to_datetime(
'2021-01-28'
))
cerebro.adddata(datafeed1, name=
'600718.SH'
)
benchmark_price = daily_price.query(
f"sec_code=='600728.SH'"
).set_index(
'datetime'
)
datafeed2 = bt.feeds.PandasData(dataname=benchmark_price,
fromdate=pd.to_datetime(
'2019-01-02'
),
todate=pd.to_datetime(
'2021-01-28'
),
)
cerebro.adddata(datafeed2, name=
'600728.SH'
)
# 將編寫的策略新增給大腦,別忘了 !
cerebro.addstrategy(TestStrategy)
cerebro.addanalyzer(bt.analyzers.TimeReturn,_name=
'stock_returns'
)
# 返回 benchmark 的收益率
cerebro.addanalyzer(bt.analyzers.TimeReturn, data=datafeed2, _name=
'benchmark_returns'
)
# 新增業績基準的觀測器
cerebro.addobserver(bt.observers.Benchmark, data=datafeed2)
cerebro.addobserver(bt.observers.TimeReturn)
result = cerebro.run()
cerebro.plot(iplot=
True
)

classCommInfoFractional(bt.CommissionInfo):
defgetsize(self, price, cash):
'''Returns fractional size for cash operation @price'''
return
self.p.leverage * (cash / price)
# 然後透過 addcommissioninfo 將設定傳遞給 broker
cerebro.broker.addcommissioninfo(CommInfoFractional())
# 預設情況下的 getsize 的定義如下,只需
defgetsize(self, price, cash):
'''Returns the needed size to meet a cash operation at a given price'''
ifnot
self._stocklike:
return
int(self.p.leverage * (cash // self.get_margin(price)))
return
int(self.p.leverage * (cash // price)
Backtrader系列已全部更新完畢。
QIML會在今後的日子裡,在全網釋出一系列好用、實用、你絕對愛不釋手的量化開源工具包!
公眾號希望給國內量化投資圈貢獻一份自己的力量!
希望影響更多人瞭解量化、學習量化、找到屬於一條屬於自己的路!
如果你希望我們分享些什麼,也歡迎在評論區留言~
