Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gb/bc #96

Merged
merged 29 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
efcf50e
added bias module with a place for bias transformation functions and …
grantbuster Sep 20, 2022
115e9c2
feature specific bias correction
grantbuster Sep 20, 2022
c184917
removed warnings about excessive padding - not a bad thing
grantbuster Sep 20, 2022
1d823e3
added a site-by-site linear bias correction calculation method
grantbuster Sep 21, 2022
8b7f25b
bug fix and logging
grantbuster Sep 21, 2022
56148db
bias calc mods and new functions
grantbuster Sep 22, 2022
50c3692
added bias calc cli
grantbuster Sep 22, 2022
2f7d4f7
added bias calc to main cli
grantbuster Sep 22, 2022
c132b81
make bias out dir
grantbuster Sep 22, 2022
915edd5
bug fixes and minor refactor to run on eagle
grantbuster Sep 22, 2022
7b28943
added local linear bias correct to forward pass bc options
grantbuster Sep 22, 2022
0cee671
added option to smooth spatial bias correction factors outside of the…
grantbuster Sep 23, 2022
aa0a040
better enumerated progress logging for fwp
grantbuster Sep 23, 2022
240a0d6
added bias correction option to QA
grantbuster Sep 23, 2022
9ac0905
minor refactor to bias correct u and v instead of windspeed and direc…
grantbuster Sep 23, 2022
d2fb1e2
fixed up the u/v QA with bias correction
grantbuster Sep 27, 2022
01fbeda
added meta data to bc h5 output attrs
grantbuster Sep 27, 2022
b370b9f
more bc convenience functions
grantbuster Sep 28, 2022
24c40b6
added monthly bias correction
grantbuster Sep 28, 2022
0deab95
added monthly bias correction data transformation method and integrate…
grantbuster Sep 29, 2022
1f28ccc
fixed collection logic for undefined mask meta variable when file is …
grantbuster Oct 2, 2022
5d93283
added bias correction calc tests
grantbuster Oct 3, 2022
0638d7b
added bias transform calcs
grantbuster Oct 3, 2022
6cc6ced
added fwp+bc integration test
grantbuster Oct 3, 2022
ca24793
added qa+bc integration test
grantbuster Oct 3, 2022
b75b3fc
added version record to bias calc output files and incremented versio…
grantbuster Oct 4, 2022
b0a2c49
simplify qa test and pylint issue
grantbuster Oct 4, 2022
7b9c88f
fixed test on h5 meta attrs dtype and docstrings
grantbuster Oct 4, 2022
2ea15e3
serial data handling for QA+BC bug
grantbuster Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added bias calc cli
  • Loading branch information
grantbuster committed Oct 4, 2022
commit 50c36920daadc745e7d7fe4a055a328c7a634a70
61 changes: 61 additions & 0 deletions sup3r/bias/bias_calc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
going to be fed into the sup3r downscaling models. This is typically used to
bias correct GCM data vs. some historical record like the WTK or NSRDB."""
import h5py
import json
import logging
import numpy as np
import pandas as pd
from scipy.spatial import KDTree
from scipy.stats import ks_2samp
from concurrent.futures import ProcessPoolExecutor, as_completed
import rex
from rex.utilities.fun_utils import get_fun_call_str
from sup3r.utilities import ModuleName
from sup3r.utilities.utilities import nn_fill_array
import sup3r.preprocessing.data_handling

Expand Down Expand Up @@ -133,6 +136,64 @@ def compare_dists(base_data, bias_data, adder=0, scalar=1):
out = ks_2samp(base_data, bias_data * scalar + adder)
return out.statistic

@classmethod
def get_node_cmd(cls, config):
    """Get a CLI call to call cls.run() on a single node based on an input
    config.

    Parameters
    ----------
    config : dict
        sup3r bias calc config with all necessary args and kwargs to
        initialize the class and call run() on a single node. The optional
        keys "log_file", "log_level", "job_name" and "status_dir" are also
        read here to configure logging and the job status file.

    Returns
    -------
    cmd : str
        Shell command of the form ``python -c '<statements>'`` that
        initializes logging, instantiates the class, calls run() and (only
        when "job_name" is in the config) writes a job status file.

    Raises
    ------
    NotImplementedError
        If this class does not have a run() method.
    """
    # Fail fast before building any part of the command string.
    if not hasattr(cls, 'run'):
        msg = ('I can only get you a node command for subclasses of '
               'DataRetrievalBase with a run() method.')
        logger.error(msg)
        raise NotImplementedError(msg)

    import_str = 'import time;\n'
    import_str += 'from reV.pipeline.status import Status;\n'
    import_str += 'from rex import init_logger;\n'
    import_str += f'from sup3r.bias.bias_calc import {cls.__name__};\n'

    init_str = get_fun_call_str(cls, config)
    fun_str = get_fun_call_str(cls.run, config)

    log_file = config.get('log_file', None)
    log_level = config.get('log_level', 'INFO')
    log_arg_str = f'"sup3r", log_level="{log_level}"'
    if log_file is not None:
        log_arg_str += f', log_file="{log_file}"'

    cmd = (f"python -c '{import_str}\n"
           "t0 = time.time();\n"
           f"logger = init_logger({log_arg_str});\n"
           f"bc = {init_str};\n"
           f"bc.{fun_str};\n"
           "t_elap = time.time() - t0;\n")

    job_name = config.get('job_name', None)
    if job_name is not None:
        status_dir = config.get('status_dir', None)
        status_file_arg_str = f'"{status_dir}", '
        status_file_arg_str += f'module="{ModuleName.BIAS_CALC}", '
        status_file_arg_str += f'job_name="{job_name}", '
        status_file_arg_str += 'attrs=job_attrs'

        # Turn the JSON dump into a python dict literal by swapping the JSON
        # keywords for their python equivalents.
        # NOTE(review): this is a plain textual substitution, so the
        # substrings "null"/"true"/"false" inside string values are
        # rewritten as well.
        attrs_literal = (json.dumps(config)
                         .replace("null", "None")
                         .replace("false", "False")
                         .replace("true", "True"))
        cmd += 'job_attrs = {};\n'.format(attrs_literal)
        cmd += 'job_attrs.update({"job_status": "successful"});\n'
        cmd += 'job_attrs.update({"time": t_elap});\n'
        cmd += f'Status.make_job_file({status_file_arg_str})'
        # The status call is the only statement not yet terminated. Adding
        # the ";" here (instead of unconditionally) avoids a line that holds
        # nothing but ";" when there is no job name, which python rejects
        # as a syntax error.
        cmd += ';'

    # Close the single-quoted python -c argument.
    cmd += "'\n"

    # Every backslash becomes a forward slash; presumably this is meant to
    # normalize windows-style paths inside the command -- confirm.
    return cmd.replace('\\', '/')

def get_base_gid(self, bias_gid, knn):
"""Get one or more base gid(s) corresponding to a bias gid.

Expand Down
206 changes: 206 additions & 0 deletions sup3r/bias/bias_calc_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# -*- coding: utf-8 -*-
"""
sup3r bias correction calculation CLI entry points.
"""
import copy
import click
import logging
import os

from reV.pipeline.status import Status

from rex.utilities.execution import SubprocessManager
from rex.utilities.hpc import SLURM
from rex.utilities.loggers import init_mult

import sup3r.bias.bias_calc
from sup3r.utilities import ModuleName
from sup3r.version import __version__
from sup3r.pipeline.config import BaseConfig


logger = logging.getLogger(__name__)


@click.group()
@click.version_option(version=__version__)
@click.option('-v', '--verbose', is_flag=True,
              help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def main(ctx, verbose):
    """Sup3r bias calc Command Line Interface"""
    # Make sure ctx.obj is a dict before storing shared state on it, then
    # record the group-level verbosity flag for the sub-commands.
    ctx.ensure_object(dict)
    ctx.obj.update(VERBOSE=verbose)


@main.command()
@click.option('--config_file', '-c', required=True,
              type=click.Path(exists=True),
              help='sup3r bias correction calculation config .json file.')
@click.option('-v', '--verbose', is_flag=True,
              help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose):
    """Run sup3r bias correction calculation from a config file."""
    ctx.ensure_object(dict)
    # Combine this command's -v flag with one already stored on the context
    # (e.g. by the parent group) instead of overwriting the stored value.
    ctx.obj['VERBOSE'] = bool(verbose or ctx.obj.get('VERBOSE', False))

    # Status files, logs and stdout all live next to the config file.
    status_dir = os.path.dirname(os.path.abspath(config_file))
    ctx.obj['OUT_DIR'] = status_dir
    config = BaseConfig(config_file)
    config['status_dir'] = status_dir
    config_verbose = config.get('log_level', 'INFO') == 'DEBUG'
    verbose = any([verbose, config_verbose, ctx.obj['VERBOSE']])

    init_mult('sup3r_bias_calc', os.path.join(status_dir, 'logs/'),
              modules=[__name__, 'sup3r'], verbose=verbose)

    exec_kwargs = config.get('execution_control', {})
    log_pattern = config.get('log_pattern', None)
    if log_pattern is not None:
        # A bare file name has an empty dirname and os.makedirs('') raises,
        # so only create the directory when there is one.
        log_dir = os.path.dirname(log_pattern)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        if '.log' not in log_pattern:
            log_pattern += '.log'
        if '{node_index}' not in log_pattern:
            log_pattern = log_pattern.replace('.log', '_{node_index}.log')

    hardware_option = exec_kwargs.pop('option', 'local')
    exec_kwargs['stdout_path'] = os.path.join(status_dir, 'stdout/')
    logger.debug('Found execution kwargs: {}'.format(exec_kwargs))
    logger.debug('Hardware run option: "{}"'.format(hardware_option))

    calc_class_name = config['bias_calc_class']
    BiasCalcClass = getattr(sup3r.bias.bias_calc, calc_class_name)

    jobs = config['jobs']
    for i_node, job in enumerate(jobs):
        # Work on a copy so the per-node additions below never leak back
        # into the user config.
        node_config = copy.deepcopy(job)
        # The node command reads status_dir to decide where to write its
        # job status file; it must match the OUT_DIR used by the kickoff
        # functions when they look the job up.
        node_config['status_dir'] = status_dir
        node_config['log_file'] = (
            None if log_pattern is None
            else os.path.normpath(log_pattern.format(node_index=i_node)))
        name = ('sup3r_bias_{}_{}'.format(os.path.basename(status_dir),
                                          str(i_node).zfill(6)))
        ctx.obj['NAME'] = name
        node_config['job_name'] = name

        # Build the command from node_config (not the raw job entry) so
        # that log_file, job_name and status_dir actually reach the node.
        cmd = BiasCalcClass.get_node_cmd(node_config)

        cmd_log = '\n\t'.join(cmd.split('\n'))
        logger.debug(f'Running command:\n\t{cmd_log}')

        if hardware_option.lower() in ('eagle', 'slurm'):
            kickoff_slurm_job(ctx, cmd, **exec_kwargs)
        else:
            kickoff_local_job(ctx, cmd)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
                      feature=None, stdout_path='./stdout/'):
    """Run sup3r bias calc on HPC via SLURM job submission.

    The job is only submitted when the status record for this job name is
    missing or contains "fail"; otherwise a message is emitted and nothing
    is submitted.

    Parameters
    ----------
    ctx : click.pass_context
        Click context object where ctx.obj is a dictionary. The keys "NAME"
        and "OUT_DIR" must be set; "SLURM_MANAGER" is created and cached
        here if missing.
    cmd : str
        Command to be submitted in SLURM shell script. Example:
            'python -m sup3r.cli forward_pass -c <config_file>'
    alloc : str
        HPC project (allocation) handle. Example: 'sup3r'.
    memory : int
        Node memory request in GB.
    walltime : float
        Node walltime request in hours.
    feature : str
        Additional flags for SLURM job. Format is "--qos=high"
        or "--depend=[state:job_id]". Default is None.
    stdout_path : str
        Path to print .stdout and .stderr files.
    """

    name = ctx.obj['NAME']
    out_dir = ctx.obj['OUT_DIR']

    # Reuse a single SLURM manager across all jobs kicked off from this
    # context.
    slurm_manager = ctx.obj.get('SLURM_MANAGER', None)
    if slurm_manager is None:
        slurm_manager = SLURM()
        ctx.obj['SLURM_MANAGER'] = slurm_manager

    # The node command records its status under ModuleName.BIAS_CALC, so the
    # lookup has to use the same module key to find previous runs.
    status = Status.retrieve_job_status(out_dir,
                                        module=ModuleName.BIAS_CALC,
                                        job_name=name,
                                        hardware='slurm',
                                        subprocess_manager=slurm_manager)

    # Default message; replaced below unless the submission returns nothing.
    msg = 'sup3r bias calc CLI failed to submit jobs!'
    if status == 'successful':
        msg = ('Job "{}" is successful in status json found in "{}", '
               'not re-running.'.format(name, out_dir))
    elif 'fail' not in str(status).lower() and status is not None:
        msg = ('Job "{}" was found with status "{}", not resubmitting'
               .format(name, status))
    else:
        logger.info('Running sup3r bias calc on SLURM with node name "{}".'
                    .format(name))
        out = slurm_manager.sbatch(cmd,
                                   alloc=alloc,
                                   memory=memory,
                                   walltime=walltime,
                                   feature=feature,
                                   name=name,
                                   stdout_path=stdout_path)[0]
        if out:
            msg = ('Kicked off sup3r bias calc job "{}" (SLURM jobid #{}).'
                   .format(name, out))

        # add job to sup3r status file.
        Status.add_job(out_dir, module=ModuleName.BIAS_CALC,
                       job_name=name, replace=True,
                       job_attrs={'job_id': out, 'hardware': 'slurm'})

    click.echo(msg)
    logger.info(msg)


def kickoff_local_job(ctx, cmd):
    """Run sup3r bias calc locally.

    The command is only run when the status record for this job name is
    missing or contains "fail"; otherwise a message is emitted and nothing
    is run.

    Parameters
    ----------
    ctx : click.pass_context
        Click context object where ctx.obj is a dictionary. The keys "NAME"
        and "OUT_DIR" must be set.
    cmd : str
        Command to be submitted in shell script. Example:
            'python -m sup3r.cli forward_pass -c <config_file>'
    """

    name = ctx.obj['NAME']
    out_dir = ctx.obj['OUT_DIR']

    # The node command records its status under ModuleName.BIAS_CALC, so the
    # lookup has to use the same module key to find previous runs.
    status = Status.retrieve_job_status(out_dir,
                                        module=ModuleName.BIAS_CALC,
                                        job_name=name)

    msg = 'sup3r bias calc CLI failed to submit jobs!'
    if status == 'successful':
        msg = ('Job "{}" is successful in status json found in "{}", '
               'not re-running.'.format(name, out_dir))
    elif 'fail' not in str(status).lower() and status is not None:
        msg = ('Job "{}" was found with status "{}", not resubmitting'
               .format(name, status))
    else:
        logger.info('Running sup3r bias calc locally with job name "{}".'
                    .format(name))
        # Register the job before running it.
        Status.add_job(out_dir, module=ModuleName.BIAS_CALC,
                       job_name=name, replace=True)
        SubprocessManager.submit(cmd)
        msg = ('Completed sup3r bias calc job "{}".'.format(name))

    click.echo(msg)
    logger.info(msg)


if __name__ == '__main__':
    # Script entry point: start the click group with an empty dict as the
    # shared context object.
    try:
        main(obj={})
    except Exception:
        # Log the traceback through the module logger, then let the
        # exception propagate unchanged.
        logger.exception('Error running sup3r bias calc CLI')
        raise
1 change: 1 addition & 0 deletions sup3r/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ModuleName(str, Enum):
QA = 'qa'
SOLAR = 'solar'
WIND_STATS = 'wind-stats'
BIAS_CALC = 'bias-calc'

@classmethod
def all_names(cls):
Expand Down