Skip to content

Commit

Permalink
fix loop
Browse files Browse the repository at this point in the history
  • Loading branch information
traderpedroso committed May 28, 2023
1 parent c024fed commit acb0c1e
Showing 1 changed file with 95 additions and 78 deletions.
173 changes: 95 additions & 78 deletions ejtraderMT/api/mql.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ def __init__(self, host=None, debug=None):
try:
self.sys_socket = context.socket(zmq.REQ)
# set port timeout
self.sys_socket.RCVTIMEO = sys_timeout * 1000
self.sys_socket.RCVTIMEO = sys_timeout
self.sys_socket.connect("tcp:https://{}:{}".format(self.HOST, self.SYS_PORT))

self.data_socket = context.socket(zmq.PULL)
# set port timeout
self.data_socket.RCVTIMEO = data_timeout * 1000
self.data_socket.RCVTIMEO = data_timeout
self.data_socket.connect("tcp:https://{}:{}".format(self.HOST, self.DATA_PORT))

self.indicator_data_socket = context.socket(zmq.PULL)
# set port timeout
self.indicator_data_socket.RCVTIMEO = data_timeout * 1000
self.indicator_data_socket.RCVTIMEO = data_timeout
self.indicator_data_socket.connect(
"tcp:https://{}:{}".format(self.HOST, self.INDICATOR_DATA_PORT)
)
Expand Down Expand Up @@ -288,8 +288,8 @@ def calendar(self, symbol=None, fromDate=None, toDate=None, database=None):
self.__database = database
try:
start(self._calendar, repeat=1, max_threads=20)
except:
logging.info("Error: unable to start History thread")
except Exception as e:
logging.info(f"Error: {e}")

return self.__calendarQ.get()

Expand Down Expand Up @@ -320,20 +320,10 @@ def _calendar(self, data):
toDate = start_date
toDate += delta2
toDate = toDate.strftime("%d/%m/%Y")

if fromDate and toDate:
try:
df = self.__api.Command(
action="CALENDAR",
actionType="DATA",
symbol=symbol,
fromDate=self.__date_to_timestamp(fromDate),
toDate=self.__date_to_timestamp(toDate),
)

except:
pass
elif isinstance(fromDate, str) and toDate == None:
attempts = 0
success = False

while not success and attempts < 5:
try:
df = self.__api.Command(
action="CALENDAR",
Expand All @@ -342,8 +332,14 @@ def _calendar(self, data):
fromDate=self.__date_to_timestamp(fromDate),
toDate=self.__date_to_timestamp(toDate),
)
except:
pass
success = True

except Exception as e:
logging.info(f"Error while processing {symbol}. Error message: {str(e)}")
attempts += 1
if attempts == 5 and not success:
print(f"Check if {symbol} is avalible from {fromDate}")
break
self.__api.Command(action="RESET")
try:
df = pd.DataFrame(df["data"])
Expand All @@ -361,13 +357,18 @@ def _calendar(self, data):
df = df.dropna(subset=["date"])
df = df.set_index("date")
df.index = pd.to_datetime(df.index)
except:
except Exception as e:
logging.info(f"Error while processing {symbol} Dataframe. Error message: {str(e)}")
pass

appended_data.append(df)
start_date += delta
pbar.close()

df = pd.concat(appended_data)
try:
df = pd.concat(appended_data)
except Exception as e:
logging.info(f"Error while processing {symbol} Dataframe. Error message: {str(e)}")
pass

if self.__database:
start(self.__save_to_db, data=[df], repeat=1, max_threads=20)
Expand Down Expand Up @@ -735,6 +736,8 @@ def __historyThread_save(self, data):
toDate = start_date
toDate += delta2
toDate = toDate.strftime("%d/%m/%Y")
attempts = 0
success = False

if chartTF == "TICK":
chartConvert = 60
Expand All @@ -746,28 +749,29 @@ def __historyThread_save(self, data):
# the first symbol on list is the main and the rest will merge
if active == actives[0]:
self.__active_name = active
# get data
if fromDate and toDate:
data = self.__api.Command(
action="HISTORY",
actionType="DATA",
symbol=active,
chartTF=chartTF,
fromDate=self.__date_to_timestamp(fromDate),
toDate=self.__date_to_timestamp(toDate),
)

elif isinstance(fromDate, str) and toDate == None:
data = self.__api.Command(
action="HISTORY",
actionType="DATA",
symbol=active,
chartTF=chartTF,
fromDate=self.__date_to_timestamp(fromDate),
toDate=self.__date_to_timestamp(toDate),
)

self.__api.Command(action="RESET")
while not success and attempts < 5:
try:
data = self.__api.Command(
action="HISTORY",
actionType="DATA",
symbol=active,
chartTF=chartTF,
fromDate=self.__date_to_timestamp(fromDate),
toDate=self.__date_to_timestamp(toDate),
)
success = True
except Exception as e:
logging.info(f"Error while processing {active}. Error message: {str(e)}")
attempts += 1
if attempts == 5 and not success:
print(f"Check if {active} is avalible from {fromDate}")
break

try:
self.__api.Command(action="RESET")
except:
pass

try:
main = pd.DataFrame(data["data"])
main = main.set_index([0])
Expand All @@ -791,31 +795,33 @@ def __historyThread_save(self, data):
"volume",
"spread",
]
except KeyError:
except Exception as e:
# logging.info(f"Error while processing Dataframe {active}. Error message: {str(e)}")
pass
else:
# get data
if fromDate and toDate:
data = self.__api.Command(
action="HISTORY",
actionType="DATA",
symbol=active,
chartTF=chartTF,
fromDate=self.__date_to_timestamp(fromDate),
toDate=self.__date_to_timestamp(toDate),
)

elif isinstance(fromDate, str) and toDate == None:
data = self.__api.Command(
action="HISTORY",
actionType="DATA",
symbol=active,
chartTF=chartTF,
fromDate=self.__date_to_timestamp(fromDate),
toDate=self.__date_to_timestamp(toDate),
)

self.__api.Command(action="RESET")
while not success and attempts < 5:
try:
data = self.__api.Command(
action="HISTORY",
actionType="DATA",
symbol=active,
chartTF=chartTF,
fromDate=self.__date_to_timestamp(fromDate),
toDate=self.__date_to_timestamp(toDate),
)
success = True
except Exception as e:
logging.info(f"Error while processing {active}. Error message: {str(e)}")
attempts += 1
if attempts == 5 and not success:
print(f"Check if {active} is avalible from {fromDate}")
pass

try:
self.__api.Command(action="RESET")
except Exception as e:
logging.info(f"Error while processing commad Reset {active}. Error message: {str(e)}")
pass
try:
current = pd.DataFrame(data["data"])
current = current.set_index([0])
Expand Down Expand Up @@ -844,23 +850,32 @@ def __historyThread_save(self, data):
# main = pd.merge(main, current, how='inner',
# left_index=True, right_index=True)
main = pd.merge(main, current, on="date")
except KeyError:
except Exception as e:
logging.info(f"Error while processing Dataframe {active}. Error message: {str(e)}")
pass

main = main.loc[~main.index.duplicated(keep="first")]
appended_data.append(main)

try:
main = main.loc[~main.index.duplicated(keep="first")]
appended_data.append(main)
except Exception as e:
logging.info(f"Error while processing {active}. Error message: {str(e)}")
pass
start_date += delta
pbar.close()
df = pd.concat(appended_data)
try:
df = pd.concat(appended_data)
except Exception as e:
logging.info(f"Error while processing {active}. Error message: {str(e)}")
pass

if self.__database:
start(self.__save_to_db, data=[df], repeat=1, max_threads=20)
else:
try:
self.__set_utc_or_localtime_tz_df(df)
self.__historyQ.put(df)

except AttributeError:
except Exception as e:
logging.info(f"Error while processing {active}. Error message: {str(e)}")
pass

def __save_to_db(self, df):
Expand All @@ -869,14 +884,16 @@ def __save_to_db(self, df):
try:
self.__set_utc_or_localtime_tz_df(df)

except AttributeError:
except Exception as e:
logging.info(f"Error while processing database. Error message: {str(e)}")
pass

q[f"{self._symbol}"] = df
else:
try:
self.__set_utc_or_localtime_tz_df(df)
except AttributeError:
except Exception as e:
logging.info(f"Error while processing utc or localtime tz. Error message: {str(e)}")
pass
if self.dbtype == "INFLUXDB":
self.__client.write_points(df, f"{self._symbol}", protocol=self.protocol)

0 comments on commit acb0c1e

Please sign in to comment.