Skip to content

Commit

Permalink
status_interval parameter implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
tuulos authored and tuulos committed Jan 28, 2009
1 parent 406c1ee commit 023a6fa
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 11 deletions.
11 changes: 10 additions & 1 deletion doc/py/core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ anymore. You can delete the unneeded job files as follows::
:class:`Job` --- Disco job
--------------------------

.. class:: Job(master, [name, input_files, fun_map, map_reader, reduce, partition, combiner, nr_maps, nr_reduces, sort, params, mem_sort_limit, async, clean, chunked, ext_params, required_modules])
.. class:: Job(master, [name, input_files, fun_map, map_reader, reduce, partition, combiner, nr_maps, nr_reduces, sort, params, mem_sort_limit, async, clean, chunked, ext_params, required_modules, status_interval])

Starts a new Disco job. You seldom instantiate this class
directly. Instead, the :meth:`Disco.new_job` is used to start a job
Expand Down Expand Up @@ -357,6 +357,15 @@ anymore. You can delete the unneeded job files as follows::
are required by job functions. Modules listed here are imported to the
functions' namespace.

* *status_interval* - print out a "K items mapped / reduced"
status message every N items, where N is the value of this
parameter. The default is 100000. Setting the value to 0 disables
these messages.

Increase this value, or set it to zero, if you get a "Message rate limit
exceeded" error caused by these system messages. This can happen if your
map / reduce task is very fast. Decrease the value if you want to follow
your task closer to real time, or if you have only a few data items.

.. attribute:: Job.name

Name of the job. You can store or transfer the name string if
Expand Down
1 change: 1 addition & 0 deletions node/disco-worker
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ if __name__ == "__main__":
util.data_err("Decoding the job description failed", master_url)

job_name = dw.job_name = util.job_name = m['name']
dw.status_interval = int(m['status_interval'])

my_ver = ".".join(map(str, sys.version_info[:2]))
if m["version"] != my_ver:
Expand Down
9 changes: 6 additions & 3 deletions node/disconode/disco_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
job_name = ""
http_pool = {}

status_interval = 0

def init():
global HTTP_PORT, LOCAL_PATH, PARAMS_FILE, EXT_MAP, EXT_REDUCE,\
MAP_OUTPUT, CHUNK_OUTPUT, REDUCE_DL, REDUCE_SORTED, REDUCE_OUTPUT
Expand Down Expand Up @@ -273,7 +275,7 @@ def list_iterator(self, lst):
for x in lst:
yield x
i += 1
if not i % 100000:
if status_interval and not i % status_interval:
msg("%d entries reduced" % i)
msg("Reduce done: %d entries reduced in total" % i)

Expand All @@ -285,7 +287,8 @@ def multi_file_iterator(self, inputs, progress = True,
for x in reader(fd, sze, fname):
yield x
i += 1
if progress and not i % 100000:
if progress and status_interval and\
not i % status_interval:
msg("%d entries reduced" % i)

if progress:
Expand Down Expand Up @@ -318,7 +321,7 @@ def run_map(job_input, partitions, param):
p = fun_partition(key, nr_reduces, param)
partitions[p].add(key, value)
i += 1
if not i % 100000:
if status_interval and not i % status_interval:
msg("%d entries mapped" % i)

msg("Done: %d entries mapped in total" % i)
Expand Down
2 changes: 2 additions & 0 deletions pydisco/disco/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class Job(object):
"mem_sort_limit": 256 * 1024**2,
"chunked": None,
"ext_params": None,
"status_interval": 100000,
"required_modules": []}

def __init__(self, master, **kwargs):
Expand Down Expand Up @@ -190,6 +191,7 @@ def _run(self, **kw):
"params": cPickle.dumps(d("params")),
"sort": str(int(d("sort"))),
"mem_sort_limit": str(d("mem_sort_limit")),
"status_interval": str(d("status_interval")),
"required_modules": " ".join(d("required_modules"))}

if type(kw["map"]) == dict:
Expand Down
30 changes: 24 additions & 6 deletions test/test_ratelimit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,42 @@
import tserver, sys, random, time
from disco import Disco

def check_dead(job):
    """Assert that *job* was killed by the rate limiter, then clean it up.

    Raises an Exception if the job is still alive (i.e. the rate
    limit did not trigger).
    """
    status = job.jobinfo()['active']
    if status != "dead":
        raise Exception("Rate limit failed")
    job.purge()

def data_gen(path):
    """Generate the test payload served for any requested *path*.

    A large payload (1M lines) is needed so the map job runs long
    enough for the message rate limit to trigger.

    NOTE(review): the original text contained two stacked return
    statements (the pre-commit value 100 followed by the new value
    1000000); the first made the second unreachable. Kept only the
    intended, reachable value.
    """
    return "badger\n" * 1000000

def fun_map(e, params):
    """Map function that reports every entry via msg() and emits nothing.

    Each msg() call counts against the master's message rate limit,
    which is what this test exercises.
    """
    msg(e)
    return list()

def fun_map2(e, params):
    """Silent map function: discards every entry, emits no messages."""
    return list()

tserver.run_server(data_gen)
inputs = tserver.makeurl([1])
job = Disco(sys.argv[1]).new_job(name = "test_ratelimit",
input = inputs, map = fun_map)

time.sleep(5)
check_dead(job)

job = Disco(sys.argv[1]).new_job(name = "test_ratelimit2",
input = inputs, map = fun_map2, status_interval = 1)

time.sleep(5)
check_dead(job)

job = Disco(sys.argv[1]).new_job(name = "test_ratelimit3",
input = inputs, map = fun_map2, status_interval = 0)
job.wait()
job.purge()

print "ok"

if job.jobinfo()['active'] == "dead":
print "ok"
job.purge()
else:
raise Exception("Rate limit failed")


4 changes: 3 additions & 1 deletion test/tserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):

class Handler(SimpleHTTPServer. SimpleHTTPRequestHandler):
def do_GET(self):
    """Serve the payload produced by data_gen(path) for any GET request.

    The payload is generated once so an accurate Content-length header
    can be sent before the body.
    """
    d = data_gen(self.path)
    self.send_response(200)
    self.send_header("Content-length", len(d))
    self.end_headers()
    # Write the precomputed payload exactly once. The original text
    # also kept the pre-commit line `self.wfile.write(data_gen(self.path))`
    # (diff residue), which would have generated and sent the body a
    # second time.
    self.wfile.write(d)

def makeurl(inputs):
host = "http:https://%s:%d" % (socket.gethostname(), PORT)
Expand Down

0 comments on commit 023a6fa

Please sign in to comment.