Gb/solar module #92

Merged
merged 23 commits into from
Sep 16, 2022
Changes from 1 commit
Commits
23 commits
3967370
started test setup for solar module
grantbuster Sep 13, 2022
14a2dd6
added two weeks of clearsky data for nsrdb solar tests
grantbuster Sep 13, 2022
624224c
added farms requirement for solar module
grantbuster Sep 13, 2022
d46f7b1
added basic solar module that will take clearsky ratio GAN outputs an…
grantbuster Sep 13, 2022
7b1ad5e
added solar irrad output handling
grantbuster Sep 13, 2022
69e1431
more tests
grantbuster Sep 13, 2022
d0b57e0
added get_node_cmd for solar module
grantbuster Sep 13, 2022
15d06ca
added solar file parser with test
grantbuster Sep 13, 2022
3569890
added cli
grantbuster Sep 13, 2022
62ff8aa
finished solar cli with tests
grantbuster Sep 13, 2022
88b2597
fixed up tests
grantbuster Sep 13, 2022
d9b96fe
added solar cli to the main cli and pipeline
grantbuster Sep 13, 2022
b23230e
moved i_t_chunks to optional arg and enabled the solar class method e…
grantbuster Sep 13, 2022
d306db3
fixed node count logic
grantbuster Sep 13, 2022
5e92b11
fixed a stupid assumption that the nsrdb time index would always have…
grantbuster Sep 13, 2022
d973a75
changed i_t_chunk arg -> temporal_id which makes a lot more sense
grantbuster Sep 14, 2022
d785fc1
logging updates
grantbuster Sep 14, 2022
883a77f
linter fixes
grantbuster Sep 14, 2022
2fb8630
added new rex version requirement for sup3r solar multitimeresource i…
grantbuster Sep 14, 2022
0476f26
added the option for the user to specify qa datasets with mapping from…
grantbuster Sep 14, 2022
b99c42a
fixed qa features as list
grantbuster Sep 14, 2022
2833f0f
CC-based clearsky ratio should be set with ceiling not scaled to the …
grantbuster Sep 15, 2022
ca33492
moved solar test utilities
grantbuster Sep 16, 2022
added cli
grantbuster committed Sep 15, 2022
commit 356989042496edf287b520e51df94242a031fc27
110 changes: 93 additions & 17 deletions sup3r/solar/solar.py
@@ -344,20 +344,18 @@ def get_nsrdb_data(self, dset):
return out

@staticmethod
def get_sup3r_fps(config):
"""Get a list of file chunks to run in parallel based on a solar config
with key "fp_pattern".
def get_sup3r_fps(fp_pattern):
"""Get a list of file chunks to run in parallel based on a file pattern

NOTE: it's assumed that all source files have the pattern
sup3r_file_TTTTTT_SSSSSS.h5 where TTTTTT is the zero-padded temporal
chunk index and SSSSSS is the zero-padded spatial chunk index.

Parameters
----------
config : dict
Solar config with args to initialize Solar class and to run
Solar.write(), must have "fp_pattern" that returns
spatiotemporally chunked sup3r forward pass output files.
fp_pattern : str
Unix-style file*pattern that matches a set of spatiotemporally
chunked sup3r forward pass output files.

Returns
-------
@@ -371,10 +369,12 @@ def get_sup3r_fps(config):
List of t_slice arguments corresponding to fp_sets to pass to
Solar class initialization that will slice and reduce the
overlapping time axis when Solar outputs irradiance data.
temporal_ids : list
List of temporal id strings TTTTTT corresponding to the fp_sets
spatial_ids : list
List of spatial id strings SSSSSS corresponding to the fp_sets
"""

assert 'fp_pattern' in config, 'Need fp_pattern in config!'
fp_pattern = config['fp_pattern']
all_fps = [fp for fp in glob.glob(fp_pattern) if fp.endswith('.h5')]
all_fps = sorted(all_fps)

@@ -392,6 +392,8 @@ def get_sup3r_fps(config):

fp_sets = []
t_slices = []
temporal_ids = []
spatial_ids = []
for idt, id_temporal in enumerate(all_id_temporal):
start = 0
single_chunk_id_temps = [id_temporal]
@@ -412,27 +414,28 @@ def get_sup3r_fps(config):

fp_sets.append(single_fp_set)
t_slices.append(slice(start, start + 24))
temporal_ids.append(id_temporal)
spatial_ids.append(id_spatial)

return fp_sets, t_slices
return fp_sets, t_slices, temporal_ids, spatial_ids

@classmethod
def get_node_cmd(cls, config):
"""Get a CLI call to initialize Solar and run Solar.write() on a single
"""Get a CLI call to run Solar.run_temporal_chunk() on a single
node based on an input config.

Parameters
----------
config : dict
sup3r solar config with all necessary args and kwargs to initialize
Solar and run Solar.write() on a single node.
sup3r solar config with all necessary args and kwargs to
run Solar.run_temporal_chunk() on a single node.
"""
import_str = 'import time;\n'
import_str += 'from reV.pipeline.status import Status;\n'
import_str += 'from rex import init_logger;\n'
import_str += f'from sup3r.solar import {cls.__name__};\n'

init_str = get_fun_call_str(cls, config)
write_args = get_arg_str(cls.write, config)
fun_str = get_fun_call_str(cls.run_temporal_chunk, config)

log_file = config.get('log_file', None)
log_level = config.get('log_level', 'INFO')
@@ -443,8 +446,7 @@ def get_node_cmd(cls, config):
cmd = (f"python -c \'{import_str}\n"
"t0 = time.time();\n"
f"logger = init_logger({log_arg_str});\n"
f"with {init_str} as solar;\n"
f"\tsolar.write({write_args});\n"
f"{fun_str};\n"
"t_elap = time.time() - t0;\n")

job_name = config.get('job_name', None)
@@ -475,6 +477,9 @@ def write(self, fp_out, features=('ghi', 'dni', 'dhi')):
fp_out : str
Filepath to an output h5 file to write irradiance variables to.
Parent directory will be created if it does not exist.
features : list | tuple
List of features to write to disk. These have to be attributes of
the Solar class (ghi, dni, dhi).
"""

if not os.path.exists(os.path.dirname(fp_out)):
@@ -504,3 +509,74 @@ def write(self, fp_out, features=('ghi', 'dni', 'dhi')):
fh.run_attrs = run_attrs

logger.info(f'Finished writing file: {fp_out}')

@classmethod
def run_temporal_chunk(cls, i_t_chunk, fp_pattern, nsrdb_fp, fp_out,
tz=-6, agg_factor=1,
nn_threshold=0.5, cloud_threshold=0.99,
features=('ghi', 'dni', 'dhi')):
"""Run the solar module on all spatial chunks for a single temporal
chunk corresponding to the fp_pattern. This typically gets run from the
CLI.

Parameters
----------
i_t_chunk : int
Index of the sorted list of unique temporal indices to run (parsed
from the files matching fp_pattern). This input typically gets set
from the CLI.
fp_pattern : str
Unix-style file*pattern that matches a set of spatiotemporally
chunked sup3r forward pass output files.
nsrdb_fp : str
Filepath to NSRDB .h5 file containing clearsky_ghi, clearsky_dni,
clearsky_dhi data.
fp_out : str
Filepath to an output h5 file to write irradiance variables to.
Parent directory will be created if it does not exist.
tz : int
The timezone offset for the data in sup3r_fps. It is assumed that
the GAN is trained on data in local time and therefore the output
in sup3r_fps should be treated as local time. For example, -6 is
CST which is default for CONUS training data.
agg_factor : int
Spatial aggregation factor for nsrdb-to-GAN-meta e.g. the number of
NSRDB spatial pixels to average for a single sup3r GAN output site.
nn_threshold : float
The KDTree nearest neighbor threshold that determines how far the
sup3r GAN output data has to be from the NSRDB source data to get
irradiance=0. Note that this value is in decimal degrees which is a
very approximate way to determine real distance.
cloud_threshold : float
Clearsky ratio threshold below which the data is considered cloudy
and DNI is calculated using DISC.
features : list | tuple
List of features to write to disk. These have to be attributes of
the Solar class (ghi, dni, dhi).
"""
temp = cls.get_sup3r_fps(fp_pattern)
fp_sets, t_slices, temporal_ids, spatial_ids = temp

i_fp_sets = [fp_set for i, fp_set in enumerate(fp_sets)
if temporal_ids[i] == temporal_ids[i_t_chunk]]
i_t_slices = [t_slice for i, t_slice in enumerate(t_slices)
if temporal_ids[i] == temporal_ids[i_t_chunk]]

for i, (fp_set, t_slice) in enumerate(zip(i_fp_sets, i_t_slices)):
logger.info('Running temporal index {} out of {}.'
.format(i + 1, len(i_fp_sets)))
kwargs = dict(t_slice=t_slice,
tz=tz,
agg_factor=agg_factor,
nn_threshold=nn_threshold,
cloud_threshold=cloud_threshold)
with Solar(fp_set, nsrdb_fp, **kwargs) as solar:
solar.write(fp_out, features=features)
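
As a quick usage sketch (an editor's illustration, not part of this diff; the file paths and chunk directory are hypothetical), the new get_sup3r_fps and run_temporal_chunk methods could be exercised directly like this, assuming forward pass outputs follow the sup3r_file_TTTTTT_SSSSSS.h5 naming convention noted in the docstring above:

# Editor's sketch with hypothetical paths; mirrors what the CLI submits per node.
from sup3r.solar import Solar

fp_pattern = './fwp_chunks/sup3r_file_*_*.h5'  # hypothetical forward pass outputs

# Group the chunked files into parallel work units. Each fp_set holds one spatial
# chunk plus its temporal neighbors, and the matching t_slice later reduces the
# padded, rolled-to-UTC time axis back down to the temporal chunk of interest.
fp_sets, t_slices, temporal_ids, spatial_ids = Solar.get_sup3r_fps(fp_pattern)
print(len(fp_sets), 'file sets across', len(set(temporal_ids)), 'temporal chunks')

# Run every spatial chunk belonging to the first temporal chunk in this process,
# writing GHI/DNI/DHI derived from the clearsky ratio GAN output and NSRDB data.
Solar.run_temporal_chunk(0, fp_pattern,
                         nsrdb_fp='./nsrdb_clearsky.h5',  # hypothetical NSRDB file
                         fp_out='./solar/irradiance_000000.h5')
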
210 changes: 210 additions & 0 deletions sup3r/solar/solar_cli.py
@@ -0,0 +1,210 @@
# -*- coding: utf-8 -*-
"""
sup3r solar CLI entry points.
"""
import copy
import click
import logging
from inspect import signature
import os

from reV.pipeline.status import Status

from rex.utilities.execution import SubprocessManager
from rex.utilities.hpc import SLURM
from rex.utilities.loggers import init_mult

from sup3r.solar import Solar
from sup3r.utilities import ModuleName
from sup3r.version import __version__
from sup3r.pipeline.config import BaseConfig


logger = logging.getLogger(__name__)


@click.group()
@click.version_option(version=__version__)
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def main(ctx, verbose):
"""Sup3r Solar Command Line Interface"""
ctx.ensure_object(dict)
ctx.obj['VERBOSE'] = verbose


@main.command()
@click.option('--config_file', '-c', required=True,
type=click.Path(exists=True),
help='sup3r solar configuration .json file.')
@click.option('-v', '--verbose', is_flag=True,
help='Flag to turn on debug logging. Default is not verbose.')
@click.pass_context
def from_config(ctx, config_file, verbose):
"""Run sup3r solar from a config file."""
ctx.ensure_object(dict)
ctx.obj['VERBOSE'] = verbose
status_dir = os.path.dirname(os.path.abspath(config_file))
ctx.obj['OUT_DIR'] = status_dir
config = BaseConfig(config_file)
config['status_dir'] = status_dir
config_verbose = config.get('log_level', 'INFO')
config_verbose = (config_verbose == 'DEBUG')
verbose = any([verbose, config_verbose, ctx.obj['VERBOSE']])

init_mult('sup3r_solar', os.path.join(status_dir, 'logs/'),
modules=[__name__, 'sup3r'], verbose=verbose)

exec_kwargs = config.get('execution_control', {})
log_pattern = config.get('log_pattern', None)
if log_pattern is not None:
os.makedirs(os.path.dirname(log_pattern), exist_ok=True)
if '.log' not in log_pattern:
log_pattern += '.log'
if '{node_index}' not in log_pattern:
log_pattern = log_pattern.replace('.log', '_{node_index}.log')

hardware_option = exec_kwargs.pop('option', 'local')
exec_kwargs['stdout_path'] = os.path.join(status_dir, 'stdout/')
logger.debug('Found execution kwargs: {}'.format(exec_kwargs))
logger.debug('Hardware run option: "{}"'.format(hardware_option))

fp_pattern = config['fp_pattern']
fp_sets, _, temporal_ids, _ = Solar.get_sup3r_fps(fp_pattern)
logger.info('Solar module found {} sets of chunked source files to run '
'on. Submitting to {} nodes based on the number of temporal '
'chunks'.format(len(fp_sets), len(set(temporal_ids))))

for i_node, _ in enumerate(temporal_ids):
node_config = copy.deepcopy(config)
node_config['log_file'] = (
log_pattern if log_pattern is None
else os.path.normpath(log_pattern.format(node_index=i_node)))
name = ('sup3r_solar_{}_{}'.format(os.path.basename(status_dir),
str(i_node).zfill(6)))
ctx.obj['NAME'] = name
node_config['job_name'] = name

node_config['i_t_chunk'] = i_node
cmd = Solar.get_node_cmd(node_config)

cmd_log = '\n\t'.join(cmd.split('\n'))
logger.debug(f'Running command:\n\t{cmd_log}')

if hardware_option.lower() in ('eagle', 'slurm'):
kickoff_slurm_job(ctx, cmd, **exec_kwargs)
else:
kickoff_local_job(ctx, cmd)


def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4,
feature=None, stdout_path='./stdout/'):
"""Run sup3r on HPC via SLURM job submission.

Parameters
----------
ctx : click.pass_context
Click context object where ctx.obj is a dictionary
cmd : str
Command to be submitted in SLURM shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
alloc : str
HPC project (allocation) handle. Example: 'sup3r'.
memory : int
Node memory request in GB.
walltime : float
Node walltime request in hours.
feature : str
Additional flags for SLURM job. Format is "--qos=high"
or "--depend=[state:job_id]". Default is None.
stdout_path : str
Path to print .stdout and .stderr files.
"""

name = ctx.obj['NAME']
out_dir = ctx.obj['OUT_DIR']
slurm_manager = ctx.obj.get('SLURM_MANAGER', None)
if slurm_manager is None:
slurm_manager = SLURM()
ctx.obj['SLURM_MANAGER'] = slurm_manager

status = Status.retrieve_job_status(out_dir,
module=ModuleName.SOLAR,
job_name=name,
hardware='slurm',
subprocess_manager=slurm_manager)

msg = 'sup3r solar CLI failed to submit jobs!'
if status == 'successful':
msg = ('Job "{}" is successful in status json found in "{}", '
'not re-running.'.format(name, out_dir))
elif 'fail' not in str(status).lower() and status is not None:
msg = ('Job "{}" was found with status "{}", not resubmitting'
.format(name, status))
else:
logger.info('Running sup3r solar on SLURM with node name "{}".'
.format(name))
out = slurm_manager.sbatch(cmd,
alloc=alloc,
memory=memory,
walltime=walltime,
feature=feature,
name=name,
stdout_path=stdout_path)[0]
if out:
msg = ('Kicked off sup3r solar job "{}" (SLURM jobid #{}).'
.format(name, out))

# add job to sup3r status file.
Status.add_job(out_dir, module=ModuleName.SOLAR,
job_name=name, replace=True,
job_attrs={'job_id': out, 'hardware': 'slurm'})

click.echo(msg)
logger.info(msg)


def kickoff_local_job(ctx, cmd):
"""Run sup3r solar locally.

Parameters
----------
ctx : click.pass_context
Click context object where ctx.obj is a dictionary
cmd : str
Command to be submitted in shell script. Example:
'python -m sup3r.cli forward_pass -c <config_file>'
"""

name = ctx.obj['NAME']
out_dir = ctx.obj['OUT_DIR']
subprocess_manager = SubprocessManager
status = Status.retrieve_job_status(out_dir,
module=ModuleName.SOLAR,
job_name=name)
msg = 'sup3r solar CLI failed to submit jobs!'
if status == 'successful':
msg = ('Job "{}" is successful in status json found in "{}", '
'not re-running.'.format(name, out_dir))
elif 'fail' not in str(status).lower() and status is not None:
msg = ('Job "{}" was found with status "{}", not resubmitting'
.format(name, status))
else:
logger.info('Running sup3r solar locally with job name "{}".'
.format(name))
Status.add_job(out_dir, module=ModuleName.SOLAR,
job_name=name, replace=True)
subprocess_manager.submit(cmd)
msg = ('Completed sup3r solar job "{}".'.format(name))

click.echo(msg)
logger.info(msg)


if __name__ == '__main__':
try:
main(obj={})
except Exception:
logger.exception('Error running sup3r solar CLI')
raise
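
To round this out, here is a minimal config sketch (an editor's illustration; all paths, the allocation handle, and the hardware option are assumptions, not part of this PR) that the from_config command above could consume, built only from the keys read in from_config() and the run_temporal_chunk() arguments forwarded by Solar.get_node_cmd():

# Editor's sketch of a sup3r solar config; keys mirror those referenced above,
# values are hypothetical.
import json

config = {
    "fp_pattern": "./fwp_chunks/sup3r_file_*_*.h5",  # chunked forward pass outputs
    "nsrdb_fp": "./nsrdb_clearsky.h5",               # NSRDB clearsky source data
    "fp_out": "./solar/irradiance.h5",
    "tz": -6,                                        # CST default for CONUS training data
    "agg_factor": 1,
    "log_level": "DEBUG",
    "log_pattern": "./logs/sup3r_solar.log",         # expanded to ..._{node_index}.log
    "execution_control": {
        "option": "eagle",                           # or "local" to run in a subprocess
        "alloc": "sup3r",
        "walltime": 4,
    },
}

with open("config_solar.json", "w") as fh:
    json.dump(config, fh, indent=4)

# The i_t_chunk and job_name keys are set per node by from_config() before each
# Solar.get_node_cmd() call, so they are not specified here.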