Skip to content

Commit

Permalink
cryptofeed library integration
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoineLep committed Apr 24, 2022
1 parent dac58c9 commit ef55333
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 5 deletions.
3 changes: 2 additions & 1 deletion config/application_config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from strategies.demo_strategy.demo_strategy import DemoStrategy
from strategies.cryptofeed_strategy.cryptofeed_strategy import CryptofeedStrategy
from strategies.trend_follow.trend_follow import TrendFollow
from strategies.twitter_elon_musk_doge_tracker.twitter_elon_musk_doge_tracker import TwitterElonMuskDogeTracker
from strategies.best_strat_ever.best_strategy_ever import BestStrategyEver
from strategies.listing_sniper.listing_sniper import ListingSniper
from strategies.multi_coin_abnormal_volume_tracker.multi_coin_abnormal_volume_tracker \
import MultiCoinAbnormalVolumeTracker

strategy = DemoStrategy()
strategy = MultiCoinAbnormalVolumeTracker()

log = {
"level": "info",
Expand Down
5 changes: 2 additions & 3 deletions ftx_algotrading.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)

logging.info("---------------")
logging.info("%s V%s" % (project_name, project_version))
logging.info(f"{project_name}, {project_version}")
logging.info("---------------")

strategy = application_config.strategy
Expand All @@ -24,7 +24,6 @@
strategy.run()
except KeyboardInterrupt:
strategy.cleanup()
logging.info("/!\\ Keyboard interruption: Stopping %s V%s" % (project_name,
project_version))
logging.info(f"/!\\ Keyboard interruption: Stopping {project_name} V{project_version}")
finally:
pass
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ stockstats~=0.4.1
websocket-client~=0.58.0
gevent~=21.1.2
python-dateutil~=2.8.1

cryptofeed~=2.2.2
4 changes: 4 additions & 0 deletions strategies/best_strat_ever/best_strategy_ever.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ def before_loop(self) -> None:

def loop(self) -> None:
"""The strategy core"""
logging.info("ticker")
logging.info(self.ftx_ws_client.get_ticker("DOGE-PERP"))

response = self.ftx_rest_api.get(f"markets/DOGE-PERP")
logging.info(f"FTX API response: {str(response)}")

response = self.ftx_rest_api.get(f"futures/DOGE-PERP/stats")
logging.info(f"FTX API response: {str(response)}")

logging.info("Retrieving orders")
response = self.ftx_rest_api.get("orders", {"market": "DOGE-PERP"})
logging.info(f"FTX API response: {str(response)}")
Expand Down
99 changes: 99 additions & 0 deletions strategies/cryptofeed_strategy/cryptofeed_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import asyncio
import logging
import queue

from cryptofeed import FeedHandler
from cryptofeed.defines import LIQUIDATIONS
from cryptofeed.exchanges import EXCHANGE_MAP

# Display all received data if set to true. Only the data > min_data_size is displayed otherwise
DISPLAY_ALL_LIQUIDATION_DATA = True


class CryptofeedService(object):
LIQUIDATION_DATA: queue.Queue = queue.Queue()

@staticmethod
def flush_liquidation_data_queue_items(min_data_size: int = 0):
"""
Flush TestStrategy.LIQUIDATION_DATA queue and returns the data having a value >= min_data_size
:return: The data having a size >= min_data_size
"""
items = []

while not CryptofeedService.LIQUIDATION_DATA.empty():
data = CryptofeedService.LIQUIDATION_DATA.get()

if DISPLAY_ALL_LIQUIDATION_DATA:
logging.info(data)

size = round(data.quantity * data.price, 2)

if size >= min_data_size:
end_c = '\033[0m'
side_c = '\033[91m' if data.side == 'sell' else '\33[32m'

size_c = ''

if size > 10_000:
size_c = '\33[32m'
if size > 25_000:
size_c = '\33[33m'
if size > 50_000:
size_c = '\33[31m'
if size > 100_000:
size_c = '\35[35m'

logging.info(f'{data.exchange:<18} {data.symbol:<18} Side: {side_c}{data.side:<8}{end_c} '
f'Quantity: {data.quantity:<10} Price: {data.price:<10} '
f'Size: {size_c}{size:<9}{end_c}') # ID: {data.id} Status: {data.status}')

items.append(data)

return items

@staticmethod
def start_cryptofeed():
async def liquidations(data, receipt):
# Add raw data to TestStrategy.LIQUIDATION_DATA queue
CryptofeedService.LIQUIDATION_DATA.put(data)

# There is no current event loop in thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

f = FeedHandler()
configured = []

# ['BINANCE_DELIVERY', 'BINANCE_FUTURES', 'BITMEX', 'BYBIT', 'DERIBIT', 'FTX']
exchanges = ['BINANCE_FUTURES', 'FTX']

# print(type(EXCHANGE_MAP), EXCHANGE_MAP)
print("Querying exchange metadata")
for exchange_string, exchange_class in EXCHANGE_MAP.items():

if exchange_string not in exchanges:
continue

if exchange_string in ['BITFLYER', 'EXX', 'OKEX']: # We have issues with these exchanges
continue

if LIQUIDATIONS in exchange_class.info()['channels']['websocket']:
configured.append(exchange_string)
print(f"Configuring {exchange_string}...", end='')
symbols = [sym for sym in exchange_class.symbols() if 'PINDEX' not in sym]
# symbols = ['LRC-USDT-PERP']
# print(symbols)
try:
f.add_feed(exchange_class(subscription={LIQUIDATIONS: symbols},
callbacks={LIQUIDATIONS: liquidations}))
print(" Done")
except Exception as e:
print(e, exchange_string)
pass

print(configured)

print("Starting feedhandler for exchanges:", ', '.join(configured))
f.run(install_signal_handlers=False)
65 changes: 65 additions & 0 deletions strategies/cryptofeed_strategy/cryptofeed_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import logging
import threading
import time

from core.strategy.strategy import Strategy
from strategies.cryptofeed_strategy.cryptofeed_service import CryptofeedService
from tools.utils import flatten

SLEEP_TIME_BETWEEN_LOOPS = 10
LIQUIDATION_HISTORY_RETENTION_TIME = 60 * 60 # 1 hour retention


class CryptofeedStrategy(Strategy):
"""The test strategy"""

def __init__(self):
"""The test strategy constructor"""

logging.info("TestStrategy run strategy")
super(CryptofeedStrategy, self).__init__()

self.liquidations = []
self._t: threading.Thread = threading.Thread(target=CryptofeedService.start_cryptofeed)
self._t.start()

def before_loop(self) -> None:
"""Called before each loop"""
pass

def loop(self) -> None:
"""The strategy core loop method"""

# Flush liquidation data received into TestStrategy.LIQUIDATION_DATA queue
# Put in parameters the minimum value of the liquidation you want to retrieve
new_liquidations = CryptofeedService.flush_liquidation_data_queue_items(0)

self.liquidations.append(new_liquidations)

last_1_min_liquidations = flatten(
self.liquidations[-min(len(self.liquidations), 60 // SLEEP_TIME_BETWEEN_LOOPS):])
last_5_min_liquidations = flatten(
self.liquidations[-min(len(self.liquidations), 60 * 5 // SLEEP_TIME_BETWEEN_LOOPS):])

last_1_min_liquidations_value = sum([round(data.quantity * data.price, 2) for data in last_1_min_liquidations])
last_5_min_liquidations_value = sum([round(data.quantity * data.price, 2) for data in last_5_min_liquidations])

logging.info(f'Liquidations in the last 1 minute: {len(last_1_min_liquidations)} for a total value of '
f'${last_1_min_liquidations_value}')
logging.info(f'Liquidations in the last 5 minutes: {len(last_5_min_liquidations)} for a total value of '
f'${last_5_min_liquidations_value}')

# Put your custom logic here
# ...

# Remove values older than LIQUIDATION_HISTORY_RETENTION_TIME
if len(self.liquidations) > LIQUIDATION_HISTORY_RETENTION_TIME // SLEEP_TIME_BETWEEN_LOOPS:
self.liquidations = self.liquidations[-LIQUIDATION_HISTORY_RETENTION_TIME // SLEEP_TIME_BETWEEN_LOOPS:]

def after_loop(self) -> None:
"""Called after each loop"""
time.sleep(SLEEP_TIME_BETWEEN_LOOPS)

def cleanup(self) -> None:
"""Clean strategy execution"""
self._t.join()
1 change: 0 additions & 1 deletion strategies/demo_strategy/demo_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ def loop(self) -> None:
wallet["coin"] == 'USD']
logging.info(f"FTX USD Wallet: {str(wallets)}")


def after_loop(self) -> None:
"""Called after each loop"""
logging.info("DemoStrategy after_loop")
Expand Down
4 changes: 4 additions & 0 deletions tools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ def expand_var_and_user(path) -> str:
return os.path.expanduser(os.path.expandvars(path))


def flatten(t):
return [item for sublist in t for item in sublist]


def check_fields_in_dict(dictionary, fields, dictionary_name) -> bool:
"""
Check that the fields are in the dict and raise an exception if not
Expand Down

0 comments on commit ef55333

Please sign in to comment.