Skip to content

Commit

Permalink
fix: run cryptofeed strategy in another thread to permit cryptofeed t…
Browse files Browse the repository at this point in the history
…o run in the main one
  • Loading branch information
AntoineLep committed May 1, 2022
1 parent 62e5afa commit f6be052
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 27 deletions.
5 changes: 0 additions & 5 deletions strategies/cryptofeed_strategy/cryptofeed_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import logging
import queue
from typing import List
Expand Down Expand Up @@ -50,10 +49,6 @@ async def open_interest_cb(data, receipt):
# Add raw data to CryptofeedDataTypeEnum.OPEN_INTEREST queue
CryptofeedService.data[CryptofeedDataTypeEnum.OPEN_INTEREST].put(data)

# There is no current event loop in thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

f = FeedHandler()
configured = []

Expand Down
86 changes: 64 additions & 22 deletions strategies/cryptofeed_strategy/cryptofeed_strategy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
import threading
import time
Expand All @@ -6,6 +7,7 @@
from core.strategy.strategy import Strategy
from strategies.cryptofeed_strategy.cryptofeed_service import CryptofeedService
from strategies.cryptofeed_strategy.enums.cryptofeed_data_type_enum import CryptofeedDataTypeEnum
from strategies.cryptofeed_strategy.enums.cryptofeed_side_enum import CryptofeedSideEnum
from tools.utils import flatten
from cryptofeed.types import Liquidation

Expand All @@ -22,10 +24,8 @@ def __init__(self):
logging.info("TestStrategy run strategy")
super(CryptofeedStrategy, self).__init__()

# Array of array of LiquidationDataDict, every sub array is the liquidation list received during the last
# SLEEP_TIME_BETWEEN_LOOPS sec. Use CryptofeedService EXCHANGES global to configure the list of exchange to
# retrieve data on
self.liquidations: List[List[Liquidation]] = []
# Array liquidation data
self.liquidations: List[Liquidation] = []

# {
# exchange1: {
Expand All @@ -37,8 +37,7 @@ def __init__(self):
# Use CryptofeedService EXCHANGES global to configure the list of exchange to retrieve data on
self.open_interest = {}

self._t: threading.Thread = threading.Thread(target=CryptofeedService.start_cryptofeed, args=[])
self._t.start()
self._t: threading.Thread = threading.Thread(target=self.strategy_runner)

def before_loop(self) -> None:
"""Called before each loop"""
Expand All @@ -50,24 +49,19 @@ def loop(self) -> None:
self.perform_new_liquidations()
self.perform_new_open_interest()

last_1_min_liquidations = flatten(
self.liquidations[-min(len(self.liquidations), 60 // SLEEP_TIME_BETWEEN_LOOPS):])
last_5_min_liquidations = flatten(
self.liquidations[-min(len(self.liquidations), 60 * 5 // SLEEP_TIME_BETWEEN_LOOPS):])
ftx_last_1_min_liquidations = self.get_liquidations(exchanges=["FTX"], max_age=60)
ftx_last_5_min_liquidations = self.get_liquidations(exchanges=["FTX"], max_age=60 * 5)
binance_last_1_min_liquidations = self.get_liquidations(exchanges=["BINANCE_FUTURES"], max_age=60)
binance_last_5_min_liquidations = self.get_liquidations(exchanges=["BINANCE_FUTURES"], max_age=60 * 5)

ftx_last_1_min_liquidations_value = sum([round(data.quantity * data.price, 2)
for data in last_1_min_liquidations
if data.exchange == "FTX"])
for data in ftx_last_1_min_liquidations])
ftx_last_5_min_liquidations_value = sum([round(data.quantity * data.price, 2)
for data in last_5_min_liquidations
if data.exchange == "FTX"])

for data in ftx_last_5_min_liquidations])
binance_last_1_min_liquidations_value = sum([round(data.quantity * data.price, 2)
for data in last_1_min_liquidations
if data.exchange == "BINANCE_FUTURES"])
for data in binance_last_1_min_liquidations])
binance_last_5_min_liquidations_value = sum([round(data.quantity * data.price, 2)
for data in last_5_min_liquidations
if data.exchange == "BINANCE_FUTURES"])
for data in binance_last_5_min_liquidations])

logging.info(f'[FTX] Liquidations in the last 1 minute: ${ftx_last_1_min_liquidations_value}')
logging.info(f'[FTX] Liquidations in the last 5 minutes: ${ftx_last_5_min_liquidations_value}')
Expand All @@ -78,8 +72,8 @@ def loop(self) -> None:
# ...

# Remove values older than LIQUIDATION_HISTORY_RETENTION_TIME
if len(self.liquidations) > LIQUIDATION_HISTORY_RETENTION_TIME // SLEEP_TIME_BETWEEN_LOOPS:
self.liquidations = self.liquidations[-LIQUIDATION_HISTORY_RETENTION_TIME // SLEEP_TIME_BETWEEN_LOOPS:]
self.liquidations = list(filter(lambda data: data.timestamp > time.time() - LIQUIDATION_HISTORY_RETENTION_TIME,
self.liquidations))

def after_loop(self) -> None:
"""Called after each loop"""
Expand All @@ -89,6 +83,34 @@ def cleanup(self) -> None:
"""Clean strategy execution"""
self._t.join()

def get_liquidations(self, exchanges: List[str] = None, symbols: List[str] = None, side: CryptofeedSideEnum = None,
max_age: int = -1):
"""
Get the liquidations that meet the given criteria
:param exchanges: List of exchanges to get liquidations for
:param symbols: List of symbols to get liquidations for
:param side: Side to retrieve liquidations for
:param max_age: Max liquidation age in seconds
:return: The list of liquidations that meet the criteria
"""

liquidations = self.liquidations

if exchanges is not None:
liquidations = list(filter(lambda data: data.exchange in exchanges, liquidations))

if symbols is not None:
liquidations = list(filter(lambda data: data.symbol in symbols, liquidations))

if side is not None:
liquidations = list(filter(lambda data: data.side == side, liquidations))

if max_age > -1:
liquidations = list(filter(lambda data: data.timestamp > time.time() - max_age, liquidations))

return liquidations

def perform_new_liquidations(self) -> None:
"""
Flush new received liquidations from cryptofeed service and add new data to the liquidation array
Expand Down Expand Up @@ -116,7 +138,7 @@ def perform_new_liquidations(self) -> None:
f'Quantity: {data.quantity:<10} Price: {data.price:<10} '
f'Size: {size_c}{size:<9}{end_c}') # ID: {data.id} Status: {data.status}')

self.liquidations.append(new_liquidations)
self.liquidations.append(data)

def perform_new_open_interest(self) -> None:
"""
Expand All @@ -133,3 +155,23 @@ def perform_new_open_interest(self) -> None:
"open_interest": oi.open_interest,
"timestamp": oi.timestamp
}

def run(self) -> None:
"""
Override default run method to launch strategy_runner in another thread so cryptofeed can be executed in the
main thread due to issues when running not in the main thread
"""
self._t.start()
CryptofeedService.start_cryptofeed()

def strategy_runner(self) -> None:
try:
while True:
self.before_loop()
self.loop()
self.after_loop()
except Exception as e:
logging.info("An error occurred when running strategy")
logging.info(e)
self.cleanup()
raise
8 changes: 8 additions & 0 deletions strategies/cryptofeed_strategy/enums/cryptofeed_side_enum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class CryptofeedSideEnum(Enum):
"""Cryptofeed side enum"""

BUY = "buy"
SELL = "sell"

0 comments on commit f6be052

Please sign in to comment.