Skip to content

Commit

Permalink
Better qa automation, processor option to reverse
Browse files Browse the repository at this point in the history
Improved the QA report automation (e.g., clear old reports when a
processing job starts). Also added an option to the processor to do the
newest job first rather than the default of oldest first.
  • Loading branch information
rfdougherty committed Mar 6, 2014
1 parent 64c7ade commit a2b1d38
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
2 changes: 1 addition & 1 deletion nimsgears/model/nims.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ def from_mrfile(cls, mrfile):
phase_encode_undersample = mrfile.phase_encode_undersample,
slice_encode_undersample = mrfile.slice_encode_undersample,
acquisition_matrix = unicode(str(mrfile.acquisition_matrix)),
qa_status=u'pending',
qa_status = u'pending',
# to unpack fov, mm_per_vox, and acquisition_matrix: np.fromstring(str(mm)[1:-1],sep=',')
)
return epoch
Expand Down
15 changes: 11 additions & 4 deletions nimsproc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

class Processor(object):

def __init__(self, db_uri, nims_path, physio_path, task, filters, max_jobs, max_recon_jobs, reset, sleeptime, tempdir):
def __init__(self, db_uri, nims_path, physio_path, task, filters, max_jobs, max_recon_jobs, reset, sleeptime, tempdir, newest):
super(Processor, self).__init__()
self.nims_path = nims_path
self.physio_path = physio_path
Expand All @@ -36,6 +36,7 @@ def __init__(self, db_uri, nims_path, physio_path, task, filters, max_jobs, max_
self.max_recon_jobs = max_recon_jobs
self.sleeptime = sleeptime
self.tempdir = tempdir
self.newest = newest

self.alive = True
init_model(sqlalchemy.create_engine(db_uri))
Expand All @@ -52,7 +53,10 @@ def run(self):
query = query.filter(Job.task==self.task)
for f in self.filters:
query = query.filter(eval(f))
job = query.filter(Job.status==u'pending').order_by(Job.id).with_lockmode('update').first()
if self.newest:
job = query.filter(Job.status==u'pending').order_by(Job.id.desc()).with_lockmode('update').first()
else:
job = query.filter(Job.status==u'pending').order_by(Job.id).with_lockmode('update').first()

if job:
if isinstance(job.data_container, Epoch):
Expand Down Expand Up @@ -183,6 +187,8 @@ def find(self):
def process(self):
self.clean(self.job.data_container, u'derived')
self.clean(self.job.data_container, u'web')
self.clean(self.job.data_container, u'qa')
self.data_container.qa_status = u'rerun'
self.job.activity = u'generating NIfTI / running recon'
log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity))
transaction.commit()
Expand Down Expand Up @@ -262,7 +268,7 @@ def process(self):
if pf is not None:
criteria = pf.prep_convert()
if criteria != None:
q = Epoch.query.filter(Epoch.session==self.job.data_container.session)
q = Epoch.query.filter(Epoch.session==self.job.data_container.session).filter(Epoch.trashtime == None)
for fieldname,value in criteria.iteritems():
q = q.filter(getattr(Epoch,fieldname)==unicode(value))
epochs = [e for e in q.all() if e!=self.job.data_container]
Expand Down Expand Up @@ -322,6 +328,7 @@ def __init__(self):
self.add_argument('-f', '--logfile', help='path to log file')
self.add_argument('-l', '--loglevel', default='info', help='log level (default: info)')
self.add_argument('-q', '--quiet', action='store_true', default=False, help='disable console logging')
self.add_argument('-n', '--newest', action='store_true', default=False, help='do newest jobs first')


if __name__ == '__main__':
Expand All @@ -331,7 +338,7 @@ def __init__(self):

args = ArgumentParser().parse_args()
nimsutil.configure_log(args.logfile, not args.quiet, args.loglevel)
processor = Processor(args.db_uri, args.nims_path, args.physio_path, args.task, args.filter, args.jobs, args.reconjobs, args.reset, args.sleeptime, args.tempdir)
processor = Processor(args.db_uri, args.nims_path, args.physio_path, args.task, args.filter, args.jobs, args.reconjobs, args.reset, args.sleeptime, args.tempdir, args.newest)

def term_handler(signum, stack):
processor.halt()
Expand Down
23 changes: 23 additions & 0 deletions nimsproc/qa_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def generate_qa_report(epoch_id, nimspath, force=False, spike_thresh=6., nskip=6
return

print('%s epoch id %d (%s) QA: Starting QA report...' % (time.asctime(), epoch_id, str(epoch)))
# FIXME: use ds.kind==u'qa'
qa_ds = [ds for ds in epoch.datasets if ds.filetype==u'json' and ds.label==u'QA']
if len(qa_ds)>0:
if force:
Expand Down Expand Up @@ -168,6 +169,7 @@ def generate_qa_report(epoch_id, nimspath, force=False, spike_thresh=6., nskip=6
transrot[:,3:] *= 180./np.pi
qa_ds = Dataset.at_path(nimspath, u'json')
qa_ds.filenames = [u'qa_report.json']
qa_ds.kind = u'qa'
qa_ds.container = epoch
outfile = os.path.join(nimspath, qa_ds.relpath, qa_ds.filenames[0])
print("%s epoch id %d (%s) QA: writing report to %s..." % (time.asctime(), epoch_id, str(epoch), outfile))
Expand All @@ -190,6 +192,27 @@ def generate_qa_report(epoch_id, nimspath, force=False, spike_thresh=6., nskip=6
transaction.commit()
return

def clean_up(nims_path, scan_type=u'functional'):
    """Reset the QA status of epochs whose QA report files have gone missing.

    Scans all epochs of the given scan type with qa_status=='done' and checks
    that each of their QA JSON datasets still exists on disk. Datasets whose
    report file is gone are deleted (any leftover dataset directory is removed
    first), and an epoch left with no surviving QA dataset is flagged 'rerun'
    so the QA job regenerates its report.

    nims_path -- filesystem root under which dataset relpaths live
    scan_type -- epoch scan type to check; default u'functional'
                 (BUG FIX: the original referenced an undefined name
                 'scan_type'; default guessed from run_a_job's focus on
                 functional epochs — TODO confirm against callers)
    """
    ep = Epoch.query.filter((Epoch.qa_status==u'done')).filter(Epoch.scan_type==scan_type).order_by(Epoch.timestamp.desc()).all()
    # Work from ids so each epoch is re-fetched fresh inside the loop.
    eids = [e.id for e in ep]
    for eid in eids:
        epoch = Epoch.get(eid)
        # FIXME: use ds.kind==u'qa' (same note as in generate_qa_report)
        qa_ds = [ds for ds in epoch.datasets if ds.filetype==u'json' and ds.label==u'QA']
        if len(qa_ds)==0:
            epoch.qa_status = u'rerun'
        else:
            n = len(qa_ds)
            for ds in qa_ds:
                # BUG FIX: original used undefined name 'nimspath' here;
                # the function parameter is 'nims_path'.
                ds_dir = os.path.join(nims_path, ds.relpath)
                # Guard against an empty filenames list before indexing.
                report = os.path.join(ds_dir, ds.filenames[0]) if ds.filenames else None
                if report is None or not os.path.exists(report):
                    if os.path.isdir(ds_dir):
                        shutil.rmtree(ds_dir)
                    ds.delete()
                    n -= 1
            if n==0:
                epoch.qa_status = u'rerun'
    transaction.commit()
    return

def run_a_job(nims_path, scan_type, spike_thresh, nskip):
# Get the latest functional epoch without qa and try it. (.desc() means descending order)
# We need to lock the column so that another process doesn't pick this one up before we have a chance to
Expand Down

0 comments on commit a2b1d38

Please sign in to comment.