Skip to content

Commit

Permalink
Memory space reduction
Browse files Browse the repository at this point in the history
Added trim program to remove job results that were
older than 30 days

reduced the number of results maintained per job to 5

all results for job expire if the job hasn't run for 40 days
  • Loading branch information
plamere committed Feb 24, 2019
1 parent 1d7dd76 commit db2f328
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 3 deletions.
29 changes: 26 additions & 3 deletions server/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
import simplejson as json
import threading
from datetime import datetime
from datetime import datetime,date

class Scheduler(object):
'''
Expand All @@ -25,7 +25,8 @@ def __init__(self, redis, pm):
self.wait_queue = 'sched-wait-queue'
self.max_time = 1000000
self.max_cerrors = 3
self.max_retained_results = 10
self.max_retained_results = 5
self.max_age_results = 30 * 24 * 60 * 60

def schedule(self, auth_code, user, pid, when, delta, total):
# as a pipeline
Expand Down Expand Up @@ -211,9 +212,11 @@ def process_job_result(self, skey, results):
results['oinfo'] = 'generated ' + str(ntracks) + ' tracks'
rkey = mk_sched_key('results', user, pid)
jresults = json.dumps(results, indent=2)
print jresults
# print jresults
self.r.lpush(rkey, jresults)
self.r.ltrim(rkey, 0, self.max_retained_results - 1)
self.r.expire(rkey, self.max_age_results)
show_results(results)


def now(self):
Expand Down Expand Up @@ -255,6 +258,26 @@ def show_info(self):
print 'job queue has', count, 'jobs'


def show_results(result):
    '''
    Print a short, human-readable summary of one job result.

    The summary is a headline (status, run date, elapsed time), the
    'oinfo' and 'info' fields, then either the 'name' and 'uri' fields
    (when status is 'ok') or the 'message' field, followed by a blank
    line.

    Display is best effort: if the result lacks an expected field or a
    field cannot be formatted, the raw result is dumped as JSON instead
    of letting the error propagate to the caller.

    result: dict describing the outcome of one job run
    '''
    try:
        # Build every line before printing anything, so a malformed
        # result never produces a half-written summary.
        lines = [
            "%s %s %.2f" % (result['status'], fmt_date(result['runtime']), result['time']),
            result['oinfo'],
            result['info'],
        ]
        if result['status'] == 'ok':
            lines.append(result['name'])
            lines.append(result['uri'])
        else:
            lines.append(result['message'])
    except (KeyError, TypeError, ValueError, OverflowError):
        # Missing field, wrong field type, or an unusable timestamp.
        print("trouble showing formatted result")
        print(json.dumps(result, indent=2))
        print("")
        return

    for line in lines:
        print(line)
    print("")

def fmt_date(ts):
    '''
    Render the POSIX timestamp ts as a local calendar date, YYYY-MM-DD.
    '''
    return date.fromtimestamp(ts).strftime("%Y-%m-%d")


def mk_sched_key(op, user, pid):
Expand Down
151 changes: 151 additions & 0 deletions server/trim_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import sys
import redis
import json
import datetime


# Results whose age in days reaches this value are treated as expired.
max_days_to_keep = 30

# Time-to-live, in seconds, that expire_key() puts on a result key.
MAX_AGE_RESULTS = 30 * 24 * 60 * 60

# Client for the redis server on localhost:6379, database 0.
r = redis.StrictRedis(host='localhost', port=6379, db=0)

# Byte totals accumulated by count_results().
unexpired_byte_count = expired_byte_count = total_byte_count = 0

# Job counters accumulated by trim_results().
total_jobs = removed_jobs = trimmed_jobs = keep_all_jobs = 0

def trim_results(name):
    '''
    Drop expired entries from the result list stored at redis key name.

    The list is read from its head and the scan stops at the first entry
    whose age is max_days_to_keep days or more; that entry and everything
    after it are trimmed away.  (Presumably the head holds the newest
    result, since the scheduler adds results with LPUSH -- confirm.)  If
    even the first entry is expired the whole key is deleted.

    Updates the module-level counters total_jobs, removed_jobs,
    trimmed_jobs and keep_all_jobs, and prints one progress line
    describing the last entry examined.

    name: redis key of a result list
    '''
    global total_jobs, removed_jobs, trimmed_jobs, keep_all_jobs

    entries = r.lrange(name, 0, 100)
    if not entries:
        # The key disappeared (deleted or expired) between SCAN and
        # LRANGE: nothing to trim, nothing to count, nothing to report.
        return

    total_jobs += 1
    last_result_to_keep = -1
    for i, js in enumerate(entries):
        result = json.loads(js)
        # print name, i, fmt_date(result['runtime'])
        if get_age(result['runtime']) >= max_days_to_keep:
            break
        last_result_to_keep = i

    if last_result_to_keep == -1:
        removed_jobs += 1
        r.delete(name)
    else:
        # Always issued, so a list longer than the fetched window is
        # still cut back to what was examined.
        r.ltrim(name, 0, last_result_to_keep)
        if last_result_to_keep < len(entries) - 1:
            trimmed_jobs += 1
        else:
            # Every fetched entry is still fresh.
            keep_all_jobs += 1

    print("%d %s %4d %8d %8d %8d %8d" % (i, fmt_date(result['runtime']), get_age(result['runtime']), total_jobs, removed_jobs, trimmed_jobs, keep_all_jobs))

def count_results(name):
    '''
    Add the sizes of the results stored at redis key name to the
    module-level byte counters.

    Every fetched entry adds its serialized length to total_byte_count,
    and to either unexpired_byte_count or expired_byte_count depending
    on whether its age is below max_days_to_keep days.  Prints one
    progress line with the running totals.  Nothing is modified in
    redis.

    name: redis key of a result list
    '''
    global total_byte_count, expired_byte_count, unexpired_byte_count

    entries = r.lrange(name, 0, 100)
    if not entries:
        # The key disappeared between SCAN and LRANGE; there is nothing
        # to count and no entry to describe in the progress line.
        return

    for i, js in enumerate(entries):
        size = len(js)
        total_byte_count += size
        result = json.loads(js)
        # print name, i, fmt_date(result['runtime'])
        if get_age(result['runtime']) < max_days_to_keep:
            unexpired_byte_count += size
        else:
            expired_byte_count += size

    print("%d %s %4d %8d %8d %8d" % (i, fmt_date(result['runtime']), get_age(result['runtime']), unexpired_byte_count, expired_byte_count, total_byte_count))

def show_results(name):
    '''
    Print every result stored at redis key name.

    Each result is preceded by a line giving the key's remaining
    time-to-live, as reported by redis when the list was fetched.

    name: redis key of a result list
    '''
    raw_entries = r.lrange(name, 0, 100)
    ttl = r.ttl(name)
    for js in raw_entries:
        entry = json.loads(js)
        # print json.dumps(entry, indent=4)
        print("%s %s" % ("TTL", ttl))
        show_result(entry)

def expire_key(name):
    '''
    Set the time-to-live of redis key name to MAX_AGE_RESULTS seconds.
    '''
    ttl_seconds = MAX_AGE_RESULTS
    r.expire(name, ttl_seconds)


def show_result(result):
    '''
    Print a human-readable summary of one decoded job result.

    Output is a headline (status, run date, elapsed time), the 'oinfo'
    and 'info' fields, then 'name' and 'uri' when the status is 'ok' or
    'message' otherwise, and finally a blank line.

    result: dict decoded from a stored result entry
    '''
    status = result['status']
    headline = "%s %s %.2f" % (status, fmt_date(result['runtime']), result['time'])
    print(headline)
    print(result['oinfo'])
    print(result['info'])

    # Successful runs describe what was produced; anything else
    # carries an explanatory message.
    detail_keys = ('name', 'uri') if status == 'ok' else ('message',)
    for key in detail_keys:
        print(result[key])
    print("")

def fmt_date(ts):
    '''
    Render the POSIX timestamp ts as a local calendar date, YYYY-MM-DD.
    '''
    return datetime.date.fromtimestamp(ts).strftime("%Y-%m-%d")

def get_age(ts):
    '''
    Return how many whole calendar days separate the local date of the
    POSIX timestamp ts from today's local date.
    '''
    then = datetime.date.fromtimestamp(ts)
    return (datetime.date.today() - then).days


def trim():
    '''
    Run trim_results() on every redis key matching "sched-results-*".
    '''
    # SCAN is paged: start from cursor 0 and keep going until redis
    # hands cursor 0 back.
    # NOTE: SCAN may report the same key more than once.
    cursor = 0
    while True:
        cursor, knames = r.scan(cursor, match="sched-results-*")
        for name in knames:
            trim_results(name)
        if cursor == 0:
            break

def count():
    '''
    Run count_results() on every redis key matching "sched-results-*".
    '''
    # SCAN is paged: start from cursor 0 and keep going until redis
    # hands cursor 0 back.
    # NOTE: SCAN may report the same key more than once, in which case
    # its bytes are counted again.
    cursor = 0
    while True:
        cursor, knames = r.scan(cursor, match="sched-results-*")
        for name in knames:
            count_results(name)
        if cursor == 0:
            break

def show():
    '''
    Run show_results() on every redis key matching "sched-results-*".
    '''
    # SCAN is paged: start from cursor 0 and keep going until redis
    # hands cursor 0 back.
    cursor = 0
    while True:
        cursor, knames = r.scan(cursor, match="sched-results-*")
        for name in knames:
            show_results(name)
        if cursor == 0:
            break

def expire():
    '''
    Run expire_key() on every redis key matching "sched-results-*".
    '''
    # SCAN is paged: start from cursor 0 and keep going until redis
    # hands cursor 0 back.
    cursor = 0
    while True:
        cursor, knames = r.scan(cursor, match="sched-results-*")
        for name in knames:
            expire_key(name)
        if cursor == 0:
            break


if __name__ == '__main__':

    # Each command-line flag selects one maintenance pass.
    commands = {
        '--trim': trim,
        '--count': count,
        '--show': show,
        '--expire': expire,
    }

    # Flags are handled left to right, so several passes can be
    # requested in a single run.
    for arg in sys.argv[1:]:
        action = commands.get(arg)
        if action is None:
            print("%s %s" % ("unknown arg", arg))
        else:
            action()

0 comments on commit db2f328

Please sign in to comment.