Simple, clean, light python task distribution. No magic, no configuration, no broker abstraction
The library uses Redis as broker
Usage:
import redis
from workload import distributed
REDIS_POOL = redis.ConnectionPool()
@distributed('worker', redis_pool=REDIS_POOL)
def worker(job, country):
# do stuff
job.result('result') # add unique result
job.fanout(['task3', 'task4']) # schedule another tasks if needed
job.error('oops') # raise and catch exception
from jobs import worker
worker.distribute(['task1', 'task2'])
worker.wait_results()
from jobs import worker
worker.start_processing(concurrency=10)
workload.cycle is a loop which starts deferred tasks at specified time or interval
Usage:
from workload.cycle import cycle
from jobs import do_work
cycle([
# Run job every hour
(cycle.interal(hours=1), do_work),
# Run job every day at midnight
(cycle.at(hour=0), do_work),
])
In order to monitor deferred and distributed jobs an autogenerated admin can be used.
Frontend is written using jQuery and Bootstrap both served from corresponding official CDNs.
Since we want to keep default install as clean as possible, to run the admin the following dependencies need to be installed manually: falcon jinja2
The result of the create_admin_app will be WSGI app which can be served by any WSGI application server (uWSGI, gunicorn, werkzeug to name a few)
Admin table fields description:
Field | Description |
---|---|
Name | Fully qualified python function name |
Description | Function docstring |
Workload | Number of queued deferred jobs |
Field | Description |
---|---|
Name | Fully qualified python function name |
Description | Function docstring |
Workload | Amount of workload to process |
Duration | Current and last (smaller one) duration of job |
Workers | Amount of active workers processing the distributed job |
Usage:
from workload.admin import create_admin_app, START_DEFER
app = create_admin_app(
'/admin', # url prefix
[
# START_DEFER is a shortcut for lambda job: job.defer()
(deferred_worker, START_DEFER),
# without starting function job will be displayed without Start button
deferred_readonly_worker,
# starting function could be used for debug as well
(distributed_worker, lambda job: job.distribute(test_workload)),
],
title='My admin', # the name will be displayed at the top of the admin
redis_pool=REDIS_POOL, # redis pool to display server info, can be ommited
credentials=('user', secret'), # add credentials to enable basic auth
)
# to start local dev server
if __name__ == '__main__':
from werkzeug.serving import run_simple
run_simple('localhost', 8000, app, use_reloader=True)