Skip to content

Commit

Permalink
cryptofeed add open interest
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoineLep committed Apr 27, 2022
1 parent ef55333 commit cccfc3d
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 50 deletions.
2 changes: 1 addition & 1 deletion config/application_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from strategies.multi_coin_abnormal_volume_tracker.multi_coin_abnormal_volume_tracker \
import MultiCoinAbnormalVolumeTracker

strategy = MultiCoinAbnormalVolumeTracker()
strategy = CryptofeedStrategy()

log = {
"level": "info",
Expand Down
71 changes: 29 additions & 42 deletions strategies/cryptofeed_strategy/cryptofeed_service.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,54 @@
import asyncio
import logging
import queue
from typing import List

from cryptofeed import FeedHandler
from cryptofeed.defines import LIQUIDATIONS
from cryptofeed.defines import LIQUIDATIONS, OPEN_INTEREST
from cryptofeed.exchanges import EXCHANGE_MAP

from strategies.cryptofeed_strategy.enums.cryptofeed_data_type_enum import CryptofeedDataTypeEnum

# Display all received data if set to true. Only the data > min_data_size is displayed otherwise
DISPLAY_ALL_LIQUIDATION_DATA = True
DISPLAY_ALL_DATA = False
exchanges = ['BINANCE_FUTURES', 'FTX']


class CryptofeedService(object):
LIQUIDATION_DATA: queue.Queue = queue.Queue()
data = {
CryptofeedDataTypeEnum.LIQUIDATIONS: queue.Queue(),
CryptofeedDataTypeEnum.OPEN_INTEREST: queue.Queue()
}

@staticmethod
def flush_liquidation_data_queue_items(min_data_size: int = 0):
def flush_liquidation_data_queue_items(data_type: CryptofeedDataTypeEnum) -> List:
"""
Flush TestStrategy.LIQUIDATION_DATA queue and returns the data having a value >= min_data_size
Flush TestStrategy.LIQUIDATION_DATA queue and returns the data
:return: The data having a size >= min_data_size
:param data_type: The type of data to flush
:return: The liquidation data
"""
items = []

while not CryptofeedService.LIQUIDATION_DATA.empty():
data = CryptofeedService.LIQUIDATION_DATA.get()
while not CryptofeedService.data[data_type].empty():
data = CryptofeedService.data[data_type].get()

if DISPLAY_ALL_LIQUIDATION_DATA:
if DISPLAY_ALL_DATA:
logging.info(data)

size = round(data.quantity * data.price, 2)

if size >= min_data_size:
end_c = '\033[0m'
side_c = '\033[91m' if data.side == 'sell' else '\33[32m'

size_c = ''

if size > 10_000:
size_c = '\33[32m'
if size > 25_000:
size_c = '\33[33m'
if size > 50_000:
size_c = '\33[31m'
if size > 100_000:
size_c = '\35[35m'

logging.info(f'{data.exchange:<18} {data.symbol:<18} Side: {side_c}{data.side:<8}{end_c} '
f'Quantity: {data.quantity:<10} Price: {data.price:<10} '
f'Size: {size_c}{size:<9}{end_c}') # ID: {data.id} Status: {data.status}')

items.append(data)
items.append(data)

return items

@staticmethod
def start_cryptofeed():
async def liquidations(data, receipt):
# Add raw data to TestStrategy.LIQUIDATION_DATA queue
CryptofeedService.LIQUIDATION_DATA.put(data)
# Add raw data to CryptofeedDataTypeEnum.LIQUIDATION_DATA queue
CryptofeedService.data[CryptofeedDataTypeEnum.LIQUIDATIONS].put(data)

async def open_interest(data, receipt):
# Add raw data to CryptofeedDataTypeEnum.OPEN_INTEREST queue
CryptofeedService.data[CryptofeedDataTypeEnum.OPEN_INTEREST].put(data)

# There is no current event loop in thread
loop = asyncio.new_event_loop()
Expand All @@ -66,10 +57,6 @@ async def liquidations(data, receipt):
f = FeedHandler()
configured = []

# ['BINANCE_DELIVERY', 'BINANCE_FUTURES', 'BITMEX', 'BYBIT', 'DERIBIT', 'FTX']
exchanges = ['BINANCE_FUTURES', 'FTX']

# print(type(EXCHANGE_MAP), EXCHANGE_MAP)
print("Querying exchange metadata")
for exchange_string, exchange_class in EXCHANGE_MAP.items():

Expand All @@ -79,15 +66,15 @@ async def liquidations(data, receipt):
if exchange_string in ['BITFLYER', 'EXX', 'OKEX']: # We have issues with these exchanges
continue

if LIQUIDATIONS in exchange_class.info()['channels']['websocket']:
if all(channel in exchange_class.info()['channels']['websocket'] for channel in [LIQUIDATIONS,
OPEN_INTEREST]):
configured.append(exchange_string)
print(f"Configuring {exchange_string}...", end='')
symbols = [sym for sym in exchange_class.symbols() if 'PINDEX' not in sym]
# symbols = ['LRC-USDT-PERP']
# print(symbols)

try:
f.add_feed(exchange_class(subscription={LIQUIDATIONS: symbols},
callbacks={LIQUIDATIONS: liquidations}))
f.add_feed(exchange_class(subscription={LIQUIDATIONS: symbols, OPEN_INTEREST: symbols},
callbacks={LIQUIDATIONS: liquidations, OPEN_INTEREST: open_interest}))
print(" Done")
except Exception as e:
print(e, exchange_string)
Expand Down
60 changes: 53 additions & 7 deletions strategies/cryptofeed_strategy/cryptofeed_strategy.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import logging
import threading
import time
from typing import List

from core.strategy.strategy import Strategy
from strategies.cryptofeed_strategy.cryptofeed_service import CryptofeedService
from strategies.cryptofeed_strategy.enums.cryptofeed_data_type_enum import CryptofeedDataTypeEnum
from strategies.cryptofeed_strategy.models.liquidation_data_dict import LiquidationDataDict
from tools.utils import flatten

SLEEP_TIME_BETWEEN_LOOPS = 10
Expand All @@ -19,8 +22,9 @@ def __init__(self):
logging.info("TestStrategy run strategy")
super(CryptofeedStrategy, self).__init__()

self.liquidations = []
self._t: threading.Thread = threading.Thread(target=CryptofeedService.start_cryptofeed)
self.liquidations: List[List[LiquidationDataDict]] = []
self.open_interest = {}
self._t: threading.Thread = threading.Thread(target=CryptofeedService.start_cryptofeed, args=[])
self._t.start()

def before_loop(self) -> None:
Expand All @@ -30,11 +34,8 @@ def before_loop(self) -> None:
def loop(self) -> None:
"""The strategy core loop method"""

# Flush liquidation data received into TestStrategy.LIQUIDATION_DATA queue
# Put in parameters the minimum value of the liquidation you want to retrieve
new_liquidations = CryptofeedService.flush_liquidation_data_queue_items(0)

self.liquidations.append(new_liquidations)
self.perform_new_liquidations()
self.perform_new_open_interest()

last_1_min_liquidations = flatten(
self.liquidations[-min(len(self.liquidations), 60 // SLEEP_TIME_BETWEEN_LOOPS):])
Expand Down Expand Up @@ -63,3 +64,48 @@ def after_loop(self) -> None:
def cleanup(self) -> None:
"""Clean strategy execution"""
self._t.join()

def perform_new_liquidations(self) -> None:
"""
Flush new received liquidations from cryptofeed service and add new data to the liquidation array
"""
new_liquidations = CryptofeedService.flush_liquidation_data_queue_items(CryptofeedDataTypeEnum.LIQUIDATIONS)

for data in new_liquidations:
size = round(data.quantity * data.price, 2)

end_c = '\033[0m'
side_c = '\033[91m' if data.side == 'sell' else '\33[32m'

size_c = ''

if size > 10_000:
size_c = '\33[32m'
if size > 25_000:
size_c = '\33[33m'
if size > 50_000:
size_c = '\33[31m'
if size > 100_000:
size_c = '\35[35m'

logging.info(f'{data.exchange:<18} {data.symbol:<18} Side: {side_c}{data.side:<8}{end_c} '
f'Quantity: {data.quantity:<10} Price: {data.price:<10} '
f'Size: {size_c}{size:<9}{end_c}') # ID: {data.id} Status: {data.status}')

self.liquidations.append(new_liquidations)

def perform_new_open_interest(self) -> None:
"""
Flush new received open interest from cryptofeed service and add new data to the open interest object
"""

new_oi = CryptofeedService.flush_liquidation_data_queue_items(CryptofeedDataTypeEnum.OPEN_INTEREST)

for oi in new_oi:
if oi.exchange not in self.open_interest:
self.open_interest[oi.exchange] = {}

self.open_interest[oi.exchange][oi.symbol] = {
"open_interest": oi.open_interest,
"timestamp": oi.timestamp
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class CryptofeedDataTypeEnum(Enum):
"""Cryptofeed data type enum"""

LIQUIDATIONS = "liquidations"
OPEN_INTEREST = "open_interest"
8 changes: 8 additions & 0 deletions strategies/cryptofeed_strategy/enums/cryptofeed_side_enum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class CryptofeedSideEnum(Enum):
"""Cryptofeed side enum"""

BUY = "buy"
SELL = "sell"
15 changes: 15 additions & 0 deletions strategies/cryptofeed_strategy/models/liquidation_data_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import TypedDict

from strategies.cryptofeed_strategy.enums.cryptofeed_side_enum import CryptofeedSideEnum


class LiquidationDataDict(TypedDict):
"""Liquidation data dict"""

exchange: str
price: float
quantity: float
side: CryptofeedSideEnum
status: str
symbol: str
timestamp: float
10 changes: 10 additions & 0 deletions strategies/cryptofeed_strategy/models/open_interest_data_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import TypedDict


class OpenInterestDataDict(TypedDict):
"""Open interest data dict"""

exchange: str
symbol: str
open_interest: float
timestamp: float

0 comments on commit cccfc3d

Please sign in to comment.