Backtesting system
File structure:
Root
├── docker-compose.yml
│
├── docker_backtrader
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── backtest
│   │   ├── Strategies
│   │   │   ├── . . . .
│   │   ├── backtest.py
│   │   ├── GetDataFromMySQL.py
│   │   ├── Tables.py
│
├── grafana
│   ├── Dockerfile
│   ├── grafana.ini
│   ├── provisioning
│   │   ├── dashboards
│   │   │   ├── dashboard.yml
│   │   ├── datasources
│   │   │   ├── datasource.yml
│   │
│   ├── dashboards
│
├── Airflow
│   ├── config
│   ├── dags
│   ├── logs
│   ├── plugins
│
├── docker_airflow
│   ├── Dockerfile
│   ├── requirements.txt
│
├── Mongo
│   ├── data
│
├── MySql
│   ├── . . . .
│
├── .env
├── data
Services
The docker-compose.yml file defines the following services, volumes, and networks:
services:
  postgres:
    container_name: postgres_MyProject
    image: postgres:13
    ....
    ....
  redis:
    container_name: redis_MyProject
    image: redis:7
    ....
    ....
  airflow-webserver:
    container_name: airflow_webserver_MyProject
    ....
    ....
  airflow-scheduler:
    container_name: airflow_scheduler_MyProject
    ....
    ....
  airflow-worker:
    container_name: airflow_worker_MyProject
    ....
    ....
  airflow-triggerer:
    container_name: airflow_triggerer_MyProject
    build:
      context: ./docker_airflow
    ....
    ....
  # This service is used only on the first run
  airflow-init:
    container_name: airflow_init_MyProject
    ....
    ....
  airflow-cli:
    container_name: airflow_cli_MyProject
    ....
    ....
  flower:
    container_name: flower_MyProject
    ....
    ....
  db-mongo:
    container_name: mongoDatabase_MyProject
    image: mongo:4.4
    ....
    ....
  mongo-express:
    container_name: mongo_express_MyProject
    image: mongo-express
    ....
    ....
  db-mysql:
    image: mysql:8.0
    container_name: mysql_MyProject
    ....
    ....
  phpmyadmin:
    container_name: phpmyadmin_MyProject
    depends_on:
      - db-mysql
    image: phpmyadmin
    ....
    ....
  grafana:
    container_name: grafana_MyProject
    build:
      context: ./grafana
    ....
    ....
  backtrader:
    container_name: backtrader_MyProject
    build:
      context: ./docker_backtrader
    ....
    ....
volumes:
  postgres-db-volume:
  MySql_db:
  grafana-data:
networks:
  main_networks:
Step 1:
Clone the repository:
$ git clone https://github.com/sajadtaj/Docker_Airflow_Mongo_MySql_Grafana.git
Step 2:
Create the Airflow directories and the .env file:
mkdir -p ./Airflow/dags ./Airflow/logs ./Airflow/plugins ./Airflow/config
echo -e "AIRFLOW_UID=$(id -u)" > .env
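On a machine without a POSIX shell, the two commands above could be approximated in Python. This is only a sketch: the function name `prepare_airflow_dirs` is hypothetical and not part of the repository, and the fallback value 50000 (the default UID in Airflow's official docker-compose file) is used only where `os.getuid` is unavailable.

```python
import os
from pathlib import Path


def prepare_airflow_dirs(root: Path) -> Path:
    """Create the Airflow folders and write AIRFLOW_UID to .env under root."""
    for name in ("dags", "logs", "plugins", "config"):
        # Same effect as `mkdir -p`: create parents, ignore existing folders
        (root / "Airflow" / name).mkdir(parents=True, exist_ok=True)
    # os.getuid does not exist on Windows, so fall back to Airflow's default UID
    uid = os.getuid() if hasattr(os, "getuid") else 50000
    env_file = root / ".env"
    # write_text overwrites the file, like `>` in the shell command
    env_file.write_text(f"AIRFLOW_UID={uid}\n")
    return env_file
```

Calling `prepare_airflow_dirs(Path("."))` from the project root would then mirror the shell version.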
Step 3:
Rename all containers:
Every container name ends with the suffix _MyProject. Search for this suffix and replace it throughout the project.
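If you would rather script the rename than use an editor's search and replace, something like the following might do. It is a sketch, not part of the repository: `rename_project` is a hypothetical helper, and it only touches `container_name:` lines that end with the old suffix.

```python
import re


def rename_project(compose_text: str, new_suffix: str,
                   old_suffix: str = "_MyProject") -> str:
    """Replace old_suffix at the end of every container_name value."""
    pattern = re.compile(
        r"^([ \t]*container_name:[ \t]*\S+?)" + re.escape(old_suffix) + r"[ \t]*$",
        re.MULTILINE,
    )
    # A function is used as the replacement so new_suffix is inserted literally
    return pattern.sub(lambda m: m.group(1) + new_suffix, compose_text)
```

For example, `rename_project(open("docker-compose.yml").read(), "_Backtest")` returns the file content with `postgres_MyProject` turned into `postgres_Backtest`, and so on; writing the result back to disk is left to the caller.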
Step 4:
The service definitions in this project contain some default passwords; it is better to change them.
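To see which entries need attention, one option is to scan docker-compose.yml for keys that look like credentials. The helper below is a hypothetical sketch; the key patterns (PASSWORD, PASSWD, SECRET) are an assumption, and passwords embedded in connection URLs will not be caught.

```python
import re

# Key fragments that usually indicate a credential (assumed, not exhaustive)
PASSWORD_KEY = re.compile(r"(PASSWORD|PASSWD|SECRET)", re.IGNORECASE)


def find_password_entries(compose_text: str) -> list[tuple[int, str]]:
    """Return (line number, stripped line) for lines whose key looks like a credential."""
    hits = []
    for number, line in enumerate(compose_text.splitlines(), start=1):
        # Only inspect the part before the first '=' or ':' (the key)
        key = re.split(r"[=:]", line, maxsplit=1)[0]
        if PASSWORD_KEY.search(key):
            hits.append((number, line.strip()))
    return hits
```

Run against the db-mysql service of this project, it would report the `MYSQL_ROOT_PASSWORD` and `MYSQL_PASSWORD` lines but not `MYSQL_DATABASE`.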
Step 5:
Initialize the database:
docker compose up airflow-init
Unfortunately, Airflow does not ship with the latest SQLAlchemy, so be careful which SQLAlchemy classes you use; Airflow may not support the newer ones:
# SQLAlchemy 2.0 style, which may not work inside Airflow:
# from sqlalchemy.orm import DeclarativeBase
# class Base(DeclarativeBase):
#     pass

# In Airflow you must use the declarative_base() factory instead:
from sqlalchemy import BIGINT, Column, DateTime, Float, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class BTC_USD(Base):
    __tablename__ = 'BTC_USD'

    id = Column(Integer, primary_key=True)
    Open = Column(Float)
    High = Column(Float)
    Low = Column(Float)
    Close = Column(Float)
    AdjClose = Column(Float)
    Volume = Column(BIGINT)
    date = Column(DateTime)

    def __repr__(self):
        return f"<BTC_USD(date='{self.date}', Close='{self.Close}')>"
Step 6:
Put all services on the same network: main_networks
Step 7:
If you can't clone the repo, copy the grafana folder to the root of the project.
Step 8:
If the functions in your Airflow DAGs need additional Python packages, add the library names to:
Root > docker_airflow > requirements.txt
and point the build context in docker-compose.yml to that folder:
airflow-triggerer:
  container_name: airflow_triggerer_MyProject
  build:
    context: ./docker_airflow # Your path
Step 9:
It is better to pin an explicit tag for each image in docker-compose.yml.
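An image reference without a tag makes Docker pull `latest`, so builds can change from one day to the next. As a rough aid, the sketch below lists the `image:` entries that have neither a tag nor a digest; `untagged_images` is a hypothetical helper based on a simple regular expression, not a full YAML parser.

```python
import re

IMAGE_LINE = re.compile(r"^[ \t]*image:[ \t]*(\S+)", re.MULTILINE)


def untagged_images(compose_text: str) -> list[str]:
    """List image references that carry no explicit tag or digest."""
    missing = []
    for ref in IMAGE_LINE.findall(compose_text):
        ref = ref.strip("\"'")
        # Look only at the last path segment so a registry host:port is ignored
        last_part = ref.rsplit("/", 1)[-1]
        if ":" not in last_part and "@" not in last_part:
            missing.append(ref)
    return missing
```

With the services listed in this article, it would flag `mongo-express` and `phpmyadmin`, while `postgres:13`, `redis:7`, `mongo:4.4`, and `mysql:8.0` pass.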
Step 10:
Change the database name (optional):
db-mysql:
  image: mysql:8.0
  container_name: mysql_MyProject
  environment:
    - MYSQL_ROOT_PASSWORD=test
    - MYSQL_DATABASE=testDB
    - MYSQL_PASSWORD=test
Step 11:
To use the MySQL database in your Grafana dashboards, set the following host address in the data source settings:
host.docker.internal:3306
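If Grafana reports that it cannot reach the database, a plain TCP check from inside a container can show whether the address is reachable at all. The helper `can_connect` below is a hypothetical sketch using only the standard library; it says nothing about MySQL credentials, only about the host and port.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures
        return False
```

For instance, `can_connect("host.docker.internal", 3306)` returning False suggests a networking problem rather than a wrong user name or password.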
Step 12:
If you cloned this project, create the initial database tables first:
META, BTC_USD, AMZN, DELL
Go to Airflow > dags and run InitialTables.py to define and create the initial tables.
After InitialTables.py has run, the tables (META, BTC_USD, AMZN, DELL) are created and filled with data from start=`2015-01-01` to end=`2024-01-01`. Don't worry about newer data: it is updated automatically by the Airflow DAG Update_BTCUSD_Dag.py. On its first run, this DAG fetches all data between end=2024-01-01 and today; after that, it adds the new data every day. The DAG looks like this:
#python
# Imports for Airflow 2.x
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from .Update_Moduls import update_data_from_yfinance  # For more detail see Update_Moduls
from .InitialTables import Table_Name                 # For more detail see InitialTables

#!------------------+
#! Define Updater   |
#!------------------+
def Updater(**kwargs):
    # Table_Name maps each symbol to its table model
    for key in Table_Name:
        print(key, Table_Name[key])
        update_data_from_yfinance(symbol=key, Model_Name=Table_Name[key])

#!------------------+
#! Define DAG       |
#!------------------+
# Define DAG parameters
default_args = {
    'owner': 'Xirano',  # Replace with your own name
}
# Set DAG ID and schedule
dag = DAG(
    dag_id='UpdateEveryDay',
    default_args=default_args,
    start_date=days_ago(1),      # Adjust as needed
    schedule_interval="@daily",  # Run once a day
)
task = PythonOperator(
    task_id='Update_Stock_table',  # Task IDs must not contain spaces
    python_callable=Updater,       # Airflow 2 passes the context as keyword arguments automatically
    dag=dag,
)
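The first-run and daily behaviour described above comes down to one date calculation: fetch everything after the last stored row up to today. The sketch below illustrates that idea only; `next_fetch_window` is a hypothetical name, and the actual logic inside Update_Moduls is not shown in this article and may differ (for example in whether the end date is inclusive).

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def next_fetch_window(last_stored: date, today: date) -> Optional[Tuple[date, date]]:
    """Return the (start, end) dates still missing from a table, or None if up to date."""
    # last_stored is assumed to be in the table already, so start one day later
    start = last_stored + timedelta(days=1)
    if start > today:
        return None
    return start, today
```

On the first run after the initial load, `last_stored` would be 2024-01-01 and the window spans months; on later daily runs it shrinks to a single day.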
Step 13:
This step describes the Backtrader structure:
├── docker_backtrader
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── backtest
│   │   ├── Strategies
│   │   │   ├── . . . . # Define all your strategies here
│   │   ├── backtest.py
│   │   ├── GetDataFromMySQL.py
│   │   ├── Tables.py
1. Docker Compose Configuration (`docker-compose.yml`):
# docker-compose.yml
services:
  backtrader:
    build: ./docker_backtrader
    volumes: # Optional, if you need to mount host directories
      - ./data:/app/data # Example: mount the host's "data" folder to the container's "/app/data"
2. Building the Backtrader Docker Image (Dockerfile):
# Dockerfile
FROM python:3.9
LABEL builder="Xirano"
LABEL maintainer="[email protected]"
LABEL version="1.00"
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY backtest/ .
CMD ["python", "backtest.py"]
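Create a requirements.txt file in docker_backtrader that lists the required libraries. Judging by the imports in the scripts of this step, these include backtrader, pandas, SQLAlchemy, PyMySQL, and pymongo.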
3. Implementing Data Retrieval with SQLAlchemy (docker_backtrader/backtest/GetDataFromMySQL.py):
#python
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from Tables import Table_Name


def GetDataFromMySQl(Table: str = 'AMZN', Start: str = '2023-01-01', End: str = '2024-01-01'):
    """Return the rows of one table between Start and End as a date-indexed DataFrame."""
    Table = Table_Name[Table]  # Look up the model class by table name
    # Adjust user, password, host, and database name to your setup
    URLS = "mysql+pymysql://root:tajedin@localhost:3306/Test_Grafana"
    engine = create_engine(URLS, echo=False)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    session = SessionLocal()
    query = session.query(Table)
    query = query.filter(Table.date >= Start, Table.date <= End)
    query = query.order_by(Table.date)
    data = query.all()
    df = pd.DataFrame([d.__dict__ for d in data])
    # Keep only the price columns; this also drops SQLAlchemy's internal state column
    df = df[['date', 'Open', 'High', 'Low', 'Close', 'AdjClose', 'Volume']]
    df.set_index('date', inplace=True)
    session.close()
    return df
4. Define the Database Tables for Reading (docker_backtrader/backtest/Tables.py):
5. Create Your Strategies (`docker_backtrader/backtest/Strategies`):
6. Backtesting Script (docker_backtrader/backtest/backtest.py):
#python
import backtrader as bt
from pymongo import MongoClient

from GetDataFromMySQL import GetDataFromMySQl
from Strategies.Momentum import Momentum_Strategies
from Strategies.Test_Strategies import TestStrategy


class BackTest():
    def __init__(self, Table: str, Start, End, setcash, Strategy):
        self.Table = Table
        self.Start = Start
        self.End = End
        self.setcash = setcash
        self.Strategy = Strategy
        self.cerebro = bt.Cerebro()
        self.__Analyzers()
        self.__RunStrategy()

    #-----------------+
    # Run Strategy    |
    #-----------------+
    def __RunStrategy(self):
        # Loads the data and registers the strategy; the actual run happens in Log()
        data_Parsed = bt.feeds.PandasData(
            dataname=GetDataFromMySQl(Table=self.Table, Start=self.Start, End=self.End),  # Get data from MySQL
            datetime=None,    # Use the DataFrame index (date)
            open=0,
            high=1,
            low=2,
            close=4,          # Column 4 is AdjClose (column 3 is the raw Close)
            volume=5,
            openinterest=-1   # Autodetect by name; the table has no such column
        )
        self.cerebro.adddata(data_Parsed)
        self.cerebro.addstrategy(self.Strategy)
        self.cerebro.broker.setcash(self.setcash)

    #-----------------+
    # Analyzers       |
    #-----------------+
    def __Analyzers(self):
        self.cerebro.addanalyzer(bt.analyzers.PyFolio)
        self.cerebro.addanalyzer(bt.analyzers.DrawDown)
        self.cerebro.addanalyzer(bt.analyzers.AnnualReturn)
        self.cerebro.addanalyzer(bt.analyzers.TimeDrawDown)
        self.cerebro.addanalyzer(bt.analyzers.PositionsValue)
        self.cerebro.addanalyzer(bt.analyzers.LogReturnsRolling)
        self.cerebro.addanalyzer(bt.analyzers.SharpeRatio, riskfreerate=0.03)
        self.cerebro.addanalyzer(bt.analyzers.SQN)
        self.cerebro.addanalyzer(bt.analyzers.TradeAnalyzer)

    #-----------------+
    # Log             |
    #-----------------+
    def Log(self):
        print('Starting Portfolio Value: %.2f' % self.cerebro.broker.getvalue())
        self.results = self.cerebro.run()
        print('Final Portfolio Value: %.2f' % self.cerebro.broker.getvalue())
        return self.results

    #-----------------+
    # Result          |
    #-----------------+
    def Result(self):
        # Call Log() first: it runs the backtest and fills self.results
        strategy_instance = self.results[0]
        pyfolio = strategy_instance.analyzers.getbyname('pyfolio')
        returns, positions, transactions, gross_lev = pyfolio.get_pf_items()
        drawdown = strategy_instance.analyzers.drawdown.get_analysis()
        annualreturn = strategy_instance.analyzers.annualreturn.get_analysis()
        TradeAnalyzer = strategy_instance.analyzers.tradeanalyzer.get_analysis()
        sharperatio = strategy_instance.analyzers.sharperatio.get_analysis()
        data_to_insert = {
            'Strategy_name': self.Strategy.__name__,
            'returns': returns.to_json(orient='index', date_format='iso'),
            'positions': positions.to_json(orient='index', date_format='iso'),
            'transactions': transactions.to_json(orient='index', date_format='iso'),
            'drawdown': drawdown,
            'annualreturn': [(str(year), value) for year, value in annualreturn.items()],
            'TradeAnalyzer': TradeAnalyzer,
            'sharperatio': sharperatio,
        }
        # Adjust user, password, and host to your MongoDB setup
        connection_string = 'mongodb://admin:pass@localhost:27017'
        client = MongoClient(connection_string)
        db = client['Strategy']
        collection = db['Result']
        result = collection.insert_one(data_to_insert)
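The integer arguments passed to bt.feeds.PandasData are positions in the DataFrame's columns, counted after `date` has become the index. The short sketch below resolves those positions back to column names using the column list from GetDataFromMySQL.py; the helper name `positional_columns` is hypothetical and exists only for illustration.

```python
# Column order selected in GetDataFromMySQl before set_index('date')
SELECTED = ['date', 'Open', 'High', 'Low', 'Close', 'AdjClose', 'Volume']


def positional_columns(selected, index_col='date'):
    """Map position -> column name once index_col has become the index."""
    remaining = [name for name in selected if name != index_col]
    return dict(enumerate(remaining))


mapping = positional_columns(SELECTED)
# Positions used in the PandasData call of backtest.py
feed_args = {'open': 0, 'high': 1, 'low': 2, 'close': 4, 'volume': 5}
resolved = {field: mapping[pos] for field, pos in feed_args.items()}
print(resolved)
# {'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'AdjClose', 'volume': 'Volume'}
```

So with `close=4` the backtest runs on the adjusted close; passing `close=3` would use the raw Close column instead.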
6-1 BackTest Class
Besides its constructor, the BackTest class has four methods: __RunStrategy, __Analyzers, Log, and Result.
To use it, create an instance of the BackTest class:
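#Python
instance = BackTest(
    Table='META',
    Start='2020-01-01',
    End='2024-01-01',
    setcash=100000,
    Strategy=Momentum_Strategies
)
instance.Log()     # Run the backtest and print the starting and final portfolio values
instance.Result()  # Collect the analyzer output and store it in MongoDB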
We use MongoDB to store the result of each strategy.
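One detail matters when storing analyzer output: MongoDB documents only accept string keys, while the AnnualReturn analyzer keys its result by integer year. That is why Result() turns the analysis into (year, value) pairs. The sketch below shows that conversion next to an alternative that keeps a mapping; `stringify_keys` is a hypothetical helper and the numbers are invented purely for illustration.

```python
def stringify_keys(analysis: dict) -> dict:
    """Recursively convert mapping keys to str so the result is a valid document."""
    return {
        str(key): stringify_keys(value) if isinstance(value, dict) else value
        for key, value in analysis.items()
    }


# Invented numbers, shaped like the AnnualReturn analysis (int year -> return)
annual = {2020: 0.12, 2021: -0.03}

# Approach used in Result(): a list of (year, value) pairs
as_pairs = [(str(year), value) for year, value in annual.items()]

# Alternative: keep a mapping, but with string keys
as_doc = stringify_keys(annual)
```

Either form can be inserted with `insert_one`; the mapping form is somewhat easier to query by year.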
Contact
Email: [email protected]
LinkedIn: https://www.dhirubhai.net/in/sajad-taj