Skip to content

Commit

Permalink
Add reliance on pipeline_step
Browse files Browse the repository at this point in the history
  • Loading branch information
ppinchuk committed Nov 4, 2023
1 parent 69283cd commit 22ab050
Show file tree
Hide file tree
Showing 19 changed files with 181 additions and 81 deletions.
3 changes: 2 additions & 1 deletion sup3r/bias/bias_calc.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ def get_node_cmd(cls, config):
f"{fun_str};\n"
"t_elap = time.time() - t0;\n")

cmd = BaseCLI.add_status_cmd(config, ModuleName.BIAS_CALC, cmd)
pipeline_step = config.get('pipeline_step') or ModuleName.BIAS_CALC
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down
26 changes: 18 additions & 8 deletions sup3r/bias/bias_calc_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run sup3r bias correction calculation from a config file."""
config = BaseCLI.from_config_preflight(ModuleName.BIAS_CALC, ctx,
config_file, verbose)
Expand All @@ -56,20 +56,22 @@ def from_config(ctx, config_file, verbose=False, **__):
name = ('{}_{}'.format(basename, str(i_node).zfill(6)))
ctx.obj['NAME'] = name
node_config['job_name'] = name
node_config["pipeline_step"] = pipeline_step

cmd = BiasCalcClass.get_node_cmd(node_config)

cmd_log = '\n\t'.join(cmd.split('\n'))
logger.debug(f'Running command:\n\t{cmd_log}')

if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS:
kickoff_slurm_job(ctx, cmd, **exec_kwargs)
kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs)
else:
kickoff_local_job(ctx, cmd)
kickoff_local_job(ctx, cmd, pipeline_step)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
feature=None, stdout_path='./stdout/'):
def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
memory=None, walltime=4, feature=None,
stdout_path='./stdout/'):
"""Run sup3r on HPC via SLURM job submission.
Parameters
Expand All @@ -79,6 +81,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
alloc : str
HPC project (allocation) handle. Example: 'sup3r'.
memory : int
Expand All @@ -92,10 +98,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
Path to print .stdout and .stderr files.
"""
BaseCLI.kickoff_slurm_job(ModuleName.BIAS_CALC, ctx, cmd, alloc, memory,
walltime, feature, stdout_path)
walltime, feature, stdout_path, pipeline_step)


def kickoff_local_job(ctx, cmd):
def kickoff_local_job(ctx, cmd, pipeline_step=None):
"""Run sup3r bias calc locally.
Parameters
Expand All @@ -105,8 +111,12 @@ def kickoff_local_job(ctx, cmd):
cmd : str
Command to be submitted in shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
"""
BaseCLI.kickoff_local_job(ModuleName.BIAS_CALC, ctx, cmd)
BaseCLI.kickoff_local_job(ModuleName.BIAS_CALC, ctx, cmd, pipeline_step)


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion sup3r/pipeline/forward_pass.py
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,8 @@ def get_node_cmd(cls, config):
f"{cls.__name__}.run(strategy, {node_index});\n"
"t_elap = time.time() - t0;\n")

cmd = BaseCLI.add_status_cmd(config, ModuleName.FORWARD_PASS, cmd)
pipeline_step = config.get('pipeline_step') or ModuleName.FORWARD_PASS
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down
26 changes: 18 additions & 8 deletions sup3r/pipeline/forward_pass_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run sup3r forward pass from a config file."""

config = BaseCLI.from_config_preflight(ModuleName.FORWARD_PASS, ctx,
Expand Down Expand Up @@ -66,19 +66,21 @@ def from_config(ctx, config_file, verbose=False, **__):
name = ('{}_{}'.format(basename, str(i_node).zfill(6)))
ctx.obj['NAME'] = name
node_config['job_name'] = name
node_config["pipeline_step"] = pipeline_step
cmd = ForwardPass.get_node_cmd(node_config)

cmd_log = '\n\t'.join(cmd.split('\n'))
logger.debug(f'Running command:\n\t{cmd_log}')

if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS:
kickoff_slurm_job(ctx, cmd, **exec_kwargs)
kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs)
else:
kickoff_local_job(ctx, cmd)
kickoff_local_job(ctx, cmd, pipeline_step)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
feature=None, stdout_path='./stdout/'):
def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
memory=None, walltime=4, feature=None,
stdout_path='./stdout/'):
"""Run sup3r on HPC via SLURM job submission.
Parameters
Expand All @@ -88,6 +90,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
alloc : str
HPC project (allocation) handle. Example: 'sup3r'.
memory : int
Expand All @@ -101,10 +107,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
Path to print .stdout and .stderr files.
"""
BaseCLI.kickoff_slurm_job(ModuleName.FORWARD_PASS, ctx, cmd, alloc, memory,
walltime, feature, stdout_path)
walltime, feature, stdout_path, pipeline_step)


def kickoff_local_job(ctx, cmd):
def kickoff_local_job(ctx, cmd, pipeline_step=None):
"""Run sup3r forward pass locally.
Parameters
Expand All @@ -114,8 +120,12 @@ def kickoff_local_job(ctx, cmd):
cmd : str
Command to be submitted in shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
"""
BaseCLI.kickoff_local_job(ModuleName.FORWARD_PASS, ctx, cmd)
BaseCLI.kickoff_local_job(ModuleName.FORWARD_PASS, ctx, cmd, pipeline_step)


if __name__ == '__main__':
Expand Down
12 changes: 9 additions & 3 deletions sup3r/postprocessing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ def get_node_cmd(cls, config):
"t_elap = time.time() - t0;\n"
)

cmd = BaseCLI.add_status_cmd(config, ModuleName.DATA_COLLECT, cmd)

pipeline_step = config.get('pipeline_step') or ModuleName.DATA_COLLECT
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down Expand Up @@ -755,6 +755,7 @@ def collect(
log_file=None,
write_status=False,
job_name=None,
pipeline_step=None,
join_times=False,
target_final_meta_file=None,
n_writes=None,
Expand Down Expand Up @@ -786,6 +787,10 @@ def collect(
Flag to write status file once complete if running from pipeline.
job_name : str
Job name for status file if running from pipeline.
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to ``"collect"``,
mimicking old reV behavior. By default, ``None``.
join_times : bool
Option to split full file list into chunks with each chunk having
the same temporal_chunk_index. The number of writes will then be
Expand Down Expand Up @@ -909,8 +914,9 @@ def collect(
'job_status': 'successful',
'runtime': (time.time() - t0) / 60,
}
pipeline_step = pipeline_step or 'collect'
Status.make_single_job_file(
os.path.dirname(out_file), 'collect', job_name, status
os.path.dirname(out_file), pipeline_step, job_name, status
)

logger.info('Finished file collection.')
28 changes: 19 additions & 9 deletions sup3r/postprocessing/data_collect_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run sup3r data collection from a config file. If dset_split is True this
each feature will be collected into a separate file."""
config = BaseCLI.from_config_preflight(ModuleName.DATA_COLLECT, ctx,
Expand All @@ -56,7 +56,8 @@ def from_config(ctx, config_file, verbose=False, **__):
f_config.update({'features': [feature],
'out_file': f_out_file,
'job_name': f_job_name,
'log_file': f_log_file})
'log_file': f_log_file,
'pipeline_step': pipeline_step})

configs.append(f_config)

Expand All @@ -68,12 +69,12 @@ def from_config(ctx, config_file, verbose=False, **__):
logger.debug(f'Running command:\n\t{cmd_log}')

if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS:
kickoff_slurm_job(ctx, cmd, **exec_kwargs)
kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs)
else:
kickoff_local_job(ctx, cmd)
kickoff_local_job(ctx, cmd, pipeline_step)


def kickoff_local_job(ctx, cmd):
def kickoff_local_job(ctx, cmd, pipeline_step=None):
"""Run sup3r data collection locally.
Parameters
Expand All @@ -83,12 +84,17 @@ def kickoff_local_job(ctx, cmd):
cmd : str
Command to be submitted in shell script. Example:
'python -m sup3r.cli data_collect -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
"""
BaseCLI.kickoff_local_job(ModuleName.DATA_COLLECT, ctx, cmd)
BaseCLI.kickoff_local_job(ModuleName.DATA_COLLECT, ctx, cmd, pipeline_step)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
feature=None, stdout_path='./stdout/'):
def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
memory=None, walltime=4, feature=None,
stdout_path='./stdout/'):
"""Run sup3r on HPC via SLURM job submission.
Parameters
Expand All @@ -98,6 +104,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m sup3r.cli data-collect -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
alloc : str
HPC project (allocation) handle. Example: 'sup3r'.
memory : int
Expand All @@ -111,7 +121,7 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
Path to print .stdout and .stderr files.
"""
BaseCLI.kickoff_slurm_job(ModuleName.DATA_COLLECT, ctx, cmd, alloc, memory,
walltime, feature, stdout_path)
walltime, feature, stdout_path, pipeline_step)


if __name__ == '__main__':
Expand Down
22 changes: 16 additions & 6 deletions sup3r/preprocessing/data_extract_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run sup3r data extraction from a config file.
Parameters
Expand All @@ -46,6 +46,7 @@ def from_config(ctx, config_file, verbose=False, **__):
"""
config = BaseCLI.from_config_preflight(ModuleName.DATA_EXTRACT, ctx,
config_file, verbose)
config["pipeline_step"] = pipeline_step

exec_kwargs = config.get('execution_control', {})
hardware_option = exec_kwargs.pop('option', 'local')
Expand All @@ -63,7 +64,7 @@ def from_config(ctx, config_file, verbose=False, **__):
kickoff_local_job(ctx, cmd)


def kickoff_local_job(ctx, cmd):
def kickoff_local_job(ctx, cmd, pipeline_step=None):
"""Run sup3r data extraction locally.
Parameters
Expand All @@ -73,12 +74,17 @@ def kickoff_local_job(ctx, cmd):
cmd : str
Command to be submitted in shell script. Example:
'python -m sup3r.cli data_extract -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
"""
BaseCLI.kickoff_local_job(ModuleName.DATA_EXTRACT, ctx, cmd)
BaseCLI.kickoff_local_job(ModuleName.DATA_EXTRACT, ctx, cmd, pipeline_step)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
feature=None, stdout_path='./stdout/'):
def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r',
memory=None, walltime=4, feature=None,
stdout_path='./stdout/'):
"""Run sup3r on HPC via SLURM job submission.
Parameters
Expand All @@ -88,6 +94,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m sup3r.cli data_extract -c <config_file>'
pipeline_step : str, optional
Name of the pipeline step being run. If ``None``, the
``pipeline_step`` will be set to the ``module_name``,
mimicking old reV behavior. By default, ``None``.
alloc : str
HPC project (allocation) handle. Example: 'sup3r'.
memory : int
Expand All @@ -101,7 +111,7 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
Path to print .stdout and .stderr files.
"""
BaseCLI.kickoff_slurm_job(ModuleName.DATA_EXTRACT, ctx, cmd, alloc, memory,
walltime, feature, stdout_path)
walltime, feature, stdout_path, pipeline_step)


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions sup3r/preprocessing/data_handling/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,8 @@ def get_node_cmd(cls, config):
f"data_handler = {dh_init_str};\n"
"t_elap = time.time() - t0;\n")

cmd = BaseCLI.add_status_cmd(config, ModuleName.DATA_EXTRACT, cmd)

pipeline_step = config.get('pipeline_step') or ModuleName.DATA_EXTRACT
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"
return cmd.replace('\\', '/')

Expand Down
3 changes: 2 additions & 1 deletion sup3r/qa/qa.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ def get_node_cmd(cls, config):
"t_elap = time.time() - t0;\n"
)

cmd = BaseCLI.add_status_cmd(config, ModuleName.QA, cmd)
pipeline_step = config.get('pipeline_step') or ModuleName.QA
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down
5 changes: 3 additions & 2 deletions sup3r/qa/qa_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ def main(ctx, verbose):
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose=False, **__):
def from_config(ctx, config_file, verbose=False, pipeline_step=None):
"""Run the sup3r QA module from a config file."""
BaseCLI.from_config(ModuleName.QA, Sup3rQa, ctx, config_file, verbose)
BaseCLI.from_config(ModuleName.QA, Sup3rQa, ctx, config_file, verbose,
pipeline_step)


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion sup3r/qa/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ def get_node_cmd(cls, config):
"t_elap = time.time() - t0;\n"
)

cmd = BaseCLI.add_status_cmd(config, ModuleName.STATS, cmd)
pipeline_step = config.get('pipeline_step') or ModuleName.STATS
cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd)
cmd += ";\'\n"

return cmd.replace('\\', '/')
Expand Down
Loading

0 comments on commit 22ab050

Please sign in to comment.