Skip to content

Commit

Permalink
multithreading
Browse files Browse the repository at this point in the history
  • Loading branch information
Emerson Pedroso committed Mar 4, 2021
1 parent 1956e0e commit 95facb5
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 107 deletions.
44 changes: 17 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Python Metatrader DataFrame API 2.0
# Python Metatrader DataFrame API 2.0.3

## Instalation for docker Metatrader 5 Server API

Expand Down Expand Up @@ -180,51 +180,41 @@ date
[3097 rows x 12 columns]
```

# Live data and streaming events
# Live streaming Price

```python
from ejtraderMT import Metatrader
from threading import Thread




api = Metatrader()

symbols = ["EURUSD","GBPUSD","AUDUSD"]
timeframe = "TICK"


# Live data stream subscribe
api.live(symbols,timeframe)




def price():
connect = api.live_price
while True:
price = connect.recv_json()
print(price)
# stream price
while True:
price = api.price(symbols,timeframe)
print(price)

```

def event():
connect = api.live_event
while True:
event = connect.recv_json()
print(event)
# Live streaming events

```python
from ejtraderMT import Metatrader


api = Metatrader()

t = Thread(target=price, daemon=True)
t.start()
symbols = ["EURUSD","GBPUSD","AUDUSD"]
timeframe = "TICK"

t = Thread(target=event, daemon=True)
t.start()

# stream event
while True:
pass
event = api.event(symbols,timeframe)
print(event)

```

# Trading and Orders Manipulation
Expand Down
164 changes: 141 additions & 23 deletions ejtraderMT/api/platafrom.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import time
from pytz import timezone
from tzlocal import get_localzone
import asyncio
import uvloop
from threading import Thread
from queue import Queue



class Functions:
def __init__(self, host=None, debug=None):
Expand Down Expand Up @@ -214,11 +216,15 @@ def __init__(self, host=None, real_volume=None, localtime=True):
self.real_volume = real_volume or False
self.localtime = localtime
self.utc_timezone = timezone('UTC')

self.live_price = self.api.live_socket()
self.live_event = self.api.streaming_socket()
self.my_timezone = get_localzone()
self.utc_brocker_offset = self._utc_brocker_offset()
self._priceQ = Queue()
self._eventQ = Queue()
self._historyQ = Queue()





def balance(self):
return self.api.Command(action="BALANCE")
Expand Down Expand Up @@ -347,9 +353,100 @@ def _utc_brocker_offset(self):
seconds = int(hour)*60
return seconds


def _price(self):
connect = self.api.live_socket()
while True:
price = connect.recv_json()
try:
price = price['data']
price = pd.DataFrame([price])
price = price.set_index([0])
price.index.name = 'date'
if self.allchartTF == 'TICK':
price.index = pd.to_datetime(price.index, unit='ms')
price.columns = ['bid', 'ask']
self._priceQ.put(price)
else:
if self.real_volume:
del price[5]
else:
del price[6]
price.index = pd.to_datetime(price.index, unit='s')
price.columns = ['open', 'high', 'low','close', 'volume','spread']

self._priceQ.put(price)
except KeyError:
pass



def _event(self):
connect = self.api.streaming_socket()
while True:
event = connect.recv_json()
try:
event = event['request']
event = pd.DataFrame(event, index=[0])
self._eventQ.put(event)
except KeyError:
pass

def price(self, symbol, chartTF):
self.api.Command(action="RESET")
self.allsymbol = symbol
self.allchartTF = chartTF
for active in symbol:
self.api.Command(action="CONFIG", symbol=active, chartTF=chartTF)
self.start_thread_price()
return self._priceQ.get()



def event(self, symbol, chartTF):
self.api.Command(action="RESET")
self.allsymbol = symbol
self.allchartTF = chartTF
for active in symbol:
self.api.Command(action="CONFIG", symbol=active, chartTF=chartTF)
self.start_thread_event()
return self._eventQ.get()




def start_thread_event(self):
try:
event = Thread(target=self._event, daemon=True)
event.start()
except:
print("Error: unable to start Event thread")


def start_thread_price(self):
try:
price = Thread(target=self._price, daemon=True)
price.start()
except:
print("Error: unable to start Price thread")




def start_thread_history(self):
try:
history = Thread(target=self.historyThread, daemon=True)
history.start()
except:
print("Error: unable to start History thread")









# convert datestamp to dia/mes/ano
def date_to_timestamp(self, s):
Expand All @@ -369,12 +466,7 @@ def brokerTimeCalculation(self,s):
return result


def live(self, symbol, chartTF):
self.api.Command(action="RESET")
for active in symbol:
self.api.Command(action="CONFIG", symbol=active, chartTF=chartTF)
print(f'subscribed : {active}')
time.sleep(1)



def timeframe_to_sec(self, timeframe):
Expand All @@ -401,39 +493,59 @@ def setlocaltime_dataframe(self, df):
df.index = df.index.tz_convert(self.my_timezone)
df.index = df.index.tz_localize(None)
return df



def history(self, symbol, chartTF, fromDate=None, toDate=None):
actives = symbol
def history(self,symbol,chartTF,fromDate=None,toDate=None,threadON=False):
self.symbol = symbol
self.chartTF = chartTF
self.fromDate = fromDate
self.toDate = toDate
self.start_thread_history()
return self._historyQ.get()



def historyThread(self):
actives = self.symbol
chartTF = self.chartTF
fromDate = self.fromDate
toDate = self.toDate
main = pd.DataFrame()
current = pd.DataFrame()
if(chartTF == 'TICK'):
chartConvert = 60
else:
chartConvert = self.timeframe_to_sec(chartTF)
for active in actives:
# the first symbol on list is the main and the rest will merge
if active == symbol[0]:
if active == actives[0]:
# get data
if fromDate and toDate:
data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF,
fromDate=self.date_to_timestamp(fromDate), toDate=self.date_to_timestamp(toDate))
elif isinstance(fromDate, int):
data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF,
fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + self.timeframe_to_sec(chartTF)) + fromDate * self.timeframe_to_sec(chartTF) - self.timeframe_to_sec(chartTF)) ))
fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + chartConvert) + fromDate * chartConvert - chartConvert) ))
elif isinstance(fromDate, str) and toDate==None:
data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF,
fromDate=self.date_to_timestamp(fromDate),toDate=self.date_to_timestamp_broker())
else:
data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF,
fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + self.timeframe_to_sec(chartTF)) + 100 * self.timeframe_to_sec(chartTF) - self.timeframe_to_sec(chartTF)) ))
fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + chartConvert) + 100 * chartConvert - chartConvert) ))
self.api.Command(action="RESET")
try:
main = pd.DataFrame(data['data'])
main = main.set_index([0])
main.index.name = 'date'
main.index = pd.to_datetime(main.index, unit='s')


# TICK DATA
if(chartTF == 'TICK'):
main.columns = ['bid', 'ask']
main.index = pd.to_datetime(main.index, unit='ms')
else:
# OHLC DATA
main.index = pd.to_datetime(main.index, unit='s')
if self.real_volume:
del main[5]
else:
Expand All @@ -449,26 +561,26 @@ def history(self, symbol, chartTF, fromDate=None, toDate=None):
fromDate=self.date_to_timestamp(fromDate), toDate=self.date_to_timestamp(toDate))
elif isinstance(fromDate, int):
data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF,
fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + self.timeframe_to_sec(chartTF)) + fromDate * self.timeframe_to_sec(chartTF) - self.timeframe_to_sec(chartTF)) ))
fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + chartConvert) + fromDate * chartConvert - chartConvert) ))
elif isinstance(fromDate, str) and toDate==None:
data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF,
fromDate=self.date_to_timestamp(fromDate),toDate=self.date_to_timestamp_broker())
else:
data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF,
fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + self.timeframe_to_sec(chartTF)) + 100 * self.timeframe_to_sec(chartTF) - self.timeframe_to_sec(chartTF)) ))
fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + chartConvert) + 100 * chartConvert - chartConvert) ))

self.api.Command(action="RESET")
try:
current = pd.DataFrame(data['data'])
current = current.set_index([0])
current.index.name = 'date'
current.index = pd.to_datetime(current.index, unit='s')
active = active.lower()
# TICK DATA
if(chartTF == 'TICK'):
current.index = pd.to_datetime(current.index, unit='ms')
current.columns = [f'{active}_bid', f'{active}_ask']
else:
# OHLC DATA
current.index = pd.to_datetime(current.index, unit='s')
if self.real_volume:
del current[5]
else:
Expand All @@ -487,6 +599,12 @@ def history(self, symbol, chartTF, fromDate=None, toDate=None):
except AttributeError:
pass
main = main.loc[~main.index.duplicated(keep='first')]
return main
self._historyQ.put(main)








1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pandas>=0.25.1
pyzmq==19.0.2
tzlocal==2.1
ejtraderDB==1.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def requirements(filename):

setup(
name='ejtraderMT',
version='2.0.3',
version='2.0.4',
packages=find_packages(),
url='https://ejtrader_mt.readthedocs.io/',
download_url='https://ejtrader.com',
Expand Down
Loading

0 comments on commit 95facb5

Please sign in to comment.