Skip to content

Commit

Permalink
Conform with GAPs-style Status class
Browse files Browse the repository at this point in the history
  • Loading branch information
ppinchuk committed Aug 28, 2023
1 parent f32c5f4 commit 46a8bdc
Showing 1 changed file with 40 additions and 17 deletions.
57 changes: 40 additions & 17 deletions sup3r/utilities/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,39 @@
import os
import json

from reV.pipeline.status import Status
from gaps import Status
from gaps.config import load_config

from rex.utilities.execution import SubprocessManager
from rex.utilities.hpc import SLURM
from rex.utilities.loggers import init_mult

from sup3r.version import __version__
from sup3r.pipeline.config import BaseConfig
from sup3r.utilities import ModuleName


logger = logging.getLogger(__name__)


class SlurmManager(SLURM):
"""GAPs-compliant SLURM manager"""

def check_status_using_job_id(self, job_id):
"""Check the status of a job using the HPC queue and job ID.
Parameters
----------
job_id : int
Job integer ID number.
Returns
-------
status : str | None
Queue job status string or `None` if not found.
"""
return self.check_status(job_id=job_id)


class BaseCLI:
"""Base CLI class used to create CLI for modules in ModuleName"""

Expand Down Expand Up @@ -89,7 +108,7 @@ def from_config_preflight(cls, module_name, ctx, config_file, verbose):
ctx.obj['VERBOSE'] = verbose
status_dir = os.path.dirname(os.path.abspath(config_file))
ctx.obj['OUT_DIR'] = status_dir
config = BaseConfig(config_file)
config = load_config(config_file)
config['status_dir'] = status_dir
log_file = config.get('log_file', None)
log_pattern = config.get('log_pattern', None)
Expand Down Expand Up @@ -167,20 +186,21 @@ def kickoff_slurm_job(cls, module_name, ctx, cmd, alloc='sup3r',
out_dir = ctx.obj['OUT_DIR']
slurm_manager = ctx.obj.get('SLURM_MANAGER', None)
if slurm_manager is None:
slurm_manager = SLURM()
slurm_manager = SlurmManager()
ctx.obj['SLURM_MANAGER'] = slurm_manager

status = Status.retrieve_job_status(out_dir,
module=module_name,
command=module_name,
job_name=name,
hardware='slurm',
subprocess_manager=slurm_manager)

msg = f'sup3r {module_name} CLI failed to submit jobs!'
if status == 'successful':
msg = (f'Job "{name}" is successful in status json found in '
f'"{out_dir}", not re-running.')
elif 'fail' not in str(status).lower() and status is not None:
elif ('fail' not in str(status).lower()
and status != 'not submitted'
and status is not None):
msg = (f'Job "{name}" was found with status "{status}", not '
'resubmitting')
else:
Expand All @@ -193,15 +213,16 @@ def kickoff_slurm_job(cls, module_name, ctx, cmd, alloc='sup3r',
feature=feature,
name=name,
stdout_path=stdout_path)[0]

if out:
msg = (f'Kicked off sup3r {module_name} job "{name}" '
f'(SLURM jobid #{out}).')

# add job to sup3r status file.
Status.add_job(out_dir, module=module_name,
job_name=name, replace=True,
job_attrs={'job_id': out, 'hardware': 'slurm'})

Status.mark_job_as_submitted(out_dir, command=module_name,
job_name=name, replace=True,
job_attrs={'job_id': out,
'hardware': 'slurm'})
click.echo(msg)
logger.info(msg)

Expand All @@ -225,20 +246,22 @@ def kickoff_local_job(cls, module_name, ctx, cmd):
out_dir = ctx.obj['OUT_DIR']
subprocess_manager = SubprocessManager
status = Status.retrieve_job_status(out_dir,
module=module_name,
command=module_name,
job_name=name)
msg = f'sup3r {module_name} CLI failed to submit jobs!'
if status == 'successful':
msg = (f'Job "{name}" is successful in status json found in '
f'"{out_dir}", not re-running.')
elif 'fail' not in str(status).lower() and status is not None:
elif ('fail' not in str(status).lower()
and status != 'not submitted'
and status is not None):
msg = (f'Job "{name}" was found with status "{status}", not '
'resubmitting')
else:
logger.info(f'Running sup3r {module_name} locally with job '
f'name "{name}".')
Status.add_job(out_dir, module=module_name, job_name=name,
replace=True)
Status.mark_job_as_submitted(out_dir, command=module_name,
job_name=name, replace=True)
subprocess_manager.submit(cmd)
msg = (f'Completed sup3r {module_name} job "{name}".')

Expand Down Expand Up @@ -269,7 +292,7 @@ def add_status_cmd(cls, config, module_name, cmd):
status_dir = config.get('status_dir', None)
if job_name is not None and status_dir is not None:
status_file_arg_str = f'"{status_dir}", '
status_file_arg_str += f'module="{module_name}", '
status_file_arg_str += f'command="{module_name}", '
status_file_arg_str += f'job_name="{job_name}", '
status_file_arg_str += 'attrs=job_attrs'

Expand All @@ -279,6 +302,6 @@ def add_status_cmd(cls, config, module_name, cmd):
.replace("true", "True")))
cmd += 'job_attrs.update({"job_status": "successful"});\n'
cmd += 'job_attrs.update({"time": t_elap});\n'
cmd += f"Status.make_job_file({status_file_arg_str})"
cmd += f"Status.make_single_job_file({status_file_arg_str})"

return cmd

0 comments on commit 46a8bdc

Please sign in to comment.