diff --git a/README.md b/README.md index d447dac..33a2f9f 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Python Metatrader DataFrame API 2.0 +# Python Metatrader DataFrame API 2.0.3 ## Instalation for docker Metatrader 5 Server API @@ -180,14 +180,10 @@ date [3097 rows x 12 columns] ``` -# Live data and streaming events +# Live streaming Price ```python from ejtraderMT import Metatrader -from threading import Thread - - - api = Metatrader() @@ -195,36 +191,30 @@ symbols = ["EURUSD","GBPUSD","AUDUSD"] timeframe = "TICK" -# Live data stream subscribe -api.live(symbols,timeframe) - - - - -def price(): - connect = api.live_price - while True: - price = connect.recv_json() - print(price) +# stream price +while True: + price = api.price(symbols,timeframe) + print(price) +``` -def event(): - connect = api.live_event - while True: - event = connect.recv_json() - print(event) +# Live streaming events +```python +from ejtraderMT import Metatrader +api = Metatrader() -t = Thread(target=price, daemon=True) -t.start() +symbols = ["EURUSD","GBPUSD","AUDUSD"] +timeframe = "TICK" -t = Thread(target=event, daemon=True) -t.start() +# stream event while True: - pass + event = api.event(symbols,timeframe) + print(event) + ``` # Trading and Orders Manipulation diff --git a/ejtraderMT/api/platafrom.py b/ejtraderMT/api/platafrom.py index a1b320a..2b474b1 100755 --- a/ejtraderMT/api/platafrom.py +++ b/ejtraderMT/api/platafrom.py @@ -5,8 +5,10 @@ import time from pytz import timezone from tzlocal import get_localzone -import asyncio -import uvloop +from threading import Thread +from queue import Queue + + class Functions: def __init__(self, host=None, debug=None): @@ -214,11 +216,15 @@ def __init__(self, host=None, real_volume=None, localtime=True): self.real_volume = real_volume or False self.localtime = localtime self.utc_timezone = timezone('UTC') - - self.live_price = self.api.live_socket() - self.live_event = self.api.streaming_socket() self.my_timezone = get_localzone() self.utc_brocker_offset = self._utc_brocker_offset() + self._priceQ = Queue() + self._eventQ = Queue() + self._historyQ = Queue() + + + + def balance(self): return self.api.Command(action="BALANCE") @@ -347,9 +353,100 @@ def _utc_brocker_offset(self): seconds = int(hour)*60 return seconds + + def _price(self): + connect = self.api.live_socket() + while True: + price = connect.recv_json() + try: + price = price['data'] + price = pd.DataFrame([price]) + price = price.set_index([0]) + price.index.name = 'date' + if self.allchartTF == 'TICK': + price.index = pd.to_datetime(price.index, unit='ms') + price.columns = ['bid', 'ask'] + self._priceQ.put(price) + else: + if self.real_volume: + del price[5] + else: + del price[6] + price.index = pd.to_datetime(price.index, unit='s') + price.columns = ['open', 'high', 'low','close', 'volume','spread'] + + self._priceQ.put(price) + except KeyError: + pass + + + + def _event(self): + connect = self.api.streaming_socket() + while True: + event = connect.recv_json() + try: + event = event['request'] + event = pd.DataFrame(event, index=[0]) + self._eventQ.put(event) + except KeyError: + pass + + def price(self, symbol, chartTF): + self.api.Command(action="RESET") + self.allsymbol = symbol + self.allchartTF = chartTF + for active in symbol: + self.api.Command(action="CONFIG", symbol=active, chartTF=chartTF) + self.start_thread_price() + return self._priceQ.get() + + + + def event(self, symbol, chartTF): + self.api.Command(action="RESET") + self.allsymbol = symbol + self.allchartTF = chartTF + for active in symbol: + self.api.Command(action="CONFIG", symbol=active, chartTF=chartTF) + self.start_thread_event() + return self._eventQ.get() + + + + + def start_thread_event(self): + try: + event = Thread(target=self._event, daemon=True) + event.start() + except: + print("Error: unable to start Event thread") + + + def start_thread_price(self): + try: + price = Thread(target=self._price, daemon=True) + price.start() + except: + print("Error: unable to start Price thread") + + + + + def start_thread_history(self): + try: + history = Thread(target=self.historyThread, daemon=True) + history.start() + except: + print("Error: unable to start History thread") + + + + + # convert datestamp to dia/mes/ano def date_to_timestamp(self, s): @@ -369,12 +466,7 @@ def brokerTimeCalculation(self,s): return result - def live(self, symbol, chartTF): - self.api.Command(action="RESET") - for active in symbol: - self.api.Command(action="CONFIG", symbol=active, chartTF=chartTF) - print(f'subscribed : {active}') - time.sleep(1) + def timeframe_to_sec(self, timeframe): @@ -401,39 +493,59 @@ def setlocaltime_dataframe(self, df): df.index = df.index.tz_convert(self.my_timezone) df.index = df.index.tz_localize(None) return df + + - def history(self, symbol, chartTF, fromDate=None, toDate=None): - actives = symbol + def history(self,symbol,chartTF,fromDate=None,toDate=None,threadON=False): + self.symbol = symbol + self.chartTF = chartTF + self.fromDate = fromDate + self.toDate = toDate + self.start_thread_history() + return self._historyQ.get() + + + + def historyThread(self): + actives = self.symbol + chartTF = self.chartTF + fromDate = self.fromDate + toDate = self.toDate main = pd.DataFrame() current = pd.DataFrame() + if(chartTF == 'TICK'): + chartConvert = 60 + else: + chartConvert = self.timeframe_to_sec(chartTF) for active in actives: # the first symbol on list is the main and the rest will merge - if active == symbol[0]: + if active == actives[0]: # get data if fromDate and toDate: data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF, fromDate=self.date_to_timestamp(fromDate), toDate=self.date_to_timestamp(toDate)) elif isinstance(fromDate, int): data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF, - fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + self.timeframe_to_sec(chartTF)) + fromDate * self.timeframe_to_sec(chartTF) - self.timeframe_to_sec(chartTF)) )) + fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + chartConvert) + fromDate * chartConvert - chartConvert) )) elif isinstance(fromDate, str) and toDate==None: data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF, fromDate=self.date_to_timestamp(fromDate),toDate=self.date_to_timestamp_broker()) else: data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF, - fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + self.timeframe_to_sec(chartTF)) + 100 * self.timeframe_to_sec(chartTF) - self.timeframe_to_sec(chartTF)) )) + fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + chartConvert) + 100 * chartConvert - chartConvert) )) self.api.Command(action="RESET") try: main = pd.DataFrame(data['data']) main = main.set_index([0]) main.index.name = 'date' - main.index = pd.to_datetime(main.index, unit='s') + # TICK DATA if(chartTF == 'TICK'): main.columns = ['bid', 'ask'] + main.index = pd.to_datetime(main.index, unit='ms') else: - # OHLC DATA + main.index = pd.to_datetime(main.index, unit='s') if self.real_volume: del main[5] else: @@ -449,26 +561,26 @@ def history(self, symbol, chartTF, fromDate=None, toDate=None): fromDate=self.date_to_timestamp(fromDate), toDate=self.date_to_timestamp(toDate)) elif isinstance(fromDate, int): data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF, - fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + self.timeframe_to_sec(chartTF)) + fromDate * self.timeframe_to_sec(chartTF) - self.timeframe_to_sec(chartTF)) )) + fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + chartConvert) + fromDate * chartConvert - chartConvert) )) elif isinstance(fromDate, str) and toDate==None: data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF, fromDate=self.date_to_timestamp(fromDate),toDate=self.date_to_timestamp_broker()) else: data = self.api.Command(action="HISTORY", actionType="DATA", symbol=active, chartTF=chartTF, - fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + self.timeframe_to_sec(chartTF)) + 100 * self.timeframe_to_sec(chartTF) - self.timeframe_to_sec(chartTF)) )) + fromDate=self.datetime_to_timestamp(self.brokerTimeCalculation((10800 + chartConvert) + 100 * chartConvert - chartConvert) )) self.api.Command(action="RESET") try: current = pd.DataFrame(data['data']) current = current.set_index([0]) current.index.name = 'date' - current.index = pd.to_datetime(current.index, unit='s') active = active.lower() # TICK DATA if(chartTF == 'TICK'): + current.index = pd.to_datetime(current.index, unit='ms') current.columns = [f'{active}_bid', f'{active}_ask'] else: - # OHLC DATA + current.index = pd.to_datetime(current.index, unit='s') if self.real_volume: del current[5] else: @@ -487,6 +599,12 @@ def history(self, symbol, chartTF, fromDate=None, toDate=None): except AttributeError: pass main = main.loc[~main.index.duplicated(keep='first')] - return main + self._historyQ.put(main) + + + + + + \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 12e20bf..359b974 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ pandas>=0.25.1 pyzmq==19.0.2 tzlocal==2.1 -ejtraderDB==1.0 \ No newline at end of file diff --git a/setup.py b/setup.py index d1eed1f..830a540 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ def requirements(filename): setup( name='ejtraderMT', - version='2.0.3', + version='2.0.4', packages=find_packages(), url='https://ejtrader_mt.readthedocs.io/', download_url='https://ejtrader.com', diff --git a/test/live.py b/test/live.py index c3a0b78..81f477a 100644 --- a/test/live.py +++ b/test/live.py @@ -1,70 +1,36 @@ from ejtraderMT import Metatrader -from threading import Thread from ejtraderDB import DictSQLite -import asyncio -import uvloop - +import time api = Metatrader() - -symbols = ["EURUSD","GBPUSD","AUDUSD"] -timeframe = "TICK" -local = 'live' - -# Live data stream subscribe -api.live(symbols,timeframe) - -q = DictSQLite(name=local) - - - - -async def price(): - connect = api.live_price - while True: - price = connect.recv_json() - try: - symbol = price['symbol'] - price = price['data'] - q[symbol] = price - except KeyError: - pass - print(price) - -async def event(): - connect = api.live_event - while True: - event = connect.recv_json() - print(event) - - - - -active = [["EURUSD"],["GBPUSD"]] +symbols = ["EURUSD"] timeframe = "M1" -period = 43200 -q = DictSQLite(name="history") -async def main(): - while True: - for i in active: - df = api.history(i,timeframe,period) - print(df,end="\r") - q[f"{i}"] = df - -uvloop.install() -asyncio.run(main()) +# dados = api.history(symbols,timeframe,5) +# start_time = time.perf_counter() +# dados = api.history(symbols,timeframe,150,threadON=True) +# df = dados[symbols[0]] +# end_time = time.perf_counter() +# elapsed_time = end_time - start_time +# print(f"Elapsed run time: {elapsed_time} seconds") +# print(df) +# start_time = time.perf_counter() +# dados = api.history(symbols,timeframe,1) +# end_time = time.perf_counter() +# elapsed_time = end_time - start_time +# print(f"Elapsed run time: {elapsed_time} seconds") -uvloop.install() -asyncio.run(price()) -asyncio.run(event()) +# print(dados) +# price = api.price(symbols,timeframe) -# {'status': 'CONNECTED', 'symbol': 'GBPUSD', 'timeframe': 'TICK', 'data': [1614723675076, 1.39677, 1.3968]} -# {'status': 'CONNECTED', 'symbol': 'AUDUSD', 'timeframe': 'TICK', 'data': [1614723675076, 1.39677, 1.3968]} \ No newline at end of file +while True: + history = api.history(symbols,timeframe,5) + print(history) + # print(price)