From 537e841664a263f55987bdb7862eed1952b23dc9 Mon Sep 17 00:00:00 2001 From: "Kevin S. Hahn" Date: Wed, 24 Sep 2014 15:13:00 -0700 Subject: [PATCH 1/5] adapt nims1 to use nimsdata2 --- apache.conf | 20 +- nimsdata | 2 +- nimsgears/controllers/root.py | 6 +- nimsgears/controllers/search.py | 4 +- nimsgears/model/nims.py | 22 +- nimsproc/nimsphysio.py | 611 ++++++++++++++++++++++++++++++++ nimsproc/processor.py | 117 +++--- nimsproc/scheduler.py | 2 +- nimsproc/sorter.py | 138 +++----- nimsutil/nimsutil.py | 34 +- 10 files changed, 780 insertions(+), 176 deletions(-) create mode 100644 nimsproc/nimsphysio.py diff --git a/apache.conf b/apache.conf index 00dcab3..da677ed 100644 --- a/apache.conf +++ b/apache.conf @@ -1,28 +1,20 @@ -WSGIDaemonProcess nims processes=4 threads=16 maximum-requests=1000 display-name=%{GROUP} +WSGIDaemonProcess nims user=nims group=nims processes=2 threads=4 maximum-requests=1000 display-name=%(GROUP) + WSGIApplicationGroup %{GLOBAL} WSGIProcessGroup nims -Redirect /nimsgears /nims -WSGIScriptAlias /nims /var/local/nims/nimsgears/public/nims.wsgi +WSGIScriptAlias /nims /var/local/nims/nimsgears/public/nims.wsgi -Alias /nims/images /var/local/nims/nimsgears/public/images -Alias /nims/css /var/local/nims/nimsgears/public/css -Alias /nims/javascript /var/local/nims/nimsgears/public/javascript -Alias /nims/static /var/local/nims/nimsgears/public/static +Alias /nims/images /var/local/nims/nimsgears/public/images +Alias /nims/css /var/local/nims/nimsgears/public/css +Alias /nims/javascript /var/local/nims/nimsgears/public/javascript AddType application/octet-stream .7 .bvec .bval .dcm .dat - - Order allow,deny - Allow from all - Options Indexes FollowSymLinks - AllowOverride AuthConfig - - AuthType WebAuth Require valid-user diff --git a/nimsdata b/nimsdata index eeeeb37..687554c 160000 --- a/nimsdata +++ b/nimsdata @@ -1 +1 @@ -Subproject commit eeeeb37d8470e9bffe9e2805f91b433c9e94a603 +Subproject commit 687554c7df81faafb68ea963cdbd8da206956fe0 diff --git a/nimsgears/controllers/root.py b/nimsgears/controllers/root.py index d6ad8d6..bfe18c0 100644 --- a/nimsgears/controllers/root.py +++ b/nimsgears/controllers/root.py @@ -11,7 +11,7 @@ from tg.i18n import ugettext as _, lazy_ugettext as l_ import webob.exc -import nimsdata +import nimsdata.medimg.nimsmontage import nimsutil from nimsgears.model import * @@ -89,7 +89,7 @@ def pyramid(self, **kwargs): ds = Dataset.get(kwargs['dataset_id']) if user.has_access_to(ds): db_file = os.path.join(store_path, ds.relpath, ds.filenames[0]) - return dict(zip(['dataset_id', 'tile_size', 'x_size', 'y_size'], (ds.id,) + nimsdata.nimsmontage.get_info(db_file))) + return dict(zip(['dataset_id', 'tile_size', 'x_size', 'y_size'], (ds.id,) + nimsdata.medimg.nimsmontage.get_info(db_file))) @expose('nimsgears.templates.qa_report', render_params={'doctype': None}) def qa_report(self, **kwargs): @@ -156,7 +156,7 @@ def pyramid_tile(self, *args): response.etag = args[0] response.cache_control = 'max-age = 86400' response.last_modified = ds.updatetime - return nimsdata.nimsmontage.get_tile(os.path.join(store_path, ds.relpath, ds.filenames[0]), z, x, y) + return nimsdata.medimg.nimsmontage.get_tile(os.path.join(store_path, ds.relpath, ds.filenames[0]), z, x, y) @expose(content_type='application/octet-stream') def file(self, **kwargs): diff --git a/nimsgears/controllers/search.py b/nimsgears/controllers/search.py index ba65d2b..7abe62d 100644 --- a/nimsgears/controllers/search.py +++ b/nimsgears/controllers/search.py @@ -10,7 +10,7 @@ import numpy 
import datetime -import nimsdata +import nimsdata.medimg.dcm.mr.generic_mr from nimsgears.model import * from nimsgears.controllers.nims import NimsController @@ -97,7 +97,7 @@ def index(self): epoch_columns = [('Group', 'col_sunet'), ('Experiment', 'col_exp'), ('Date & Time', 'col_datetime'), ('Exam', 'col_exam'), ('Type Scan', 'col_scantype'), ('Description', 'col_desc')] dataset_columns = [('Data Type', 'col_type')] - scantype_values = [''] + sorted(nimsdata.nimsmrdata.scan_types.all) + scantype_values = [''] + sorted(nimsdata.medimg.dcm.mr.generic_mr.scan_types.all) psd_names_tuples = DBSession.query(Epoch.psd).distinct(Epoch.psd) psd_values = [''] + sorted([elem[0] for elem in psd_names_tuples]) return dict(page='search', diff --git a/nimsgears/model/nims.py b/nimsgears/model/nims.py index eb2ff7e..ad2c6b9 100644 --- a/nimsgears/model/nims.py +++ b/nimsgears/model/nims.py @@ -293,7 +293,7 @@ def __unicode__(self): class Job(Entity): timestamp = Field(DateTime, default=datetime.datetime.now) - status = Field(Enum(u'pending', u'running', u'done', u'failed', u'abandoned', name=u'job_status')) + status = Field(Enum(u'pending', u'running', u'done', u'failed', u'abandoned', 'rerun', name=u'job_status')) task = Field(Enum(u'find', u'proc', u'find&proc', name=u'job_task')) needs_rerun = Field(Boolean, default=False) progress = Field(Integer) @@ -509,13 +509,13 @@ def __unicode__(self): @classmethod def from_mrfile(cls, mrfile): - subj_code, group_name, exp_name = nimsutil.parse_patient_id(mrfile.patient_id, ResearchGroup.all_ids()) + subj_code, group_name, exp_name = nimsutil.parse_patient_id__(mrfile.subj_code, mrfile.group_name, mrfile.project_name, ResearchGroup.all_ids()) query = cls.query.join(Experiment, cls.experiment).filter(Experiment.name == exp_name) query = query.join(ResearchGroup, Experiment.owner).filter(ResearchGroup.gid == group_name) if subj_code: subject = query.filter(cls.code==subj_code).first() elif mrfile.subj_firstname and mrfile.subj_lastname: - subject = query.filter(cls.firstname==mrfile.subj_firstname).filter(cls.lastname==mrfile.subj_lastname).filter(cls.dob==mrfile.subj_dob).first() + subject = query.filter(cls.firstname==unicode(mrfile.subj_firstname)).filter(cls.lastname==unicode(mrfile.subj_lastname)).filter(cls.dob==mrfile.subj_dob).first() else: subject = None if not subject: @@ -525,8 +525,8 @@ def from_mrfile(cls, mrfile): experiment=experiment, person=Person(), code=subj_code[:31] or experiment.next_subject_code, - firstname=mrfile.subj_firstname[:63], - lastname=mrfile.subj_lastname[:63], + firstname=(unicode(mrfile.subj_firstname) or u'')[:63], + lastname=(unicode(mrfile.subj_lastname) or u'')[:63], dob=mrfile.subj_dob, ) return subject @@ -605,7 +605,7 @@ def from_mrfile(cls, mrfile): # central authority and/or querying the schedule database. But for now, # just let the operator be None if the user isn't already in the system. 
operator = User.by_uid(unicode(mrfile.operator), create=False) - session = Session(uid=uid, exam=mrfile.exam_no, subject=subject, operator=operator) + session = Session(uid=uid, exam=mrfile.exam_no if isinstance(mrfile.exam_no, int) else 0, subject=subject, operator=operator) return session @classmethod @@ -708,13 +708,13 @@ def from_mrfile(cls, mrfile): epoch = cls.query.filter_by(uid=uid).filter_by(acq=mrfile.acq_no).first() if not epoch: session = Session.from_mrfile(mrfile) - if session.timestamp is None or session.timestamp > mrfile.timestamp: + if session.timestamp is None or (mrfile.timestamp is not None and session.timestamp > mrfile.timestamp): session.timestamp = mrfile.timestamp epoch = cls( session = session, timestamp = mrfile.timestamp, - duration = datetime.timedelta(0, mrfile.duration), - prescribed_duration = datetime.timedelta(0, mrfile.prescribed_duration), + duration = datetime.timedelta(0, mrfile.duration or 0), + prescribed_duration = datetime.timedelta(0, mrfile.prescribed_duration or 0), uid = uid, series = mrfile.series_no, acq = mrfile.acq_no, @@ -758,11 +758,11 @@ def toplevel_query(cls): @property def name(self): - return '%d_%d_%d' % (self.session.exam, self.series, self.acq) + return '%d_%d%s' % (self.session.exam, self.series, '_%d' % self.acq if self.acq is not None else '') @property def dirname(self): - return '%d_%d_%s' % (self.series, self.acq, self.description) + return '%d%s_%s' % (self.series, '_%d' % self.acq if self.acq is not None else '', self.description) @property def contains_trash(self): diff --git a/nimsproc/nimsphysio.py b/nimsproc/nimsphysio.py new file mode 100644 index 0000000..2c3a7e7 --- /dev/null +++ b/nimsproc/nimsphysio.py @@ -0,0 +1,611 @@ +#!/usr/bin/env python +# +# @author: Bob Dougherty +# (Note that the regressor computation code was mostly transcribed from Catie Chang's +# Matlab implementation of retroicor_rvhr.) + +""" +The CNI physiological data procesor. Takes physio data (cardiac and respiration), +cleans it to be synchronous with the scan, and computes retroicor and rvhrcor regressors. +See: + +* Glover GH, Li TQ, Ress D. Image-based method for retrospective correction of + physiological motion effects in fMRI: RETROICOR. Magn Reson Med. 2000 Jul;44(1):162-7. + PubMed PMID: 10893535 + +* Chang C, Cunningham JP, Glover GH. Influence of heart rate on the BOLD signal: + the cardiac response function. Neuroimage. 2009 Feb 1;44(3):857-69. doi: + 10.1016/j.neuroimage.2008.09.029. Epub 2008 Oct 7. PubMed PMID: 18951982 +""" + +import gzip +import json +import logging +import nibabel +import tarfile +import zipfile +import argparse +import datetime +import warnings +import itertools +import numpy as np +import bson.json_util + +from nimsdata import nimsdata + +log = logging.getLogger('nimsphysio') + + +class NIMSPhysioError(nimsdata.NIMSDataError): + pass + + +class NIMSPhysio(nimsdata.NIMSReader): + """ + Read and process physiological data recorded during an MR scan. + + This class reads the physio data and generates RETROICOR and RETORVHR + regressors from the data. + + Takes either a list of the physio files or a filename that points to a + zip or tgz file containing the files. + + If tr and/or nframes are missing, the data will not be properly time-shifted + to the start of the scan and the regressors won't be valid. + + Ideally, you should specify the slice_order, in which case, num_slices can be + omitted since it will be inferred from the slice_order list. 
If you don't, + the code will assume a standard interleaved acquisition. If neither slice_order + nor num_slices is specified, the regressors can't be computed. + + Example: + import physio + p = physio.PhysioData(filename='physio.zip', tr=2, nframes=120, nslices=36) + p.generate_regressors(outname='retroicor.csv') + """ + domain = 'mr' + filetype = 'gephysio' + parse_priority = 7 + required_metadata_fields = ['group', 'experiment', 'session', 'epoch', 'timestamp'] + state = ['orig'] + + # TODO: simplify init to take no args. We need to add the relevant info to the json file. + def __init__(self, filenames, tr=2, nframes=100, slice_order=None, nslices=1, card_dt=0.01, resp_dt=0.04): + # does not use super(NIMSPhysio, self).__init__(filename) because nimsphysio expects a LIST of inputs + self._schema_init(self.project_properties) + self._schema_init(self.session_properties) + self._schema_init(self.acquisition_properties) + self.data = None + self.metadata_status = 'empty' + + # The is_valid method uses some crude heuristics to detect valid data. + # To be valid, the number of temporal frames must be reasonable, and either the cardiac + # standard deviation or the respiration low-frequency power meet the following criteria. + self.min_number_of_frames = 8 + self.min_card_std = 4. + self.min_resp_lfp = 40. + # FIXME: How to infer the file format automatically? + self.format_str = 'ge' + self.tr = float(tr) + self.nframes = nframes + if slice_order == None: + # Infer a standard GE interleave slice order + self.slice_order = np.array(range(0, nslices, 2) + range(1, nslices, 2)) + log.warning('No explicit slice order set; inferring interleaved.') + else: + self.slice_order = np.array(slice_order) + self.card_wave = None + self.card_trig = None + self.card_dt = float(card_dt) + self.card_time = None + self.heart_rate = None + self.resp_wave = None + self.resp_trig = None + self.resp_dt = float(resp_dt) + self.resp_time = None + self.regressors = None + self.phases = None + self.scan_duration = (self.nframes or 1) * self.tr + self.exam_uid = '' + self.series_uid = '' + self.series_no = '' + self.acq_no = '' + #self.subj_firstname = None + #self.subj_lastname = None + #self.subj_dob = None + #self.subj_sex = None + try: + if self.format_str=='ge': + self.read_ge_data(filenames) + else: + raise NIMSPhysioError('only GE physio format is currently supported') + # insert other vendor's read_data functions here + except Exception as e: + raise NIMSPhysioError(e) + + def read_ge_data(self, filename): + archive = None + if isinstance(filename, basestring): + with open(filename, 'rb') as fp: + magic = fp.read(4) + if magic == '\x50\x4b\x03\x04': + archive = zipfile.ZipFile(filename) + files = [(fn, archive.open(fn)) for fn in archive.namelist()] + elif magic[:2] == '\x1f\x8b': + archive = tarfile.open(filename, 'r:*') + files = [(fn, archive.extractfile(archive.getmember(fn))) for fn in archive.getnames()] + else: + raise NIMSPhysioError('only tgz and zip files are supported') + else: + files = [(fn, open(fn)) for fn in filename] # assume that we were passed a list of filenames + for fn, fd in files: + for substr, attr in ( + ('RESPData', 'resp_wave'), + ('RESPTrig', 'resp_trig'), + ('PPGData', 'card_wave'), + ('PPGTrig', 'card_trig'), + ): + if substr in fn: + with warnings.catch_warnings(): + warnings.simplefilter('ignore') + setattr(self, attr, np.loadtxt(fd)) + break + else: + if fn.endswith('_physio.json'): + metadata = json.load(fd, object_hook=bson.json_util.object_hook) + for f in 
self.required_metadata_fields: + if f not in metadata: + raise NIMSPhysioError('incomplete json file') + for attribute, value in metadata.iteritems(): + if isinstance(value, datetime.datetime): + value = value.replace(tzinfo=None) + setattr(self, attribute, value) + if archive: + archive.close() + + if self.resp_wave!=None and self.card_wave!=None: + # move time zero to correspond to the start of the fMRI data + offset = self.resp_dt * self.resp_wave.size - self.scan_duration + self.resp_time = self.resp_dt * np.arange(self.resp_wave.size) - offset + + offset = self.card_dt * self.card_wave.size - self.scan_duration + self.card_time = self.card_dt * np.arange(self.card_wave.size) - offset + self.card_trig = self.card_trig * self.card_dt - offset + self.hr_instant = 60. / np.diff(self.card_trig) + + @classmethod + def derived_metadata(cls, orig_metadata): + return {f: getattr(orig_metadata, 'nims_'+f) for f in cls.required_metadata_fields} + + def load_data(self): + pass + + @property + def nims_group_id(self): + return self.group + + @property + def nims_project(self): + return self.experiment + + @property + def nims_session_id(self): + return self.session + + @property + def nims_session_label(self): + pass + + @property + def nims_session_subject(self): + pass + + @property + def nims_acquisition_id(self): + return self.epoch + + @property + def nims_acquisition_description(self): + pass + + @property + def nims_acquisition_label(self): + return '%d.%d' % (self.series_no, self.acq_no) if self.acq_no is not None else str(self.series_no) + + @property + def nims_type(self): + return ('original', 'physio', self.filetype) + + @property + def nims_file_name(self): + return self.nims_epoch + '_' + self.filetype + + @property + def nims_file_ext(self): + return '' + + @property + def nims_file_domain(self): + return self.domain + + @property + def nims_file_type(self): + return self.filetype + + @property + def nims_file_kinds(self): + return ['resp', 'ecg'] + + @property + def nims_file_state(self): + pass + + @property + def nims_type(self): + return ('original', 'physio', self.filetype) + + @property + def nims_timestamp(self): # FIXME: should return UTC time and timezone + return self.timestamp.replace(tzinfo=bson.tz_util.FixedOffset(-7*60, 'pacific')) #FIXME: use pytz + + @property + def nims_timezone(self): + return None + + @property + def card_trig_chopped(self): + # find the first trigger that is >0 + start_ind = np.argmax(self.card_trig>0) + return self.card_trig[start_ind:] + + @property + def resp_wave_chopped(self): + start_ind = np.argmax(self.resp_time>0) + return self.resp_wave[start_ind:] + + def compute_regressors(self, legacy_rvhr=False, hr_min=30, hr_max=180): + """ + * catie chang, catie.chang@nih.gov + * bob dougherty, bobd@stanford.edu + + * 2011.12.13: original matlab implementation (catie) + * 2012.02.14: modified from retroicor_main.m. This version + optionally includes RVHRcor regressors too! (RV*RRF, HR*CRF, + + time derivatives). (catie, feeling the love) + * 2012.12.14: translated to Python (bob) + + See the following for background: + Glover et al., 2000: MRM 44, 162-167. + Birn et al., 2006: Neuroimage 31, 1536-1548. + Chang et al., 2009: Neuroimage 47, 1448-1459 (appendix A) + Chang et al., 2009: Neuroimage 44, 857-869 + + --------------------------- + INPUTS: + --------------------------- + legacy_rvhr: True to use Catie's original algorithm for computing heartrate, + false to use Bob's algorithm, which should be more robust. 
+ hr_min, hr_max: For Bob's heartrate algorithm, heartrate values outside this + range will be discarded. (Has no effect if legacy_rvhr=True) + + The following are set as instance vars: + * slice order: vector indicating order of slice acquisition + (e.g. [30 28 26, .... 29 27 ... 1] for 30 "interleaved down" slices) + * tr: in seconds + * nframes: number of frames in the timeseries + * card_trig: vector of cardiac (R-wave peak) times, in seconds. + * resp_wave: respiration amplitude signal + * resp_dt: sampling interval between the points in respiration + amplitude signal (in seconds, e.g. resp_dt=0.04 for 25 Hz sampling) + + (** setting card_trig = [] will ignore cardiac in both corrections) + (** setting resp_wave = [] will ignore respiration in both corrections) + + --------------------------- + OUTPUTS: + --------------------------- + * self.phases: list of cardiac & respiration phases for each slice (numpy arrays). + phases[i,:,0] contains the cardiac phase for slice "i" and + phases[i,:,1] contains the resp phases for slice "i". + * self.regressors: retroicor & rvhrcor regressors as [#timepoints x #regressors x #slices]. + I.e., the regressors for slice "i" are the columns of REGRESSORS[:,:,i]. + * + """ + import scipy.stats + import scipy.signal + + if self.nframes < 3: + self.regressors = None + log.warning('Need at least 3 temporal frames to compute regressors!') + return + + resp_wave = self.resp_wave_chopped + card_trig = self.card_trig_chopped + + t_win = 6 * 0.5 # 6-sec window for computing RV & HR, default + nslc = len(self.slice_order) + + # Find the derivative of the respiration waveform + # shift to zero-min + resp_wave = resp_wave - resp_wave.min() + # bin respiration signal into 100 values + Hb,bins = np.histogram(resp_wave, 100) + # calculate the derivative + # first, filter respiratory signal - just in case + f_cutoff = 1. # max allowable freq + fs = 1. / self.resp_dt; + wn = f_cutoff / (fs / 2) + ntaps = 20 + b = scipy.signal.firwin(ntaps, wn) + respfilt = scipy.signal.filtfilt(b, [1], resp_wave) + drdt = np.diff(respfilt) + + # -------------------------------------------------------------- + # find cardiac and respiratory phase vectors + # -------------------------------------------------------------- + self.phases = np.zeros((nslc, self.nframes, 2)) + for sl in range(nslc): + # times (for each frame) at which this slice was acquired (midpoint): + cur_slice_acq = (sl==self.slice_order).nonzero()[0][0] + slice_times = np.arange((self.tr/nslc)*(cur_slice_acq+0.5), self.scan_duration, self.tr) + for fr in range(self.nframes): + # cardiac + prev_trigs = np.nonzero(card_trig < slice_times[fr])[0] + if prev_trigs.size == 0: + t1 = 0. + else: + t1 = card_trig[prev_trigs[-1]] + next_trigs = np.nonzero(card_trig > slice_times[fr])[0] + if next_trigs.size == 0: + t2 = self.nframes*self.tr + else: + t2 = card_trig[next_trigs[0]] + phi_cardiac = (slice_times[fr] - t1) * 2. 
* np.pi / (t2 - t1) + + # respiration: (based on amplitude histogram) + # find the closest index in resp waveform + iphys = np.min((np.max((0, np.round(slice_times[fr] / self.resp_dt))), drdt.size-1)) + amp = resp_wave[iphys] + dbins = np.abs(amp-bins) + thisBin = dbins.argmin() #closest resp_wave histo bin + numer = Hb[0:thisBin].sum().astype(float) + phi_resp = np.pi * np.sign(drdt[iphys]) * (numer / respfilt.size) + + # store + self.phases[sl,fr,:] = [phi_cardiac, phi_resp] + + # -------------------------------------------------------------- + # generate slice-specific retroicor regressors + # -------------------------------------------------------------- + REGRESSORS_RET = np.zeros((self.nframes, 8, nslc)) + for sl in range(nslc): + phi_c = self.phases[sl,:,0] + phi_r = self.phases[sl,:,1] + + # Fourier expansion of cardiac phase + c1_c = np.cos(phi_c) + s1_c = np.sin(phi_c) + c2_c = np.cos(2*phi_c) + s2_c = np.sin(2*phi_c) + + # Fourier expansion of respiratory phase + c1_r = np.cos(phi_r) + s1_r = np.sin(phi_r) + c2_r = np.cos(2*phi_r) + s2_r = np.sin(2*phi_r) + covs = np.array((c1_c, s1_c, c2_c, s2_c,c1_r, s1_r, c2_r, s2_r)) + + REGRESSORS_RET[:,:,sl] = covs.transpose() + + # -------------------------------------------------------------- + # generate slice-specific rvhrcor regressors + # -------------------------------------------------------------- + REGRESSORS_RVHR = np.zeros((self.nframes, 4, nslc)) + self.heart_rate = np.zeros((self.nframes, nslc)) + t = np.arange(0, 40-self.tr, self.tr) # 40-sec impulse response + for sl in range(nslc): + # times (for each frame) at which this slice was acquired (midpoint): + cur_slice_acq = (sl==self.slice_order).nonzero()[0][0] + slice_times = np.arange((self.tr/nslc)*(cur_slice_acq+0.5), self.scan_duration, self.tr) + # make slice RV*RRF regressor + rv = np.zeros(self.nframes) + for tp in range(self.nframes): + i1 = max(0, np.floor((slice_times[tp] - t_win) / self.resp_dt)) + i2 = min(resp_wave.size, np.floor((slice_times[tp] + t_win) / self.resp_dt)) + if i2 < i1: + raise NIMSPhysioError('Respiration data is shorter than the scan duration.') + rv[tp] = np.std(resp_wave[i1:i2]) + + # conv(rv, rrf) + rv -= rv.mean() + R = 0.6 * (t**2.1) * np.exp(-t/1.6) - 0.0023 * (t**3.54) * np.exp(-t/4.25) + R = R / R.max() + rv_rrf = np.convolve(rv, R)[0:rv.size] + # time derivative + rv_rrf_d = np.diff(rv_rrf) + rv_rrf_d = np.concatenate(([rv_rrf_d[0]], rv_rrf_d)) + + # make slice HR*CRF regressor + # Catie's original code: + if legacy_rvhr: + hr = np.zeros(self.nframes) + for tp in range(self.nframes): + inds = np.nonzero(np.logical_and(card_trig >= (slice_times[tp]-t_win), card_trig <= (slice_times[tp]+t_win)))[0] + if inds.size < 2: + if tp==0: + hr[tp] = 60 + else: + hr[tp] = hr[tp-1] + else: + hr[tp] = (inds[-1] - inds[0]) * 60. / (card_trig[inds[-1]] - card_trig[inds[0]]) # bpm + else: + # Bob's new version: + trig_time_delta = np.diff(card_trig) + hr_instant = 60. / trig_time_delta + hr_time = card_trig[:-1] + trig_time_delta / 2. + # Clean a bit. We interpolate below, so it's safe to just discard bad values. 
+ keep_inds = np.logical_and(hr_instant>=hr_min, hr_instant<=hr_max) + hr_time = hr_time[keep_inds] + hr_instant = hr_instant[keep_inds] + if len(hr_instant) > 2: + hr = np.interp(slice_times, hr_time, hr_instant) + else: + hr = np.zeros(slice_times.shape) + + # conv(hr, crf) + self.heart_rate[:,sl] = hr + hr -= hr.mean() + H = 0.6 * (t**2.7) * np.exp(-t/1.6) - 16 * scipy.stats.norm.pdf(t, 12, 3) + H /= H.max() + hr_crf = np.convolve(hr,H)[0:hr.size] + # time derivative + hr_crf_d = np.diff(hr_crf) + hr_crf_d = np.concatenate(([hr_crf_d[0]], hr_crf_d)) + REGRESSORS_RVHR[:,:,sl] = np.array((rv_rrf, rv_rrf_d, hr_crf, hr_crf_d)).transpose() + + # -------------------------------------------------------------- + # final set of physio regressors + # -------------------------------------------------------------- + self.regressors = np.concatenate((REGRESSORS_RET, REGRESSORS_RVHR, self.heart_rate[:,np.newaxis,:]), axis=1) + for sl in range(nslc): + x = np.arange(self.regressors.shape[0]).transpose() + for reg in range(self.regressors.shape[1] - 1): + self.regressors[:,reg,sl] -= np.polyval(np.polyfit(x, self.regressors[:,reg,sl], 2), x) + + + def denoise_image(self, regressors): + """ + correct the image data: slice-wise + FIXME: NOT TESTED + """ + PCT_VAR_REDUCED = zeros(npix_x,npix_y,nslc) + nslc = d.shape[2] + self.nframes = d.shape[3] + npix_x = d.shape[0] + npix_y = d.shape[1] + d_corrected = np.zeros(d.shape) + for jj in range(nslc): + slice_data = np.squeeze(d[:,:,jj,:]) + Y_slice = slice_data.reshape((npix_x*npix_y, self.nframes)).transpose() #ntime x nvox + t = np.arange(self.nframes).transpose() + # design matrix + XX = np.array((t, t**2., REGRESSORS[:,:,jj])) + XX = np.concatenate((np.ones((XX.shape[0],1)), np.zscore(XX))) + Betas = np.pinv(XX) * Y_slice + Y_slice_corr = Y_slice - XX[:,3:-1] * Betas[3:-1,:] # keep + # calculate percent variance reduction + var_reduced = (np.var(Y_slice,0,1) - np.var(Y_slice_corr,0,1)) / np.var(Y_slice,0,1) + PCT_VAR_REDUCED[:,:,jj] = var_reduced.transpose().reshape((npix_x, npix_y)) + # fill corrected volume + V_slice_corr = Y_slice_corr.transpose() + for ii in range(self.nframes): + d_corrected[:,:,jj,ii] = V_slice_corr[:,ii].reshape((npix_x,npix_y)) + return d_corrected, PCT_VAR_REDUCED + + def write_regressors_legacy(self, filename): + self.compute_regressors() + # Write the array to disk + # Thanks to Joe Kington on StackOverflow (http://stackoverflow.com/questions/3685265/how-to-write-a-multidimensional-array-to-a-text-file) + with file(filename, 'w') as outfile: + # Write a little header behind comments + # Any line starting with "#" will be ignored by numpy.loadtxt + outfile.write('# slice_order = [ %s ]\n' % ','.join([str(d) for d in self.slice_order])) + outfile.write('# Full array shape: {0}\n'.format(self.regressors.shape)) + outfile.write('# time x regressor for each slice in the acquired volume\n') + outfile.write('# regressors: [ %s ]\n' % ','.join(self.regressor_names)) + for i in range(self.regressors.shape[2]): + outfile.write('# slice %d\n' % i) + # Format as left-justified columns 7 chars wide with 2 decimal places. 
+ np.savetxt(outfile, self.regressors[:,:,i], fmt='%-7.6f') + + def _write_regressors(self, fileobj, header_notes=''): + # Write a little header behind comments + # Any line starting with "#" will be ignored by numpy.loadtxt + fileobj.write('#slice_order = [ %s ]\n' % ','.join([str(d) for d in self.slice_order])) + if header_notes: + fileobj.write('#' + header_notes + '\n') + # print out all the column headings: + nslices = len(self.slice_order) + fileobj.write('#' + ','.join([h[0]+h[1] for h in itertools.product(['slice'+str(s) for s in range(nslices)], self.regressor_names)]) + '\n') + new_shape = (self.regressors.shape[0], self.regressors.shape[1]*self.regressors.shape[2]) + np.savetxt(fileobj, self.regressors.reshape(new_shape, order='F'), fmt='%0.5f', delimiter=',') + #d = {key: value for (key, value) in sequence} + #d['slice_order'] = self.slice_order + #with file(filename, 'w') as outfile: + # json.dump(d, outfile) + + def write_regressors(self, filename): + """ Save the regressors in a simple csv format file. If the filename ends with .gz, the file will be gzipped. """ + self.compute_regressors() + if filename.endswith('.gz'): + with gzip.open(filename, 'wb') as fp: + self._write_regressors(fp) + else: + with file(filename, 'w') as fp: + self._write_regressors(fp) + + def write_raw_data(self, filename): + """ Save the raw physio data in a json file. If the filename ends with .gz, the file will be gzipped. """ + d = {'resp_time':self.resp_time.round(3).tolist(), 'resp_wave':self.resp_wave.astype(int).tolist(), 'resp_trig':self.resp_trig.round(3).tolist(), + 'card_time':self.card_time.round(3).tolist(), 'card_wave':self.card_wave.astype(int).tolist(), 'card_trig':self.card_trig.round(3).tolist()} + if filename.endswith('.gz'): + with gzip.open(filename, 'wb') as fp: + json.dump(d, fp) + else: + with file(filename, 'w') as fp: + json.dump(d, fp) + + @property + def regressor_names(self): + return ('c1_c', 's1_c', 'c2_c', 's2_c', 'c1_r', 's1_r', 'c2_r', 's2_r', 'rv_rrf', 'rv_rrf_d', 'hr_crf', 'hr_crf_d', 'hr') + + def is_valid(self, resp_freq_cutoff=1.0): + if self.nframes < self.min_number_of_frames or self.resp_wave==None or self.card_wave==None: + return False + # Heuristics to detect invalid data + # When not connected, the PPG output is very low amplitude noise + hr_instant = 60. / np.diff(self.card_trig) + proportion_good_hr = np.sum(np.logical_and(hr_instant>=30, hr_instant<=200)) / float(len(hr_instant)) + card_valid = self.card_wave.std() > self.min_card_std and proportion_good_hr>0.2 + # The respiration signal is heavily low-pass filtered, but valid data should still + # have much more low-frequency energy + freq = np.abs(np.fft.rfft(self.resp_wave)) + fs = 1. 
/ (self.resp_dt*self.resp_wave.shape[0]) + f_bin = int(round(resp_freq_cutoff/fs)) + if f_bin self.min_resp_lfp + else: + resp_valid = False + return card_valid or resp_valid + + +class ArgumentParser(argparse.ArgumentParser): + + def __init__(self): + super(ArgumentParser, self).__init__() + self.description = """ Processes physio data to make them amenable to retroicor.""" + self.add_argument('physio_file', help='path to physio data') + self.add_argument('outbase', help='basename for output files') + self.add_argument('-n', '--nifti_file', help='path to corresponding nifti file') + # TODO: allow tr, nframes, and nslices to be entered as args if no nifti is provided + # TODO: allow user to specify custom slice orders + self.add_argument('-p', '--preprocess', action='store_true', help='Also save pre-processed physio data') + + +if __name__ == '__main__': + args = ArgumentParser().parse_args() + logging.basicConfig(level=logging.DEBUG) + if args.nifti_file: + ni = nibabel.load(args.nifti_file) + slice_order = np.argsort(ni.get_header().get_slice_times()) + phys = NIMSPhysio(args.physio_file, tr=ni.get_header().get_zooms()[3], nframes=ni.shape[3], slice_order=slice_order) + else: + log.warning('regressors will not be valid!') + phys = NIMSPhysio(args.physio_file) + if args.preprocess: + np.savetxt(args.outbase + '_resp.txt', phys.resp_wave) + np.savetxt(args.outbase + '_pulse.txt', phys.card_trig) + np.savetxt(args.outbase + '_slice.txt', phys.slice_order) + phys.write_regressors(args.outbase + '_reg.txt') diff --git a/nimsproc/processor.py b/nimsproc/processor.py index 4e9a52e..837aa40 100755 --- a/nimsproc/processor.py +++ b/nimsproc/processor.py @@ -19,6 +19,10 @@ import nimsutil import nimsdata +import nimsdata.medimg.nimsdicom +import nimsdata.medimg.nimspfile +import nimsphysio # for compatibility + from nimsgears.model import * log = logging.getLogger('processor') @@ -61,9 +65,9 @@ def run(self): if job: if isinstance(job.data_container, Epoch) and job.data_container.primary_dataset!=None: ds = job.data_container.primary_dataset - if ds.filetype == nimsdata.nimsdicom.NIMSDicom.filetype: + if ds.filetype == nimsdata.medimg.nimsdicom.NIMSDicom.filetype: pipeline_class = DicomPipeline - elif ds.filetype == nimsdata.nimsraw.NIMSPFile.filetype: + elif ds.filetype == nimsdata.medimg.nimspfile.NIMSPFile.filetype: pipeline_class = PFilePipeline pipeline = pipeline_class(job, self.nims_path, self.physio_path, self.tempdir, self.max_recon_jobs) @@ -137,6 +141,8 @@ def clean(self, data_container, kind): @abc.abstractmethod def find(self): + if self.physio_path is None: return + self.clean(self.job.data_container, u'peripheral') transaction.commit() DBSession.add(self.job) @@ -146,7 +152,7 @@ def find(self): if dc.physio_recorded: physio_files = nimsutil.find_ge_physio(self.physio_path, dc.timestamp+dc.prescribed_duration, dc.psd.encode('utf-8')) if physio_files: - physio = nimsdata.nimsphysio.NIMSPhysio(physio_files, dc.tr, dc.num_timepoints) + physio = nimsphysio.NIMSPhysio(physio_files, dc.tr, dc.num_timepoints) if physio.is_valid(): self.job.activity = u'valid physio found' log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) @@ -171,7 +177,7 @@ def find(self): try: reg_filename = '%s_physio_regressors.csv.gz' % self.job.data_container.name physio.write_regressors(os.path.join(self.nims_path, dataset.relpath, reg_filename)) - except nimsdata.nimsphysio.NIMSPhysioError: + except nimsphysio.NIMSPhysioError: self.job.activity = u'error generating regressors from physio data' 
log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) else: @@ -193,7 +199,6 @@ def process(self): self.clean(self.job.data_container, u'derived') self.clean(self.job.data_container, u'web') self.clean(self.job.data_container, u'qa') - self.job.data_container.qa_status = u'rerun' self.job.activity = u'generating NIfTI / running recon' log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) transaction.commit() @@ -212,45 +217,62 @@ def process(self): with nimsutil.TempDir(dir=self.tempdir) as outputdir: outbase = os.path.join(outputdir, ds.container.name) dcm_tgz = os.path.join(self.nims_path, ds.relpath, os.listdir(os.path.join(self.nims_path, ds.relpath))[0]) - dcm_acq = nimsdata.nimsdicom.NIMSDicom(dcm_tgz) - conv_type, conv_file = dcm_acq.convert(outbase) - - if conv_type: - outputdir_list = os.listdir(outputdir) - self.job.activity = (u'generated %s' % (', '.join([f for f in outputdir_list])))[:255] - log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) - conv_ds = Dataset.at_path(self.nims_path, unicode(conv_type)) - DBSession.add(self.job) - DBSession.add(self.job.data_container) + dcm_acq = nimsdata.parse(dcm_tgz, filetype='dicom', load_data=True) - conv_ds.kind = u'derived' - conv_ds.container = self.job.data_container - filenames = [] - for f in outputdir_list: - filenames.append(f) - shutil.copy2(os.path.join(outputdir, f), os.path.join(self.nims_path, conv_ds.relpath)) - conv_ds.filenames = filenames - transaction.commit() - - if conv_type == 'nifti': - pyramid_ds = Dataset.at_path(self.nims_path, u'img_pyr') - DBSession.add(self.job) - DBSession.add(self.job.data_container) - nims_montage = nimsdata.nimsmontage.generate_montage(conv_file) - nims_montage.write_sqlite_pyramid(os.path.join(self.nims_path, pyramid_ds.relpath, self.job.data_container.name+'.pyrdb')) - self.job.activity = u'image pyramid generated' - log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) - pyramid_ds.kind = u'web' - pyramid_ds.container = self.job.data_container - pyramid_ds.filenames = os.listdir(os.path.join(self.nims_path, pyramid_ds.relpath)) - transaction.commit() + if dcm_acq.data is None: + self.job_activity = (u'dicom %s has no data' % dcm_tgz) + log.warn('%d %s %s' % (self.job.id, self.job, self.job.activity)) + else: + if dcm_acq.is_screenshot: + conv_files = nimsdata.write(dcm_acq, dcm_acq.data, outbase, filetype='png') + if conv_files: + outputdir_list = os.listdir(outputdir) + self.job.activity = (u'generated %s' % (', '.join([f for f in outputdir_list])))[:255] + log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) + conv_ds = Dataset.at_path(self.nims_path, u'bitmap') + DBSession.add(self.job) + DBSession.add(self.job.data_container) + conv_ds.kind = u'derived' + conv_ds.container = self.job.data_container + filenames = [] + for f in outputdir_list: + filenames.append(f) + shutil.copy2(os.path.join(outputdir, f), os.path.join(self.nims_path, conv_ds.relpath)) + conv_ds.filenames = filenames + transaction.commit() + else: + # to enable legacy NIMS v1 re-orientation, pass voxel_order='LPS' to nimsdata.write + # conv_files = nimsdata.write(dcm_acq, dcm_acq.data, outbase, filetype='nifti', voxel_order='LPS') + conv_files = nimsdata.write(dcm_acq, dcm_acq.data, outbase, filetype='nifti', voxel_order='LPS') + if conv_files: + # if nifti was successfully created + outputdir_list = os.listdir(outputdir) + self.job.activity = (u'generated %s' % (', '.join([f for f in outputdir_list])))[:255] + log.info(u'%d %s %s' % (self.job.id, 
self.job, self.job.activity)) + conv_ds = Dataset.at_path(self.nims_path, u'nifti') + DBSession.add(self.job) + DBSession.add(self.job.data_container) + conv_ds.kind = u'derived' + conv_ds.container = self.job.data_container + filenames = [] + for f in outputdir_list: + filenames.append(f) + shutil.copy2(os.path.join(outputdir, f), os.path.join(self.nims_path, conv_ds.relpath)) + conv_ds.filenames = filenames + transaction.commit() + pyramid_ds = Dataset.at_path(self.nims_path, u'img_pyr') + DBSession.add(self.job) + DBSession.add(self.job.data_container) + outpath = os.path.join(self.nims_path, pyramid_ds.relpath, self.job.data_container.name) + nims_montage = nimsdata.write(dcm_acq, dcm_acq.data, outpath, filetype='montage') + self.job.activity = u'image pyramid generated' + log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) + pyramid_ds.kind = u'web' + pyramid_ds.container = self.job.data_container + pyramid_ds.filenames = os.listdir(os.path.join(self.nims_path, pyramid_ds.relpath)) + transaction.commit() - if conv_type == 'bitmap': - # Hack to make sure screen-saves go to the end of the time-sorted list - DBSession.add(self.job) - DBSession.add(self.job.data_container) - self.job.data_container.timestamp = datetime.datetime.combine(date=self.job.data_container.timestamp.date(), time=datetime.time(23,59, 59)) - transaction.commit() + DBSession.add(self.job) DBSession.add(self.job) @@ -271,8 +293,8 @@ def process(self): # FIXME: if there are >1 pfiles, what to do? Try them all? for pfile in sorted(pfiles): try: - pf = nimsdata.nimsraw.NIMSPFile(os.path.join(self.nims_path, ds.relpath, pfile)) - except nimsdata.nimsraw.NIMSPFileError: + pf = nimsdata.parse(os.path.join(self.nims_path, ds.relpath, pfile), filetype='pfile', load_data=True) + except nimsdata.medimg.nimspfile.NIMSPFileError: pf = None else: break @@ -311,8 +333,8 @@ def process(self): pyramid_ds = Dataset.at_path(self.nims_path, u'img_pyr') DBSession.add(self.job) DBSession.add(self.job.data_container) - nims_montage = nimsdata.nimsmontage.generate_montage(conv_file) - nims_montage.write_sqlite_pyramid(os.path.join(self.nims_path, pyramid_ds.relpath, self.job.data_container.name+'.pyrdb')) + outpath = os.path.join(self.nims_path, pyramid_ds.relpath, self.job.data_container.name+'.pyrdb') + nims_montage = nimsdata.write(pf, pf.data, outpath, filetype='montage') self.job.activity = u'image pyramid generated' log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) pyramid_ds.kind = u'web' @@ -329,7 +351,7 @@ def __init__(self): super(ArgumentParser, self).__init__() self.add_argument('db_uri', metavar='URI', help='database URI') self.add_argument('nims_path', metavar='DATA_PATH', help='data location') - self.add_argument('physio_path', metavar='PHYSIO_PATH', help='path to physio data') + self.add_argument('physio_path', metavar='PHYSIO_PATH', nargs='?', help='path to physio data') self.add_argument('-T', '--task', help='find|proc (default is all)') self.add_argument('-e', '--filter', default=[], action='append', help='sqlalchemy filter expression') self.add_argument('-j', '--jobs', type=int, default=1, help='maximum number of concurrent threads') @@ -345,7 +367,6 @@ def __init__(self): if __name__ == '__main__': # workaround for http://bugs.python.org/issue7980 - import datetime # used in nimsutil datetime.datetime.strptime('0', '%S') args = ArgumentParser().parse_args() diff --git a/nimsproc/scheduler.py b/nimsproc/scheduler.py index ffce2d2..0148b98 100755 --- a/nimsproc/scheduler.py +++ 
b/nimsproc/scheduler.py @@ -62,7 +62,7 @@ def run(self): log.info(u'Compressing %s %s' % (dc, ds.filetype)) dataset_path = os.path.join(self.nims_path, ds.relpath) if ds.filetype == nimsdata.nimsdicom.NIMSDicom.filetype: - arcdir = '%d_%d_%d_dicoms' % (dc.session.exam, dc.series, dc.acq) + arcdir = '%s_%s_%s_dicoms' % (dc.session.exam, dc.series, dc.acq) arcdir_path = os.path.join(dataset_path, arcdir) os.mkdir(arcdir_path) for filename in [f for f in os.listdir(dataset_path) if not f.startswith(arcdir)]: diff --git a/nimsproc/sorter.py b/nimsproc/sorter.py index 1af6fc2..31e4eea 100755 --- a/nimsproc/sorter.py +++ b/nimsproc/sorter.py @@ -1,127 +1,99 @@ #!/usr/bin/env python # -# @author: Reno Bowen -# Gunnar Schaefer +# @author: Gunnar Schaefer, Reno Bowen import os import time import shutil -import signal import logging -import argparse import datetime - -import sqlalchemy import transaction -import nimsutil import nimsdata -from nimsgears import model +import nimsgears.model log = logging.getLogger('sorter') +import warnings +warnings.filterwarnings('error') class Sorter(object): - def __init__(self, db_uri, sort_path, preserve_path, nims_path, dir_mode, sleep_time): + def __init__(self, stage_path, preserve_path, nims_path, sleep_time): super(Sorter, self).__init__() - self.sort_path = nimsutil.make_joined_path(sort_path) - self.preserve_path = nimsutil.make_joined_path(preserve_path) if preserve_path else None - self.nims_path = nimsutil.make_joined_path(nims_path) - self.dir_mode = dir_mode + self.stage_path = stage_path + self.preserve_path = preserve_path + self.nims_path = nims_path self.sleep_time = sleep_time self.alive = True - model.init_model(sqlalchemy.create_engine(db_uri)) def halt(self): self.alive = False def run(self): - """Insert files, if valid, into database and associated filesystem.""" while self.alive: - stage_contents = [os.path.join(self.sort_path, sc) for sc in os.listdir(self.sort_path) if not sc.startswith('.')] - stage_contents = [sc for sc in stage_contents if os.path.isdir(sc)] # ignore toplevel files - if stage_contents: - sort_path = min(stage_contents, key=os.path.getmtime) # oldest first - log.info('Sorting %s' % os.path.basename(sort_path)) - for dirpath, dirnames, filenames in os.walk(sort_path, topdown=False): - aux_paths = {} - for aux_file in filter(lambda fn: fn.startswith('_'), filenames): - main_file = aux_file.lstrip('_').rpartition('_')[0] - aux_paths[main_file] = aux_paths.get(main_file, []) + [os.path.join(dirpath, aux_file)] - filenames = filter(lambda fn: not fn.startswith('_'), filenames) - if self.dir_mode and filenames and not dirnames: # at lowest sub-directory - self.sort_directory(dirpath, filenames, aux_paths) + stage_items = [os.path.join(self.stage_path, si) for si in os.listdir(self.stage_path) if not si.startswith('.')] # ignore dot files + if stage_items: + for stage_item in sorted(stage_items, key=os.path.getmtime): # oldest first + if os.path.islink(stage_item): + os.remove(stage_item) + elif 'gephysio' in os.path.basename(stage_item): # HACK !!!!!!!!!!!!!!!! 
NIMS 1.0 cannot sort gephysio + os.remove(stage_item) + elif os.path.isfile(stage_item): + self.sort(stage_item) else: - self.sort_files(dirpath, filenames, aux_paths) - log.info('Sorted %s' % os.path.basename(sort_path)) + for subpath in [os.path.join(dirpath, fn) for (dirpath, _, filenames) in os.walk(stage_item) for fn in filenames]: + if not os.path.islink(subpath) and not subpath.startswith('.'): + self.sort(subpath) + shutil.rmtree(stage_item) else: - log.debug('Waiting for work...') + log.debug('Waiting for data...') time.sleep(self.sleep_time) - def sort_files(self, dirpath, filenames, aux_paths): - for filepath, filename in [(os.path.join(dirpath, fn), fn) for fn in filenames]: - log.debug('Sorting %s' % filename) - try: - mrfile = nimsdata.parse(filepath) - except nimsdata.NIMSDataError: - if self.preserve_path: - preserve_path = nimsutil.make_joined_path(self.preserve_path, os.path.dirname(os.path.relpath(filepath, self.sort_path))) - shutil.move(filepath, os.path.join(preserve_path, filename)) - else: - dataset = model.Dataset.from_mrfile(mrfile, self.nims_path) - new_filenames = [filename] - shutil.move(filepath, os.path.join(self.nims_path, dataset.relpath, filename)) - for aux_path in aux_paths.get(os.path.splitext(filename)[0] if dataset.compressed else filename, []): - new_filenames.append(os.path.basename(aux_path)) - shutil.move(aux_path, os.path.join(self.nims_path, dataset.relpath, os.path.basename(aux_path))) - dataset.filenames = set(dataset.filenames + new_filenames) - dataset.updatetime = datetime.datetime.now() - dataset.untrash() - transaction.commit() - shutil.rmtree(dirpath) - - def sort_directory(self, dirpath, filenames, aux_paths): - log.debug('Sorting %s in directory mode' % os.path.basename(dirpath)) + def sort(self, filepath): + filename = os.path.basename(filepath) try: - mrfile = nimsdata.parse(os.path.join(dirpath, filenames[0])) + log.info('Parsing %s' % filename) + mrfile = nimsdata.parse(filepath) except nimsdata.NIMSDataError: + log.warning('Cannot sort %s' % filename) if self.preserve_path: - preserve_path = nimsutil.make_joined_path(self.preserve_path, os.path.relpath(dirpath, self.sort_path)) - for filename in os.listdir(dirpath): - shutil.move(os.path.join(dirpath, filename), os.path.join(preserve_path, filename)) + preserve_path = os.path.join(self.preserve_path, os.path.relpath(filepath, self.stage_path).replace('/', '_')) + log.debug('Preserving %s' % filename) + shutil.move(filepath, preserve_path) else: - dataset = model.Dataset.from_mrfile(mrfile, self.nims_path) - for filepath, aux_paths in [(os.path.join(dirpath, filename), aux_paths.get(filename, [])) for filename in filenames]: - shutil.move(filepath, os.path.join(self.nims_path, dataset.relpath, os.path.basename(filepath))) - for aux_path in aux_paths: - shutil.move(aux_path, os.path.join(self.nims_path, dataset.relpath, os.path.basename(aux_path))) + log.info('Sorting %s' % filename) + filename = '_'.join(filename.rsplit('_')[-4:]) + dataset = nimsgears.model.Dataset.from_mrfile(mrfile, self.nims_path) + shutil.move(filepath, os.path.join(self.nims_path, dataset.relpath, filename)) + dataset.filenames = [filename] dataset.updatetime = datetime.datetime.now() dataset.untrash() transaction.commit() - shutil.rmtree(dirpath) - - -class ArgumentParser(argparse.ArgumentParser): - - def __init__(self): - super(ArgumentParser, self).__init__() - self.add_argument('db_uri', help='database URI') - self.add_argument('sort_path', help='path to staging area') - self.add_argument('nims_path', 
help='data destination') - self.add_argument('-d', '--dirmode', action='store_true', help='assume files are pre-sorted by directory') - self.add_argument('-t', '--toplevel', action='store_true', help='handle toplevel files') - self.add_argument('-p', '--preserve_path', help='preserve unsortable files here') - self.add_argument('-s', '--sleeptime', type=int, default=10, help='time to sleep before checking for new files') - self.add_argument('-f', '--logfile', help='path to log file') - self.add_argument('-l', '--loglevel', default='info', help='log level (default: info)') - self.add_argument('-q', '--quiet', action='store_true', default=False, help='disable console logging') if __name__ == '__main__': - args = ArgumentParser().parse_args() + import signal + import argparse + import sqlalchemy + + import nimsutil + + arg_parser = argparse.ArgumentParser() + arg_parser.add_argument('db_uri', help='database URI') + arg_parser.add_argument('stage_path', help='path to staging area') + arg_parser.add_argument('nims_path', help='data destination') + arg_parser.add_argument('-t', '--toplevel', action='store_true', help='handle toplevel files') + arg_parser.add_argument('-p', '--preserve_path', help='preserve unsortable files here') + arg_parser.add_argument('-s', '--sleeptime', type=int, default=10, help='time to sleep before checking for new files') + arg_parser.add_argument('-f', '--logfile', help='path to log file') + arg_parser.add_argument('-l', '--loglevel', default='info', help='log level (default: info)') + arg_parser.add_argument('-q', '--quiet', action='store_true', default=False, help='disable console logging') + args = arg_parser.parse_args() + nimsutil.configure_log(args.logfile, not args.quiet, args.loglevel) - sorter = Sorter(args.db_uri, args.sort_path, args.preserve_path, args.nims_path, args.dirmode, args.sleeptime) + nimsgears.model.init_model(sqlalchemy.create_engine(args.db_uri)) + sorter = Sorter(args.stage_path, args.preserve_path, args.nims_path, args.sleeptime) def term_handler(signum, stack): sorter.halt() diff --git a/nimsutil/nimsutil.py b/nimsutil/nimsutil.py index b0c44d9..18e3765 100644 --- a/nimsutil/nimsutil.py +++ b/nimsutil/nimsutil.py @@ -7,7 +7,6 @@ import re import gzip import shutil -import subprocess import string import tarfile import difflib @@ -84,6 +83,21 @@ def parse_patient_id(patient_id, known_ids): exp_id = patient_id return (unicode(subj_code), unicode(lab_id), unicode(exp_id)) +def parse_patient_id__(subj_code, lab_id, exp_id, known_ids): + """ + Accept a NIMS-formatted patient id and return lab id and experiment id. + + We use fuzzy matching to find the best matching known lab id. If we can't + do so with high confidence, the lab id is set to 'unknown'. 
+ """ + lab_id_matches = difflib.get_close_matches(lab_id, known_ids, cutoff=0.8) + if len(lab_id_matches) == 1: + lab_id = lab_id_matches[0] + else: + exp_id = lab_id + '/' + exp_id + lab_id = 'unknown' + return (unicode(subj_code), unicode(lab_id), unicode(exp_id)) + def clean_string(string): """ @@ -158,7 +172,7 @@ def ldap_query(uid): def find_ge_physio(data_path, timestamp, psd_name): physio_files = os.listdir(data_path) if not physio_files: - raise Exception(msg='physio files unavailable') + raise Exception('physio files unavailable') physio_dict = {} leadtime = datetime.timedelta(days=1) @@ -195,18 +209,12 @@ def hrsize(size): def gzip_inplace(path, mode=None): gzpath = path + '.gz' - # The following with pigz is ~8x faster than the python code - if os.path.isfile('/usr/bin/pigz'): - subprocess.call('pigz -4 -p4 %s' % path, shell=True) - elif os.path.isfile('/usr/bin/gzip') or os.path.isfile('/bin/gzip'): - subprocess.call('gzip -4 %s' % path, shell=True) - else: - with gzip.open(gzpath, 'wb', compresslevel=4) as gzfile: - with open(path) as pathfile: - gzfile.writelines(pathfile) - shutil.copystat(path, gzpath) - os.remove(path) + with gzip.open(gzpath, 'wb', compresslevel=4) as gzfile: + with open(path) as pathfile: + gzfile.writelines(pathfile) + shutil.copystat(path, gzpath) if mode: os.chmod(gzpath, mode) + os.remove(path) def redigest(path): From efe5f3a0751505ad87d3c7f92fe34f0b13825b24 Mon Sep 17 00:00:00 2001 From: "Kevin S. Hahn" Date: Wed, 5 Nov 2014 17:27:33 -0800 Subject: [PATCH 2/5] adapt sorter and processors for tgz datasets - pfiles.tgz are unpacked for parsing, and sorting - updates db models to include num_mux_cal_cycles to allow processor to identify candidate muxepi aux files from db --- nimsdata | 2 +- nimsgears/model/nims.py | 8 +- nimsproc/processor.py | 278 ++++++++++++++++++++++++++++------------ nimsproc/sorter.py | 28 +++- nimsproc/tempdir.py | 94 ++++++++++++++ 5 files changed, 325 insertions(+), 85 deletions(-) create mode 100644 nimsproc/tempdir.py diff --git a/nimsdata b/nimsdata index 687554c..e7536f5 160000 --- a/nimsdata +++ b/nimsdata @@ -1 +1 @@ -Subproject commit 687554c7df81faafb68ea963cdbd8da206956fe0 +Subproject commit e7536f5ac9550e9fc55f0a737abeed69210ca4cb diff --git a/nimsgears/model/nims.py b/nimsgears/model/nims.py index ad2c6b9..84f06cd 100644 --- a/nimsgears/model/nims.py +++ b/nimsgears/model/nims.py @@ -509,7 +509,7 @@ def __unicode__(self): @classmethod def from_mrfile(cls, mrfile): - subj_code, group_name, exp_name = nimsutil.parse_patient_id__(mrfile.subj_code, mrfile.group_name, mrfile.project_name, ResearchGroup.all_ids()) + subj_code, group_name, exp_name = nimsutil.parse_patient_id(mrfile.patient_id, ResearchGroup.all_ids()) query = cls.query.join(Experiment, cls.experiment).filter(Experiment.name == exp_name) query = query.join(ResearchGroup, Experiment.owner).filter(ResearchGroup.gid == group_name) if subj_code: @@ -605,7 +605,7 @@ def from_mrfile(cls, mrfile): # central authority and/or querying the schedule database. But for now, # just let the operator be None if the user isn't already in the system. 
operator = User.by_uid(unicode(mrfile.operator), create=False) - session = Session(uid=uid, exam=mrfile.exam_no if isinstance(mrfile.exam_no, int) else 0, subject=subject, operator=operator) + session = Session(uid=uid, exam=int(mrfile.exam_no) if isinstance(mrfile.exam_no, (unicode, int)) else 0, subject=subject, operator=operator) return session @classmethod @@ -696,6 +696,7 @@ class Epoch(DataContainer): phase_encode_undersample = Field(Float) slice_encode_undersample = Field(Float) acquisition_matrix = Field(Unicode(255)) + num_mux_cal_cycle = Field(Integer) session = ManyToOne('Session') @@ -727,7 +728,7 @@ def from_mrfile(cls, mrfile): flip_angle = mrfile.flip_angle, pixel_bandwidth = mrfile.pixel_bandwidth, num_slices = mrfile.num_slices, - num_timepoints = mrfile.num_timepoints, + num_timepoints = mrfile.num_timepoints or 1, num_averages = mrfile.num_averages, num_echos = mrfile.num_echos, receive_coil_name = unicode(mrfile.receive_coil_name), @@ -744,6 +745,7 @@ def from_mrfile(cls, mrfile): phase_encode_undersample = mrfile.phase_encode_undersample, slice_encode_undersample = mrfile.slice_encode_undersample, acquisition_matrix = unicode(str(mrfile.acquisition_matrix)), + num_mux_cal_cycle = mrfile.num_mux_cal_cycle if mrfile.filetype == u'pfile' else None, # hack for pfile qa_status = u'pending', # to unpack fov, mm_per_vox, and acquisition_matrix: np.fromstring(str(mm)[1:-1],sep=',') ) diff --git a/nimsproc/processor.py b/nimsproc/processor.py index 837aa40..ec5d0c9 100755 --- a/nimsproc/processor.py +++ b/nimsproc/processor.py @@ -5,6 +5,7 @@ import os import abc +import glob import time import shutil import signal @@ -13,6 +14,7 @@ import argparse import datetime import threading +import numpy as np import sqlalchemy import transaction @@ -21,7 +23,7 @@ import nimsdata import nimsdata.medimg.nimsdicom import nimsdata.medimg.nimspfile -import nimsphysio # for compatibility +import nimsphysio from nimsgears.model import * @@ -112,21 +114,16 @@ def __init__(self, job, nims_path, physio_path, tempdir, max_recon_jobs): def run(self): DBSession.add(self.job) - self.job.activity = u'started' + self.job.activity = u'started %s' % self.job.data_container.primary_dataset.filetype log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) transaction.commit() DBSession.add(self.job) try: if self.job.task == u'find&proc': - self.find() - self.process() - elif self.job.task == u'find': - self.find() - elif self.job.task == u'proc': - self.process() + self.process() # process now includes find. except Exception as ex: self.job.status = u'failed' - self.job.activity = u'failed: %s' % ex + self.job.activity = (u'failed: %s' % ex)[:255] log.warning(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) else: self.job.status = u'done' @@ -140,26 +137,43 @@ def clean(self, data_container, kind): ds.delete() @abc.abstractmethod - def find(self): - if self.physio_path is None: return - + def find(self, slice_order, num_slices): + """ + Locate physio and generate physio regressors. + + Find is called from within each pipeline's process() method after the file's slice_order + and num_slices attributes have been set, but before preparing the data to be written out. + This will use the num_slice and slice_order to create an array of the slice numbers in the + sequence they were acquired. Nimsphysio will use both the current data containers metadata, + like timestamp and duration, and metadata obtained by parsing the primary dataset file to + determine if physio is valid. 
+ + This method should never raise any exceptions. + + Parameters + ---------- + slice_order : int + integer that corresponds to the appropriate NIFTI slice order code. 0 for unknown. + num_slices : int + number of slices + + """ self.clean(self.job.data_container, u'peripheral') transaction.commit() DBSession.add(self.job) + if self.physio_path is None: return # can't search w/o phys path + if not slice_order or not num_slices: return # need both slice order AND num_slices to create regressors + if self.job.data_container.scanner_name == 'IRC MRC35068': return # hack to ignore Davis files + dc = self.job.data_container - ds = self.job.data_container.primary_dataset if dc.physio_recorded: physio_files = nimsutil.find_ge_physio(self.physio_path, dc.timestamp+dc.prescribed_duration, dc.psd.encode('utf-8')) if physio_files: - physio = nimsphysio.NIMSPhysio(physio_files, dc.tr, dc.num_timepoints) + physio = nimsphysio.NIMSPhysio(physio_files, dc.tr, dc.num_timepoints, nimsdata.medimg.medimg.get_slice_order(slice_order, num_slices)) if physio.is_valid(): self.job.activity = u'valid physio found' log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) - # Computing the slice-order can be expensive, so we didn't do it when we instantiated. - # But now that we know physio is valid, we need to do it. - ni = nimsdata.parse(os.path.join(self.nims_path, ds.primary_file_relpath)) - physio.slice_order = ni.get_slice_order() # TODO: should probably write a set method dataset = Dataset.at_path(self.nims_path, u'physio') DBSession.add(self.job) DBSession.add(self.job.data_container) @@ -177,6 +191,8 @@ def find(self): try: reg_filename = '%s_physio_regressors.csv.gz' % self.job.data_container.name physio.write_regressors(os.path.join(self.nims_path, dataset.relpath, reg_filename)) + self.job.activity = u'physio regressors %s written' % reg_filename + log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) except nimsphysio.NIMSPhysioError: self.job.activity = u'error generating regressors from physio data' log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) @@ -199,7 +215,7 @@ def process(self): self.clean(self.job.data_container, u'derived') self.clean(self.job.data_container, u'web') self.clean(self.job.data_container, u'qa') - self.job.activity = u'generating NIfTI / running recon' + self.job.activity = u'reading data / preparing to run recon' log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) transaction.commit() DBSession.add(self.job) @@ -207,21 +223,65 @@ class DicomPipeline(Pipeline): - def find(self): - return super(DicomPipeline, self).find() + def find(self, slice_order, num_slices): + return super(DicomPipeline, self).find(slice_order, num_slices) def process(self): + """ + Convert a dicom file. + + Parse a dicom file and load the data. If an error occurs during parsing, no exception gets raised; + instead the exception is saved into dataset.failure_reason. This is to allow find() to attempt to + locate physio, even if the input dicom files could not be loaded. After locating physio has been + attempted, the DicomPipeline will attempt to convert the dataset into various output files. + + Parameters + ---------- + None : NoneType + The DicomPipeline has a job and dataset assigned to it. No additional parameters are required.
+ + """ super(DicomPipeline, self).process() ds = self.job.data_container.primary_dataset with nimsutil.TempDir(dir=self.tempdir) as outputdir: outbase = os.path.join(outputdir, ds.container.name) dcm_tgz = os.path.join(self.nims_path, ds.relpath, os.listdir(os.path.join(self.nims_path, ds.relpath))[0]) - dcm_acq = nimsdata.parse(dcm_tgz, filetype='dicom', load_data=True) + dcm_acq = nimsdata.parse(dcm_tgz, filetype='dicom', load_data=True) # store exception for later... + + # if physio was not found, wait 30 seconds and search again. + # this should only run when the job activity is u'no physio files found' + # if physio not recorded, or physio invalid, don't try again + try: + self.find(dcm_acq.slice_order, dcm_acq.num_slices) + except Exception as e: + # this catches some of the non-image scans that do not have + # dcm_acq.slice_order and/or dcm_acq.num_slices + log.info(str(e)) # do we need this logging message? + if self.job.activity == u'no physio files found': + self.job.activity = u'no physio files found; searching again in 30 seconds' + log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) + time.sleep(30) + try: + self.find(dcm_acq.slice_order, dcm_acq.num_slices) + except Exception as e: + # this catches some of the non-image scans that do not have + # dcm_acq.slice_order and/or dcm_acq.num_slices + log.info(str(e)) # do we need this logging message? + + if dcm_acq.failure_reason: # implies dcm_acq.data = None + # if dcm_acq.failure_reason is set, job has failed + # raising an error should cause job.status should to end up 'failed' + self.job.activity = (u'load dicom data failed; %s' % str(dcm_acq.failure_reason)) + transaction.commit() + DBSession.add(self.job) + raise dcm_acq.failure_reason - if dcm_acq.data is None: - self.job_activity = (u'dicom %s has no data' % dcm_tgz) - log.warn('%d %s %s' % (self.job.id, self.job, self.job.activity)) + if dcm_acq.is_non_image: # implies dcm_acq.data = None + # non-image is an "expected" outcome, job has succeeded + # no error should be raised, job status should end up 'done' + self.job.activity = (u'dicom %s is a non-image type' % dcm_tgz) + transaction.commit() else: if dcm_acq.is_screenshot: conv_files = nimsdata.write(dcm_acq, dcm_acq.data, outbase, filetype='png') @@ -241,9 +301,7 @@ def process(self): conv_ds.filenames = filenames transaction.commit() else: - # to enable legacy NIMS v1 re-orientation, pass voxel_order='LPS' to nimsdata.write - # conv_files = nimsdata.write(dcm_acq, dcm_acq.data, outbase, filetype='nifti', voxel_order='LPS') - conv_files = nimsdata.write(dcm_acq, dcm_acq.data, outbase, filetype='nifti', voxel_order='LPS') + conv_files = nimsdata.write(dcm_acq, dcm_acq.data, outbase, filetype='nifti') if conv_files: # if nifti was successfully created outputdir_list = os.listdir(outputdir) @@ -264,8 +322,9 @@ def process(self): DBSession.add(self.job) DBSession.add(self.job.data_container) outpath = os.path.join(self.nims_path, pyramid_ds.relpath, self.job.data_container.name) - nims_montage = nimsdata.write(dcm_acq, dcm_acq.data, outpath, filetype='montage') - self.job.activity = u'image pyramid generated' + voxel_order = None if dcm_acq.is_localizer else 'LPS' + nims_montage = nimsdata.write(dcm_acq, dcm_acq.data, outpath, filetype='montage', voxel_order=voxel_order) + self.job.activity = (u'generated %s' % (', '.join([os.path.basename(f) for f in nims_montage])))[:255] log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) pyramid_ds.kind = u'web' pyramid_ds.container = 
self.job.data_container
@@ -279,68 +338,126 @@ def process(self):


 class PFilePipeline(Pipeline):

-    def find(self):
-        return super(PFilePipeline, self).find()
+    def find(self, slice_order, num_slices):
+        return super(PFilePipeline, self).find(slice_order, num_slices)

     def process(self):
+        """
+        Convert a pfile.
+
+        Extracts a pfile.tgz into a temporary directory and full_parses the pfile.7. If an error occurs
+        during parsing, no exception gets raised; instead the exception is saved into dataset.failure_reason.
+        This is to allow find() to attempt to locate physio, even if the input pfile could not be loaded.
+        After locating physio has been attempted, the PFilePipeline will attempt to convert the dataset
+        into a nifti, and then a montage.
+
+        Parameters
+        ----------
+        None : NoneType
+            The PFilePipeline has a job and dataset assigned to it. No additional parameters are required.
+
+        """
         super(PFilePipeline, self).process()
         ds = self.job.data_container.primary_dataset
+
         with nimsutil.TempDir(dir=self.tempdir) as outputdir:
-            pf = None
-            pfiles = [f for f in os.listdir(os.path.join(self.nims_path, ds.relpath)) if not f.startswith('_') and 'refscan' not in f]
-            # Try them in numerical order.
-            # FIXME: if there are >1 pfiles, what to do? Try them all?
-            for pfile in sorted(pfiles):
-                try:
-                    pf = nimsdata.parse(os.path.join(self.nims_path, ds.relpath, pfile), filetype='pfile', load_data=True)
-                except nimsdata.medimg.nimspfile.NIMSPFileError:
-                    pf = None
-                else:
-                    break
-
-            if pf is not None:
-                criteria = pf.prep_convert()
-                if criteria != None:
-                    q = Epoch.query.filter(Epoch.session==self.job.data_container.session).filter(Epoch.trashtime == None)
-                    for fieldname,value in criteria.iteritems():
-                        q = q.filter(getattr(Epoch,fieldname)==unicode(value))
-                    epochs = [e for e in q.all() if e!=self.job.data_container]
-                    aux_files = [os.path.join(self.nims_path, e.primary_dataset.relpath, f) for e in epochs for f in e.primary_dataset.filenames if f.startswith('P')]
-                    self.job.activity = (u'Found %d aux files: %s' % (len(aux_files), (', '.join([os.path.basename(f) for f in aux_files]))))[:255]
-                    log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity))
+            log.debug('unpacking and full parsing')
+            outbase = os.path.join(outputdir, ds.container.name)
+            pfile_tgz = os.path.join(self.nims_path, ds.relpath, os.listdir(os.path.join(self.nims_path, ds.relpath))[0])
+            with tarfile.open(pfile_tgz) as archive:
+                archive.extractall(path=outputdir)
+            temp_datadir = os.path.join(outputdir, os.listdir(outputdir)[0])
+            temp_pfile = os.path.join(temp_datadir, glob.glob(os.path.join(temp_datadir, 'P?????.7'))[0])
+
+            # perform full parse, which doesn't attempt to load the data
+            pf = nimsdata.parse(temp_pfile, filetype='pfile', ignore_json=True, load_data=False, full_parse=True, tempdir=outputdir, num_jobs=self.max_recon_jobs)
+
+            try:
+                self.find(pf.slice_order, pf.num_slices)
+            except Exception as exc:  # XXX, specific exceptions
+                pass
+
+            # MUX HACK, identify a group of aux candidates and determine the single best aux_file.
+            # Certain mux_epi scans will return a dictionary of parameters to use as query filters to
+            # help locate an aux_file that contains necessary calibration scans.
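+            # Illustrative assumption (the exact keys come from prep_convert() and are not shown here):
+            # criteria might look like {'psd': u'mux_epi'}; each key/value pair is applied below as an
+            # equality filter on Epoch, so only epochs from the same session that match the criteria are
+            # considered as aux candidates.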
+ criteria = pf.prep_convert() + aux_file = None + if criteria is not None: # if criteria: this is definitely mux of some sort + log.debug('pfile aux criteria %s' % str(criteria.keys())) + q = Epoch.query.filter(Epoch.session==self.job.data_container.session).filter(Epoch.trashtime==None) + for fieldname, value in criteria.iteritems(): + q = q.filter(getattr(Epoch, fieldname)==unicode(value)) # filter by psd_name + + if pf.num_mux_cal_cycle >= 2: + log.debug('looking for num_bands = 1') + epochs = [e for e in q.all() if (e != self.job.data_container and e.num_bands == 1)] else: - aux_files = None + log.debug('looking for num_mux_cal_cycle >= 2') + epochs = [e for e in q.all() if (e != self.job.data_container and e.num_mux_cal_cycle >= 2)] + log.debug('candidates: %s' % str([e.primary_dataset.filenames for e in epochs])) + + # which epoch has the closest series number + series_num_diff = np.array([e.series for e in epochs]) - pf.series_no + closest = np.min(np.abs(series_num_diff))==np.abs(series_num_diff) + # there may be more than one. We prefer the prior scan. + closest = np.where(np.min(series_num_diff[closest])==series_num_diff)[0][0] + candidate = epochs[closest] + aux_file = os.path.join(self.nims_path, candidate.primary_dataset.relpath, candidate.primary_dataset.filenames[0]) + log.debug('identified aux_file: %s' % os.path.basename(aux_file)) + + self.job.activity = (u'Found aux file: %s' % os.path.basename(aux_file))[:255] + log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) - conv_type, conv_file = pf.convert(os.path.join(outputdir, ds.container.name), self.tempdir, self.max_recon_jobs, aux_files) + else: + log.debug('no special criteria') + aux_file = None - if conv_file: - outputdir_list = os.listdir(outputdir) - self.job.activity = (u'generated %s' % (', '.join([f for f in outputdir_list])))[:255] - log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) - dataset = Dataset.at_path(self.nims_path, u'nifti') - DBSession.add(self.job) - DBSession.add(self.job.data_container) - dataset.kind = u'derived' - dataset.container = self.job.data_container - filenames = [] - for f in outputdir_list: - filenames.append(f) - shutil.copy2(os.path.join(outputdir, f), os.path.join(self.nims_path, dataset.relpath)) - dataset.filenames = filenames + pf.load_data(aux_file=aux_file) # don't monopolize system resources + if pf.failure_reason: # implies pf.data = None + self.job.activity = (u'error loading pfile: %s' % str(pf.failure_reason)) transaction.commit() - - pyramid_ds = Dataset.at_path(self.nims_path, u'img_pyr') DBSession.add(self.job) - DBSession.add(self.job.data_container) - outpath = os.path.join(self.nims_path, pyramid_ds.relpath, self.job.data_container.name+'.pyrdb') - nims_montage = nimsdata.write(pf, pf.data, outpath, filetype='montage') - self.job.activity = u'image pyramid generated' - log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) - pyramid_ds.kind = u'web' - pyramid_ds.container = self.job.data_container - pyramid_ds.filenames = os.listdir(os.path.join(self.nims_path, pyramid_ds.relpath)) + raise pf.failure_reason + + # attempt to write nifti, if write fails, let exception bubble up to pipeline process() + # exception will cause job to be marked as 'fail' + if pf.is_non_image: # implies dcm_acq.data = None + # non-image is an "expected" outcome, job has succeeded + # no error should be raised, job status should end up 'done' + self.job.activity = (u'pfile %s is a non-image type' % pfile_tgz) transaction.commit() + else: + 
conv_file = nimsdata.write(pf, pf.data, outbase, filetype='nifti') + if conv_file: + outputdir_list = [f for f in os.listdir(outputdir) if not os.path.isdir(os.path.join(outputdir, f))] + self.job.activity = (u'generated %s' % (', '.join([f for f in outputdir_list])))[:255] + log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) + dataset = Dataset.at_path(self.nims_path, u'nifti') + DBSession.add(self.job) + DBSession.add(self.job.data_container) + dataset.kind = u'derived' + dataset.container = self.job.data_container + filenames = [] + for f in outputdir_list: + filenames.append(f) + shutil.copy2(os.path.join(outputdir, f), os.path.join(self.nims_path, dataset.relpath)) + dataset.filenames = filenames + transaction.commit() + + pyramid_ds = Dataset.at_path(self.nims_path, u'img_pyr') + DBSession.add(self.job) + DBSession.add(self.job.data_container) + outpath = os.path.join(self.nims_path, pyramid_ds.relpath, self.job.data_container.name) + nims_montage = nimsdata.write(pf, pf.data, outpath, filetype='montage') + self.job.activity = u'generated image pyramid %s' % nims_montage + log.info(u'%d %s %s' % (self.job.id, self.job, self.job.activity)) + pyramid_ds.kind = u'web' + pyramid_ds.container = self.job.data_container + pyramid_ds.filenames = os.listdir(os.path.join(self.nims_path, pyramid_ds.relpath)) + transaction.commit() + + DBSession.add(self.job) DBSession.add(self.job) @@ -380,3 +497,4 @@ def term_handler(signum, stack): processor.run() log.warning('Process halted') + diff --git a/nimsproc/sorter.py b/nimsproc/sorter.py index 31e4eea..5e442b2 100755 --- a/nimsproc/sorter.py +++ b/nimsproc/sorter.py @@ -3,14 +3,17 @@ # @author: Gunnar Schaefer, Reno Bowen import os +import glob import time import shutil import logging +import tarfile import datetime import transaction import nimsdata import nimsgears.model +import tempdir as tempfile log = logging.getLogger('sorter') @@ -38,6 +41,14 @@ def run(self): if os.path.islink(stage_item): os.remove(stage_item) elif 'gephysio' in os.path.basename(stage_item): # HACK !!!!!!!!!!!!!!!! 
NIMS 1.0 cannot sort gephysio + log.info('Unpacking %s' % os.path.basename(stage_item)) + with tempfile.TemporaryDirectory() as tempdir_path: + with tarfile.open(stage_item) as archive: + archive.extractall(path=tempdir_path) + physiodir_path = os.listdir(tempdir_path)[0] + for f in os.listdir(os.path.join(tempdir_path, physiodir_path)): + shutil.copy(os.path.join(tempdir_path, physiodir_path, f), os.path.join(self.nims_path, 'physio')) + log.info('Unpacked %s' % os.path.basename(stage_item)) os.remove(stage_item) elif os.path.isfile(stage_item): self.sort(stage_item) @@ -54,7 +65,22 @@ def sort(self, filepath): filename = os.path.basename(filepath) try: log.info('Parsing %s' % filename) - mrfile = nimsdata.parse(filepath) + if 'pfile' in filename: + with tempfile.TemporaryDirectory(dir=None) as tempdir_path: + with tarfile.open(filepath) as archive: + archive.extractall(path=tempdir_path) + subdir = os.listdir(tempdir_path)[0] + f = glob.glob(os.path.join(tempdir_path, subdir, 'P?????.7'))[0] + try: + mrfile = nimsdata.parse(f, filetype='pfile', full_parse=True) + except Exception: + pass + else: + mrfile = nimsdata.parse(filepath) + mrfile.num_mux_cal_cycle = None # dcms will never have num_mux_cal_cycles + if mrfile.is_screenshot: + mrfile.acq_no = 0 + mrfile.timestamp = datetime.datetime.strptime(datetime.datetime.strftime(mrfile.timestamp, '%Y%m%d') + '235959', '%Y%m%d%H%M%S') except nimsdata.NIMSDataError: log.warning('Cannot sort %s' % filename) if self.preserve_path: diff --git a/nimsproc/tempdir.py b/nimsproc/tempdir.py new file mode 100644 index 0000000..e9cffae --- /dev/null +++ b/nimsproc/tempdir.py @@ -0,0 +1,94 @@ +"""This is a backport of TemporaryDirectory from Python 3.3.""" + +from __future__ import print_function + +import warnings as _warnings +import sys as _sys +import os as _os + +from tempfile import mkdtemp + +template = "tmp" + + +class TemporaryDirectory(object): + """Create and return a temporary directory. This has the same + behavior as mkdtemp but can be used as a context manager. For + example: + + with TemporaryDirectory() as tmpdir: + ... + + Upon exiting the context, the directory and everything contained + in it are removed. + """ + + def __init__(self, suffix="", prefix=template, dir=None): + self._closed = False + self.name = None # Handle mkdtemp raising an exception + self.name = mkdtemp(suffix, prefix, dir) + + def __repr__(self): + return "<{} {!r}>".format(self.__class__.__name__, self.name) + + def __enter__(self): + return self.name + + def cleanup(self, _warn=False): + if self.name and not self._closed: + try: + self._rmtree(self.name) + except (TypeError, AttributeError) as ex: + # Issue #10188: Emit a warning on stderr + # if the directory could not be cleaned + # up due to missing globals + if "None" not in str(ex): + raise + print("ERROR: {!r} while cleaning up {!r}".format(ex, self,), + file=_sys.stderr) + return + self._closed = True + if _warn: + self._warn("Implicitly cleaning up {!r}".format(self), + ResourceWarning) + + def __exit__(self, exc, value, tb): + self.cleanup() + + def __del__(self): + # Issue a ResourceWarning if implicit cleanup needed + self.cleanup(_warn=True) + + # XXX (ncoghlan): The following code attempts to make + # this class tolerant of the module nulling out process + # that happens during CPython interpreter shutdown + # Alas, it doesn't actually manage it. 
See issue #10188 + _listdir = staticmethod(_os.listdir) + _path_join = staticmethod(_os.path.join) + _isdir = staticmethod(_os.path.isdir) + _islink = staticmethod(_os.path.islink) + _remove = staticmethod(_os.remove) + _rmdir = staticmethod(_os.rmdir) + _os_error = OSError + _warn = _warnings.warn + + def _rmtree(self, path): + # Essentially a stripped down version of shutil.rmtree. We can't + # use globals because they may be None'ed out at shutdown. + for name in self._listdir(path): + fullname = self._path_join(path, name) + try: + isdir = self._isdir(fullname) and not self._islink(fullname) + except self._os_error: + isdir = False + if isdir: + self._rmtree(fullname) + else: + try: + self._remove(fullname) + except self._os_error: + pass + try: + self._rmdir(path) + except self._os_error: + pass From a1421c7fafbc94872d09e202fe564e406543a56c Mon Sep 17 00:00:00 2001 From: Gunnar Schaefer Date: Tue, 30 Sep 2014 15:28:48 -0700 Subject: [PATCH 3/5] add upload route --- apache.conf | 6 ++++++ nimsgears/controllers/root.py | 21 ++++++++++++++++++++- nimsutil/nimsutil.py | 5 +++-- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/apache.conf b/apache.conf index da677ed..5c81198 100644 --- a/apache.conf +++ b/apache.conf @@ -23,3 +23,9 @@ Alias /nims/javascript /var/local/nims/nimsgears/public/javascript WebAuthDoLogout on + + + Order deny,allow + Deny from all + # add allowed hosts + diff --git a/nimsgears/controllers/root.py b/nimsgears/controllers/root.py index bfe18c0..e172526 100644 --- a/nimsgears/controllers/root.py +++ b/nimsgears/controllers/root.py @@ -3,11 +3,13 @@ import os import json +import uuid +import hashlib import datetime import tempfile import subprocess -from tg import config, expose, flash, lurl, request, redirect, response +from tg import abort, config, expose, flash, lurl, request, redirect, response from tg.i18n import ugettext as _, lazy_ugettext as l_ import webob.exc @@ -257,3 +259,20 @@ def download(self, **kwargs): tar_proc = subprocess.Popen('tar -chf - -C %s nims; rm -r %s' % (temp_dir, temp_dir), shell=True, stdout=subprocess.PIPE) response.content_disposition = 'attachment; filename=%s_%s' % ('nims', datetime.datetime.now().strftime('%Y%m%d_%H%M%S')) return tar_proc.stdout + + @expose() + def upload(self, filename='upload', **kwargs): + if 'Content-MD5' not in request.headers: + abort(400, 'Request must contain a valid "Content-MD5" header.') + stage_path = config.get('upload_path') + with nimsutil.TempDir(prefix='.tmp', dir=stage_path) as tempdir_path: + hash_ = hashlib.sha1() + upload_filepath = os.path.join(tempdir_path, filename) + with open(upload_filepath, 'wb') as upload_file: + for chunk in iter(lambda: request.body_file.read(2**20), ''): + hash_.update(chunk) + upload_file.write(chunk) + if hash_.hexdigest() != request.headers['Content-MD5']: + abort(400, 'Content-MD5 mismatch (or unset).') + print 'upload from %s: %s [%s]' % (request.user_agent, os.path.basename(upload_filepath), nimsutil.hrsize(request.content_length)) + os.rename(upload_filepath, os.path.join(stage_path, str(uuid.uuid1()) + '_' + filename)) # add UUID to prevent clobbering files diff --git a/nimsutil/nimsutil.py b/nimsutil/nimsutil.py index 18e3765..36f20fa 100644 --- a/nimsutil/nimsutil.py +++ b/nimsutil/nimsutil.py @@ -19,13 +19,14 @@ class TempDir(object): """Context managed temporary directory creation and automatic removal.""" - def __init__(self, dir=None): + def __init__(self, dir=None, prefix='tmp'): self.dir = dir + self.prefix = prefix super(TempDir, 
self).__init__() def __enter__(self): """Create temporary directory on context entry, returning the path.""" - self.temp_dir = tempfile.mkdtemp(dir=self.dir) + self.temp_dir = tempfile.mkdtemp(dir=self.dir, prefix=self.prefix) return self.temp_dir def __exit__(self, exc_type, exc_value, traceback): From 7e171b53465e89afd4bd0965bb62e9996a18c86a Mon Sep 17 00:00:00 2001 From: "Kevin S. Hahn" Date: Wed, 10 Dec 2014 14:33:19 -0800 Subject: [PATCH 4/5] adds update guide, tmux startup scripts and config --- apache.conf | 1 + nims.tmux | 36 ++++++++++++++++ nims.tmux.sample | 57 ++++++++++++++++++++++++ predator.tmux | 29 +++++++++++++ reaper.tmux | 43 ++++++++++++++++++ update_guide.txt | 110 +++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 276 insertions(+) create mode 100755 nims.tmux create mode 100644 nims.tmux.sample create mode 100755 predator.tmux create mode 100755 reaper.tmux create mode 100644 update_guide.txt diff --git a/apache.conf b/apache.conf index 5c81198..884f8e5 100644 --- a/apache.conf +++ b/apache.conf @@ -3,6 +3,7 @@ WSGIDaemonProcess nims user=nims group=nims processes=2 threads=4 maximum-reques WSGIApplicationGroup %{GLOBAL} WSGIProcessGroup nims + Require all granted WSGIScriptAlias /nims /var/local/nims/nimsgears/public/nims.wsgi diff --git a/nims.tmux b/nims.tmux new file mode 100755 index 0000000..cc78274 --- /dev/null +++ b/nims.tmux @@ -0,0 +1,36 @@ +#!/bin/bash +SESSION="CNI-NIMS" + +POSTGRES_USER="nims" +POSTGRES_PW="nims" +POSTGRES_HOST="cnifs.stanford.edu" +POSTGRES_PORT="5432" +POSTGRES_DB="nims" +POSTGRES="postgresql://${POSTGRES_USER}:${POSTGRES_PW}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}" + +UNSORTABLE_PATH="/scratch/cni/unsortable" # where to place unsortable files +STAGE_PATH="/scratch/cni/upload" # where uploads are placed, where sorter looks for new files +NIMS_PATH="/cnifs/nims" # base path where files get sorted +PHYSIO_PATH="/cnifs/nims/physio" # where physio files are unpacked into + +# create +cd /var/local/nims +tmux new-session -s "$SESSION" -n bash -d + +# sorter +tmux new-window -t "$SESSION:1" -n "sorter" +tmux send-keys -t "$SESSION:1" \ + "source /var/local/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:1" \ + "PYTHONPATH=. nimsproc/sorter.py -p ${UNSORTABLE_PATH} ${POSTGRES} ${STAGE_PATH} ${NIMS_PATH}" C-m + +# scheduler +tmux new-window -t "$SESSION:2" -n "scheduler" +tmux send-keys -t "$SESSION:2" \ + "source /var/local/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:2" \ + "PYTHONPATH=. 
nimsproc/scheduler.py ${POSTGRES} ${NIMS_PATH}" C-m + +# attach to session +tmux select-window -t "$SESSION:0" +tmux attach-session -t "$SESSION" diff --git a/nims.tmux.sample b/nims.tmux.sample new file mode 100644 index 0000000..d5b3769 --- /dev/null +++ b/nims.tmux.sample @@ -0,0 +1,57 @@ +#!/bin/bash +SESSION="SESSION-NAME" + +POSTGRES_USER="db_user" +POSTGRES_PW="password" +POSTGRES_HOST="postgres.example.com" +POSTGRES_PORT="5432" +POSTGRES_DB="db_name" +POSTGRES="postgresql://${POSTGRES_USER}:${POSTGRES_PW}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}" + +UNSORTABLE_PATH="/scratch/example/unsortable" # where to place unsortable files +STAGE_PATH="/scratch/example/upload" # where uploads are placed, where sorter looks for new files +NIMS_PATH="/examplefs/nims" # base path where files get sorted +PHYSIO_PATH="/examplefs/nims/physio" # where physio files are unpacked into + +# create +cd /var/local/nims +tmux new-session -s "$SESSION" -n bash -d + +# sorter +tmux new-window -t "$SESSION:1" -n "sorter" +tmux send-keys -t "$SESSION:1" \ + "source /var/local/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:1" \ + "nimsproc/sorter.py -p ${UNSORTABLE_PATH} ${POSTGRES} ${STAGE_PATH} ${NIMS_PATH}" C-m + +# scheduler +tmux new-window -t "$SESSION:2" -n "scheduler" +tmux send-keys -t "$SESSION:2" \ + "source /var/local/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:2" \ + "nimsproc/scheduler.py ${POSTGRES} ${NIMS_PATH}" C-m + +# processor +tmux new-window -t "$SESSION:3" -n "processor" +tmux send-keys -t "$SESSION:3" \ + "source /var/local/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:3" \ + "nimsproc/processor.py -j8 -t /scratch_spinning -e \"~Epoch.psd.contains(u'mux')\" ${POSTGRES} ${NIMS_PATH} ${PHYSIO_PATH}" C-m + +# mux processor +tmux new-window -t "$SESSION:4" -n "mux_proc" +tmux send-keys -t "$SESSION:4" \ + "source /var/local/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:4" \ + "nimsproc/processor.py -j1 -k32 -t /scratch_spinning -e \"Epoch.psd.contains(u'mux')\" ${POSTGRES} ${NIMS_PATH} ${PHYSIO_PATH}" C-m + +# QA +tmux new-window -t "$SESSION:5" -n "qa" +tmux send-keys -t "$SESSION:5" \ + "source /var/local/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:5" \ + "nimsproc/qa_report.py -j8 ${POSTGRES} ${NIMS_PATH}" C-m + +# attach to session +tmux select-window -t "$SESSION:0" +tmux attach-session -t "$SESSION" diff --git a/predator.tmux b/predator.tmux new file mode 100755 index 0000000..9692a6e --- /dev/null +++ b/predator.tmux @@ -0,0 +1,29 @@ +#!/bin/bash +SESSION="PREDATOR" + +POSTGRES_USER="nims" +POSTGRES_PW="nims" +POSTGRES_HOST="cnifs.stanford.edu" +POSTGRES_PORT="5432" +POSTGRES_DB="nims" +POSTGRES="postgresql://${POSTGRES_USER}:${POSTGRES_PW}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}" + +UNSORTABLE_PATH="/scratch/cni/unsortable" # where to place unsortable files +STAGE_PATH="/scratch/cni/upload" # where uploads are placed, where sorter looks for new files +NIMS_PATH="/cnifs/nims" # base path where files get sorted +PHYSIO_PATH="/cnifs/nims/physio" # where physio files are unpacked into + +# create +cd ~/nims +tmux new-session -s "$SESSION" -n bash -d + +# mux processor +tmux new-window -t "$SESSION:1" -n "mux_proc" +tmux send-keys -t "$SESSION:1" \ + "source ~/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:1" \ + "nimsproc/processor.py -j1 -k32 -t /scratch -e \"Epoch.psd.contains(u'mux')\" ${POSTGRES} ${NIMS_PATH} ${PHYSIO_PATH}" C-m + +# attach to session +tmux select-window -t "$SESSION:0" +tmux attach-session -t 
"$SESSION" diff --git a/reaper.tmux b/reaper.tmux new file mode 100755 index 0000000..ede7fd6 --- /dev/null +++ b/reaper.tmux @@ -0,0 +1,43 @@ +#!/bin/bash +SESSION="REAPER" + +POSTGRES_USER="nims" +POSTGRES_PW="nims" +POSTGRES_HOST="cnifs.stanford.edu" +POSTGRES_PORT="5432" +POSTGRES_DB="nims" +POSTGRES="postgresql://${POSTGRES_USER}:${POSTGRES_PW}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}" + +UNSORTABLE_PATH="/scratch/cni/unsortable" # where to place unsortable files +STAGE_PATH="/scratch/cni/upload" # where uploads are placed, where sorter looks for new files +NIMS_PATH="/cnifs/nims" # base path where files get sorted +PHYSIO_PATH="/cnifs/nims/physio" # where physio files are unpacked into + +# create +cd nims +tmux new-session -s "$SESSION" -n bash -d + +# processor +tmux new-window -t "$SESSION:1" -n "processor" +tmux send-keys -t "$SESSION:1" \ + "source ~/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:1" \ + "nimsproc/processor.py -j8 -t /scratch_spinning -e \"~Epoch.psd.contains(u'mux')\" ${POSTGRES} ${NIMS_PATH} ${PHYSIO_PATH}" C-m + +# mux processor +tmux new-window -t "$SESSION:2" -n "mux_proc" +tmux send-keys -t "$SESSION:2" \ + ". ~/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:2" \ + "nimsproc/processor.py -j1 -k32 -t /scratch_spinning -e \"Epoch.psd.contains(u'mux')\" ${POSTGRES} ${NIMS_PATH} ${PHYSIO_PATH}" C-m + +# QA +tmux new-window -t "$SESSION:3" -n "qa" +tmux send-keys -t "$SESSION:3" \ + ". ~/tg2env/bin/activate" C-m +tmux send-keys -t "$SESSION:3" \ + "nimsproc/qa_report.py -j8 ${POSTGRES} ${NIMS_PATH}" C-m + +# attach to session +tmux select-window -t "$SESSION:0" +tmux attach-session -t "$SESSION" diff --git a/update_guide.txt b/update_guide.txt new file mode 100644 index 0000000..13d29d5 --- /dev/null +++ b/update_guide.txt @@ -0,0 +1,110 @@ +NIMS 1.0 to NIMS 1.1 Upgrade Guide +================================== + +Upgrade is easier to do on a Monday morning, hopefully when no new data is being reaped/sorted/processed. + + +1. Stop all processes. + - halt reapers, sorters, schedulers, processors, qa, nimsfs, etc + - this could involve halting processes across several different machines + - take note of the exact time processes were stopped, will need this to restart reaper at appropriate time. + - take note of the last run job numbers, will need this if any jobs must be rerun + +2. Close all connections to postgres DB. Note that this command restarts the entire postgres DB. + - `/usr/local/etc/rc.d/postgresql restart` + +3. Update postgres DB to add new information that is needed for the new nimsdata and processor code. + - `cd ~/nims` + - `source ~/tg2env/bin/activate;` + - `psql -h postgres.example.com -p 5432 -U user db;` + - replace 'postgres.example.com' with the postgres server hostname + - replace '5432' with the postgres port + - replace 'user' with the postgres user + - replace 'db' with the nims database name + - `ALTER TABLE epoch ADD COLUMN num_mux_cal_cycle integer;` + - `ALTER TYPE dataset_qa_status ADD VALUE rerun;` + - `\dT+ dataset_qa_status` check for 'rerun' in the 'Elements' column. + - `\d+ Epoch` check 'num_mux_cal_cycle' in the 'Column' column. + - `\q` quit psql` + +4. Stash any local changes, if necessary. + - `cd ~/nims` + - `git remote update` + - `git stash save 'pre-upgrade local changes' + +5. Upgrade nims code and nimsdata code. + - `cd ~/nims` + - `git checkout ksh-nims1.1` + - `git pull` + - `cd nimsdata` + - `git pull` + - `git submodule init` + - `git submodule update` + +6. 
Re-apply stashed code, if necessary.
+   - `cd ~/nims`
+   - `git stash apply`
+   - if `git stash apply` is successful, then `git stash drop`
+
+7. Update NFS mounts, if necessary. This will depend on how duties are divided between machines.
+   - reaper does not need NFS access to anything
+   - sorter needs NFS access to the filesystem containing the files
+   - scheduler does not need NFS access to anything
+   - processors need NFS access to the filesystem containing the files
+
+8. Clean the cache.
+   - clean contents of `/var/cache/nimsgears/`
+   - create cache subdirectories
+     - `/var/cache/nimsgears/sessions/`
+
+9. Restart processes using the supplied tmux start scripts. Some edits may be necessary.
+
+
+Oops Handling
+=============
+If some data was reaped and sorted with bad metadata, you will want to purge those specific
+database entries, datasets, and corresponding files.
+
+1. Stop the reaper.
+2. Reset the reaper datetime file to the desired datetime.
+3. Stop sorter and scheduler.
+4. Remove the Epoch and Dataset database entries, and also delete the corresponding files.
+
+    import os
+    import shutil
+    import datetime
+    import transaction
+    from nimsgears.model import *
+    for j in Epoch.query.filter(Epoch.timestamp > datetime.datetime(2014,12,5,7,0,0)).all():
+        for i in j.datasets:
+            if os.path.exists(os.path.join('/cnifs/nims', i.relpath)):
+                print os.path.join('/cnifs/nims', i.relpath)
+                shutil.rmtree(os.path.join('/cnifs/nims', i.relpath))
+            i.delete()
+        j.delete()
+
+5. Remove the Session and Session Subject database entries.
+
+    for s in Session.query.filter(Session.timestamp > datetime.datetime(2014,12,5,7,0,0)).all():
+        s.subject.delete()
+        s.delete()
+
+6. Restart reaper, sorter, and scheduler.
+
+
+Verifying the Upgrade
+=====================
+1. login
+2. inspect a few new datasets
+   - is exam number being set?
+   - is subject number being set?
+   - are jobs processing?
+3. inspect a few specific scan types
+   - localizers
+     - does image viewer work?
+     - does volume viewer work?
+   - anat (or any other single timepoint scan)
+     - does image viewer work?
+     - does volume viewer work?
+   - any scan type
+     - does downloading data work?

From a87c567ae005a0dae5bb907963a01f0d8c242ae4 Mon Sep 17 00:00:00 2001
From: "Kevin S. Hahn"
Date: Mon, 15 Dec 2014 17:38:53 -0800
Subject: [PATCH 5/5] processor updates some Epoch metadata after recon - closes #26

---
 nimsproc/processor.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/nimsproc/processor.py b/nimsproc/processor.py
index ec5d0c9..5deb058 100755
--- a/nimsproc/processor.py
+++ b/nimsproc/processor.py
@@ -294,6 +294,11 @@ def process(self):
                     DBSession.add(self.job.data_container)
                     conv_ds.kind = u'derived'
                     conv_ds.container = self.job.data_container
+                    conv_ds.container.size = dcm_acq.size
+                    conv_ds.container.mm_per_vox = dcm_acq.mm_per_vox
+                    conv_ds.container.num_slices = dcm_acq.num_slices
+                    conv_ds.container.num_timepoints = dcm_acq.num_timepoints
+                    conv_ds.container.duration = dcm_acq.duration
                     filenames = []
                     for f in outputdir_list:
                         filenames.append(f)
@@ -438,6 +443,11 @@ def process(self):
                 DBSession.add(self.job)
                 DBSession.add(self.job.data_container)
                 dataset.kind = u'derived'
                 dataset.container = self.job.data_container
+                dataset.container.size = pf.size
+                dataset.container.mm_per_vox = pf.mm_per_vox
+                dataset.container.num_slices = pf.num_slices
+                dataset.container.num_timepoints = pf.num_timepoints
+                dataset.container.duration = pf.duration
                 filenames = []
                 for f in outputdir_list:
                     filenames.append(f)
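
A quick post-upgrade spot check for this last patch (an illustrative sketch only, in the style of the
Oops Handling snippets above; it assumes a configured nimsgears model/session on the NIMS host):

    import datetime
    from nimsgears.model import *
    # epochs processed after the upgrade should now carry the size, voxel, and timing metadata copied back from recon
    since = datetime.datetime(2014, 12, 15)
    for e in Epoch.query.filter(Epoch.timestamp > since).all():
        print e.id, e.size, e.mm_per_vox, e.num_slices, e.num_timepoints, e.duration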