MLQ, a queue for ML jobs

MLQ is a job queueing system and a framework for workers to process queued jobs, providing an easy way to offload long-running jobs to other computers.

You've got an ML model and want to deploy it. That is, you have a web app and want users to be able to re-train the model, and that takes a long time. Or perhaps even inference takes a long time. Long, relative to the responsiveness users expect from web apps, meaning: not immediate.

You can't do this stuff directly from your Flask app, because it would lock up the app and not scale beyond a couple of users.

The solution is to enqueue the user's request and, in the meantime, show the user a loading screen or tell them to check back in a few minutes. The ML work happens in the background, in a separate process, or perhaps on a different machine. When it's done, the user is notified (maybe via websockets; maybe their browser is polling at intervals).
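
For the web-app case, a minimal sketch of that pattern with a Flask backend might look like the following. The route names and payload shape are purely illustrative; only mlq.post and mlq.get_job (introduced in the Quick Start below) come from MLQ:

from flask import Flask, jsonify, request
from mlq.queue import MLQ

app = Flask(__name__)
# Namespace, Redis host, Redis port, Redis db -- same arguments as in the Quick Start.
mlq = MLQ('example_app', 'localhost', 6379, 0)

@app.route('/api/train', methods=['POST'])
def train():
    # Enqueue the long-running work and return immediately with the job id.
    job_id = mlq.post(request.get_json())
    return jsonify({'job_id': job_id}), 202

@app.route('/api/jobs/<job_id>')
def job_status(job_id):
    # The browser polls this endpoint until a worker has stored a result.
    job = mlq.get_job(job_id)
    return jsonify({'result': job['result']})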

Or perhaps your company has a limited resource, such as GPUs, and you need a way for employees to access them from Jupyter, one at a time.

MLQ is designed to provide a performant, reliable, and, most of all, easy-to-use queue and workers to solve the above common problems.

It's in Python 3.6+, is built on asyncio, and uses Redis as a queue backend.

Usage

pip install mlq

Requirements

You need access to a running Redis instance. For example, apt install redis-server will get you one at localhost:6379; otherwise there is AWS ElastiCache, among many other options.
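
To check that your instance is reachable before starting any workers, a quick test with the redis-py client (install it with pip install redis if needed) might be:

import redis

# Adjust host, port, and db to match your Redis instance.
r = redis.Redis(host='localhost', port=6379, db=0)
print(r.ping())  # True if the server is reachable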

Quick Start

This assumes you have a web app with a Python backend. For a complete example, see here. In brief:

import time
from mlq.queue import MLQ

# Create MLQ: namespace, redis host, redis port, redis db
mlq = MLQ('example_app', 'localhost', 6379, 0)

job_id = mlq.post({'my_data': 1234})
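# Poll until a worker has processed the job and stored a result.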
result = None
while not result:
    time.sleep(0.1)
    job = mlq.get_job(job_id)
    result = job['result']

Then, of course, you need a worker, or many workers, processing the jobs. Somewhere else (another terminal, a screen session, another machine, etc.):

import asyncio
from mlq.queue import MLQ

mlq = MLQ('example_app', 'localhost', 6379, 0)

def simple_multiply(params_dict, *args):
    return params_dict['my_data'] * 2

async def main():
    print("Running, waiting for messages.")
    mlq.create_listener(simple_multiply)

if __name__ == '__main__':
    asyncio.run(main())

Job Lifecycle

  1. Submit a job with MLQ. Optionally, specify a callback URL that'll be hit, with some useful query params, once the job has been processed (a sketch of a receiving endpoint follows this list).
  2. Job goes into a queue managed by MLQ. Jobs are processed first-in, first-out (FIFO).
  3. Create a worker (or maybe a worker already exists). Optimally, create many workers. They all connect to a shared Redis instance.
  4. A worker (sorry, sometimes I call it a consumer), or many workers in parallel, will pick jobs out of the queue and process them by feeding them into listener functions that you define.
  5. As soon as a worker takes on the processing of some message/data, that message/data is moved into a processing queue. So, if the worker fails midway through, the message is not lost.
  6. Worker stores the output of its listener functions and hits the callback with a result. Optionally, a larger result -- perhaps binary -- is stored in Redis, waiting to be picked up and served by a backend API.
  7. Ask MLQ for the job result, if the callback was not enough for you.
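
If you did pass a callback URL at step 1, the receiving end can be an ordinary HTTP endpoint. A minimal Flask sketch follows; the route name is illustrative, and since the exact query parameters aren't enumerated here, the handler simply echoes whatever arrives (accepting both GET and POST is an assumption):

from flask import Flask, request

app = Flask(__name__)

@app.route('/jobs/callback', methods=['GET', 'POST'])
def job_callback():
    # MLQ hits this URL once the job has been processed; inspect the query
    # params it sends and notify the user however you like (websocket push,
    # flag in your database, etc.).
    print(dict(request.args))
    return '', 200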

Alternatively, the worker might fail to process the job before it gets to step 6. Maybe the input data was invalid, maybe it was a bad listener function; whatever happened, there was an exception:

  1. MLQ will move failed jobs into a dead letter queue - not lost, but waiting for you to fix the problem.

In another case, maybe the worker dies midway through processing a message. When that happens, the job is not lost either. It might just be that, for example, the spot instance price jumped and your worker shut down. MLQ provides a reaper thread that can be run at regular intervals to requeue jobs whose processing has stalled. If a job is requeued enough times to exceed a threshold you've specified, something is wrong - it'll be moved to the dead letter queue.
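
A sketch of wiring that into the worker process is below. Note that the reaper-creation call shown (create_reaper) and its zero-argument form are an assumption; check the MLQ source for the exact method name and parameters:

import asyncio
from mlq.queue import MLQ

mlq = MLQ('example_app', 'localhost', 6379, 0)

def simple_multiply(params_dict, *args):
    return params_dict['my_data'] * 2

async def main():
    mlq.create_listener(simple_multiply)
    # Assumed API: start the reaper so that stalled jobs are requeued at
    # intervals (and dead-lettered after too many retries). Verify the
    # method name and parameters against the MLQ documentation.
    mlq.create_reaper()

if __name__ == '__main__':
    asyncio.run(main())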

Listener functions: arguments and return values

When creating a worker, you (probably should) give it a listener function to execute on items pulled from the queue. In the example above, that function is called simple_multiply. Its signature is simple_multiply(params_dict, *args). What is that?

  • params_dict is the original message that was pushed to the queue; in the example it'd be this dictionary: {'my_data': 1234}. It doesn't have to be a dict; rather, it's whatever you posted to the queue, so it could be an int, a string, or any other serializable type (it needs to be serializable because it's stored as a string in the Redis queue). Speaking of which, internally MLQ does its serialization with MessagePack, which supports binary and is faster than JSON.

  • args is a dict with several useful things in it. Most of them are concerned with distributed computing utilities (documented below) and so can safely be ignored by most users. But you also get access to the dict args['full_message'] and the function args['update_progress']. The full message provides the queued message as MLQ sees it, including some possibly useful things:

{
    'id': msg_id,
    'timestamp': timestamp, # when the message was enqueued
    'worker': None, # uuid of the worker it's being processed on (if any)
    'processing_started': None, # when processing started on the worker
    'processing_finished': None,
    'progress': None, # an integer from -1 (failed), 0 (not started), to 100 (complete)
    'short_result': None, # the stored short result (if any)
    'result': None, # the stored full result (if any)
    'callback': callback, # URL to callback, if any, that user specified when posting the message
    'retries': 0, # how many times this job has attempted to be processed by workers and requeued
    'functions': functions, # w