diff --git a/sup3r/models/abstract.py b/sup3r/models/abstract.py
index e54182de0..ff6dbbdaf 100644
--- a/sup3r/models/abstract.py
+++ b/sup3r/models/abstract.py
@@ -1,7 +1,5 @@
 # -*- coding: utf-8 -*-
-"""
-Abstract class to define the required interface for Sup3r model subclasses
-"""
+"""Abstract class defining the required interface for Sup3r model subclasses"""
 import json
 import logging
 import os
@@ -23,10 +21,83 @@ import sup3r.utilities.loss_metrics
 from sup3r.preprocessing.data_handling.exogenous_data_handling import ExoData
 from sup3r.utilities import VERSION_RECORD
+from sup3r.utilities.utilities import Timer
 
 logger = logging.getLogger(__name__)
 
 
+class TensorboardMixIn:
+    """MixIn class for tensorboard logging and profiling."""
+
+    def __init__(self):
+        self._tb_writer = None
+        self._tb_log_dir = None
+        self._write_tb_profile = False
+        self._total_batches = None
+        self._history = None
+        self.timer = Timer()
+
+    @property
+    def total_batches(self):
+        """Record of total number of batches for logging."""
+        if self._total_batches is None and self._history is None:
+            self._total_batches = 0
+        elif self._total_batches is None and 'total_batches' in self._history:
+            self._total_batches = self._history['total_batches'].values[-1]
+        elif self._total_batches is None and self._history is not None:
+            self._total_batches = 0
+        return self._total_batches
+
+    @total_batches.setter
+    def total_batches(self, value):
+        """Set total number of batches."""
+        self._total_batches = value
+
+    def dict_to_tensorboard(self, entry):
+        """Write data to tensorboard log file. This is usually a loss_details
+        dictionary.
+
+        Parameters
+        ----------
+        entry: dict
+            Dictionary of values to write to tensorboard log file
+        """
+        if self._tb_writer is not None:
+            with self._tb_writer.as_default():
+                for name, value in entry.items():
+                    if isinstance(value, str):
+                        tf.summary.text(name, value, self.total_batches)
+                    else:
+                        tf.summary.scalar(name, value, self.total_batches)
+
+    def profile_to_tensorboard(self, name):
+        """Write profile data to tensorboard log file.
+
+        Parameters
+        ----------
+        name : str
+            Tag name to use for profile info
+        """
+        if self._tb_writer is not None and self._write_tb_profile:
+            with self._tb_writer.as_default():
+                tf.summary.trace_export(name=name, step=self.total_batches,
+                                        profiler_outdir=self._tb_log_dir)
+
+    def _init_tensorboard_writer(self, out_dir):
+        """Initialize the ``tf.summary.SummaryWriter`` to use for writing
+        tensorboard compatible log files.
+
+        Parameters
+        ----------
+        out_dir : str
+            Standard out_dir where model epochs are saved. e.g.
+            './gan_{epoch}'
+        """
+        tb_log_pardir = os.path.abspath(os.path.join(out_dir, os.pardir))
+        self._tb_log_dir = os.path.join(tb_log_pardir, 'logs')
+        os.makedirs(self._tb_log_dir, exist_ok=True)
+        self._tb_writer = tf.summary.create_file_writer(self._tb_log_dir)
+
+
 class AbstractInterface(ABC):
     """
     Abstract class to define the required interface for Sup3r model subclasses
@@ -371,9 +442,8 @@ def hr_exo_features(self):
         # pylint: disable=E1101
         features = []
         if hasattr(self, '_gen'):
-            for layer in self._gen.layers:
-                if isinstance(layer, (Sup3rAdder, Sup3rConcat)):
-                    features.append(layer.name)
+            features = [layer.name for layer in self._gen.layers
+                        if isinstance(layer, (Sup3rAdder, Sup3rConcat))]
         return features
 
     @property
@@ -465,13 +535,14 @@ def save_params(self, out_dir):
 
 # pylint: disable=E1101,W0201,E0203
-class AbstractSingleModel(ABC):
+class AbstractSingleModel(ABC, TensorboardMixIn):
     """
     Abstract class to define the required training interface for Sup3r model
     subclasses
     """
 
     def __init__(self):
+        super().__init__()
         self.gpu_list = tf.config.list_physical_devices('GPU')
         self.default_device = '/cpu:0'
         self._version_record = VERSION_RECORD
@@ -743,13 +814,13 @@ def init_optimizer(optimizer, learning_rate):
         """
         if isinstance(optimizer, dict):
             class_name = optimizer['name']
-            OptimizerClass = getattr(optimizers, class_name)
-            sig = signature(OptimizerClass)
+            optimizer_class = getattr(optimizers, class_name)
+            sig = signature(optimizer_class)
             optimizer_kwargs = {
                 k: v for k, v in optimizer.items() if k in sig.parameters
             }
-            optimizer = OptimizerClass.from_config(optimizer_kwargs)
+            optimizer = optimizer_class.from_config(optimizer_kwargs)
         elif optimizer is None:
             optimizer = optimizers.Adam(learning_rate=learning_rate)
 
@@ -915,10 +986,9 @@ def update_loss_details(loss_details, new_data, batch_len, prefix=None):
         prior_n_obs = loss_details['n_obs']
         new_n_obs = prior_n_obs + batch_len
 
-        for key, new_value in new_data.items():
-            key = key if prefix is None else prefix + key
-            new_value = (new_value if not isinstance(new_value, tf.Tensor) else
-                         new_value.numpy())
+        for k, v in new_data.items():
+            key = k if prefix is None else prefix + k
+            new_value = (v if not isinstance(v, tf.Tensor) else v.numpy())
 
             if key in loss_details:
                 saved_value = loss_details[key]
@@ -1061,9 +1131,7 @@ def finish_epoch(self,
         stop : bool
            Flag to early stop training.
""" - self.log_loss_details(loss_details) - self._history.at[epoch, 'elapsed_time'] = time.time() - t0 for key, value in loss_details.items(): if key != 'n_obs': @@ -1135,12 +1203,12 @@ def run_gradient_descent(self, loss_details : dict Namespace of the breakdown of loss components """ - t0 = time.time() if optimizer is None: optimizer = self.optimizer if not multi_gpu or len(self.gpu_list) == 1: + grad, loss_details = self.get_single_grad(low_res, hi_res_true, training_weights, **calc_loss_kwargs) @@ -1148,7 +1216,6 @@ def run_gradient_descent(self, t1 = time.time() logger.debug(f'Finished single gradient descent step ' f'in {(t1 - t0):.3f}s') - else: futures = [] lr_chunks = np.array_split(low_res, len(self.gpu_list)) @@ -1178,7 +1245,6 @@ def run_gradient_descent(self, t1 = time.time() logger.debug(f'Finished {len(futures)} gradient descent steps on ' f'{len(self.gpu_list)} GPUs in {(t1 - t0):.3f}s') - return loss_details def _reshape_norm_exo(self, hi_res, hi_res_exo, exo_name, norm_in=True): @@ -1283,8 +1349,10 @@ def generate(self, low_res = self.norm_input(low_res) hi_res = self.generator.layers[0](low_res) - for i, layer in enumerate(self.generator.layers[1:]): - try: + layer_num = 1 + try: + for i, layer in enumerate(self.generator.layers[1:]): + layer_num = i + 1 if isinstance(layer, (Sup3rAdder, Sup3rConcat)): msg = (f'layer.name = {layer.name} does not match any ' 'features in exogenous_data ' @@ -1299,11 +1367,11 @@ def generate(self, hi_res = layer(hi_res, hi_res_exo) else: hi_res = layer(hi_res) - except Exception as e: - msg = ('Could not run layer #{} "{}" on tensor of shape {}'. - format(i + 1, layer, hi_res.shape)) - logger.error(msg) - raise RuntimeError(msg) from e + except Exception as e: + msg = ('Could not run layer #{} "{}" on tensor of shape {}'. + format(layer_num, layer, hi_res.shape)) + logger.error(msg) + raise RuntimeError(msg) from e hi_res = hi_res.numpy() @@ -1341,8 +1409,10 @@ def _tf_generate(self, low_res, hi_res_exo=None): Synthetically generated high-resolution data """ hi_res = self.generator.layers[0](low_res) - for i, layer in enumerate(self.generator.layers[1:]): - try: + layer_num = 1 + try: + for i, layer in enumerate(self.generator.layers[1:]): + layer_num = i + 1 if isinstance(layer, (Sup3rAdder, Sup3rConcat)): msg = (f'layer.name = {layer.name} does not match any ' f'features in exogenous_data ({list(hi_res_exo)})') @@ -1351,11 +1421,11 @@ def _tf_generate(self, low_res, hi_res_exo=None): hi_res = layer(hi_res, hr_exo) else: hi_res = layer(hi_res) - except Exception as e: - msg = ('Could not run layer #{} "{}" on tensor of shape {}'. - format(i + 1, layer, hi_res.shape)) - logger.error(msg) - raise RuntimeError(msg) from e + except Exception as e: + msg = ('Could not run layer #{} "{}" on tensor of shape {}'. 
+                   format(layer_num, layer, hi_res.shape))
+            logger.error(msg)
+            raise RuntimeError(msg) from e
 
         return hi_res
 
@@ -1398,16 +1468,13 @@ def get_single_grad(self,
         loss_details : dict
             Namespace of the breakdown of loss components
         """
-        with tf.device(device_name):
-            with tf.GradientTape(watch_accessed_variables=False) as tape:
-                tape.watch(training_weights)
-
-                hi_res_exo = self.get_high_res_exo_input(hi_res_true)
-                hi_res_gen = self._tf_generate(low_res, hi_res_exo)
-                loss_out = self.calc_loss(hi_res_true, hi_res_gen,
-                                          **calc_loss_kwargs)
-                loss, loss_details = loss_out
-
-                grad = tape.gradient(loss, training_weights)
-
+        with tf.device(device_name), tf.GradientTape(
+                watch_accessed_variables=False) as tape:
+            self.timer(tape.watch, training_weights)
+            hi_res_exo = self.timer(self.get_high_res_exo_input, hi_res_true)
+            hi_res_gen = self.timer(self._tf_generate, low_res, hi_res_exo)
+            loss_out = self.timer(self.calc_loss, hi_res_true, hi_res_gen,
+                                  **calc_loss_kwargs)
+            loss, loss_details = loss_out
+            grad = self.timer(tape.gradient, loss, training_weights)
         return grad, loss_details
diff --git a/sup3r/models/base.py b/sup3r/models/base.py
index d055c582f..2988e3314 100644
--- a/sup3r/models/base.py
+++ b/sup3r/models/base.py
@@ -233,14 +233,16 @@ def discriminate(self, hi_res, norm_in=False):
             hi_res = (hi_res - mean_arr) / std_arr
 
         out = self.discriminator.layers[0](hi_res)
-        for i, layer in enumerate(self.discriminator.layers[1:]):
-            try:
+        layer_num = 1
+        try:
+            for i, layer in enumerate(self.discriminator.layers[1:]):
                 out = layer(out)
-            except Exception as e:
-                msg = ('Could not run layer #{} "{}" on tensor of shape {}'.
-                       format(i + 1, layer, out.shape))
-                logger.error(msg)
-                raise RuntimeError(msg) from e
+                layer_num = i + 1
+        except Exception as e:
+            msg = ('Could not run layer #{} "{}" on tensor of shape {}'.
+                   format(layer_num, layer, out.shape))
+            logger.error(msg)
+            raise RuntimeError(msg) from e
 
         out = out.numpy()
 
@@ -263,16 +265,17 @@ def _tf_discriminate(self, hi_res):
         out : np.ndarray
             Discriminator output logits
         """
         out = self.discriminator.layers[0](hi_res)
-        for i, layer in enumerate(self.discriminator.layers[1:]):
-            try:
+        layer_num = 1
+        try:
+            for i, layer in enumerate(self.discriminator.layers[1:]):
+                layer_num = i + 1
                 out = layer(out)
-            except Exception as e:
-                msg = ('Could not run layer #{} "{}" on tensor of shape {}'.
-                       format(i + 1, layer, out.shape))
-                logger.error(msg)
-                raise RuntimeError(msg) from e
+        except Exception as e:
+            msg = ('Could not run layer #{} "{}" on tensor of shape {}'.
+                   format(layer_num, layer, out.shape))
+            logger.error(msg)
+            raise RuntimeError(msg) from e
 
         return out
 
@@ -302,14 +305,14 @@ def update_optimizer(self, option='generator', **kwargs):
         if 'gen' in option.lower() or 'all' in option.lower():
             conf = self.get_optimizer_config(self.optimizer)
             conf.update(**kwargs)
-            OptimizerClass = getattr(optimizers, conf['name'])
-            self._optimizer = OptimizerClass.from_config(conf)
+            optimizer_class = getattr(optimizers, conf['name'])
+            self._optimizer = optimizer_class.from_config(conf)
 
         if 'disc' in option.lower() or 'all' in option.lower():
             conf = self.get_optimizer_config(self.optimizer_disc)
             conf.update(**kwargs)
-            OptimizerClass = getattr(optimizers, conf['name'])
-            self._optimizer_disc = OptimizerClass.from_config(conf)
+            optimizer_class = getattr(optimizers, conf['name'])
+            self._optimizer_disc = optimizer_class.from_config(conf)
 
     @property
     def meta(self):
@@ -669,6 +672,8 @@ def train_epoch(self,
         only_gen = train_gen and not train_disc
         only_disc = train_disc and not train_gen
 
+        if self._write_tb_profile:
+            tf.summary.trace_on(graph=True, profiler=True)
         for ib, batch in enumerate(batch_handler):
             trained_gen = False
             trained_disc = False
@@ -707,26 +712,29 @@ def train_epoch(self,
             b_loss_details['gen_trained_frac'] = float(trained_gen)
             b_loss_details['disc_trained_frac'] = float(trained_disc)
 
+            self.dict_to_tensorboard(b_loss_details)
+            self.dict_to_tensorboard(self.timer.log)
             loss_details = self.update_loss_details(loss_details,
                                                     b_loss_details,
                                                     len(batch),
                                                     prefix='train_')
-
             logger.debug('Batch {} out of {} has epoch-average '
                          '(gen / disc) loss of: ({:.2e} / {:.2e}). '
                          'Trained (gen / disc): ({} / {})'.format(
-                             ib, len(batch_handler),
+                             ib + 1, len(batch_handler),
                              loss_details['train_loss_gen'],
                              loss_details['train_loss_disc'], trained_gen,
                              trained_disc))
-
             if all([not trained_gen, not trained_disc]):
                 msg = ('For some reason none of the GAN networks trained '
                        'during batch {} out of {}!'.format(
                            ib, len(batch_handler)))
                 logger.warning(msg)
                 warn(msg)
+            self.total_batches += 1
+        loss_details['total_batches'] = int(self.total_batches)
+        self.profile_to_tensorboard('training_epoch')
         return loss_details
 
     def update_adversarial_weights(self, history, adaptive_update_fraction,
@@ -794,7 +802,9 @@ def train(self,
               early_stop_n_epoch=5,
               adaptive_update_bounds=(0.9, 0.99),
               adaptive_update_fraction=0.0,
-              multi_gpu=False):
+              multi_gpu=False,
+              tensorboard_log=True,
+              tensorboard_profile=False):
         """Train the GAN model on real low res data and real high res data
 
         Parameters
         ----------
@@ -856,7 +866,19 @@ def train(self,
            constitute a single gradient descent step with the nominal
            learning rate that the model was initialized with. If true and
            multiple gpus are found, default_device device should be set to
            /gpu:0
+        tensorboard_log : bool
+            Whether to write log file for use with tensorboard. Log data can
+            be viewed with ``tensorboard --logdir <logdir>`` where ``<logdir>``
+            is the parent directory of ``out_dir``, and pointing the browser to
+            the printed address.
+        tensorboard_profile : bool
+            Whether to export profiling information to tensorboard. This can
+            then be viewed in the tensorboard dashboard under the profile tab
         """
+        if tensorboard_log:
+            self._init_tensorboard_writer(out_dir)
+        if tensorboard_profile:
+            self._write_tb_profile = True
 
         self.set_norm_stats(batch_handler.means, batch_handler.stds)
         self.set_model_params(
@@ -889,9 +911,11 @@ def train(self,
                                             train_disc,
                                             disc_loss_bounds,
                                             multi_gpu=multi_gpu)
-
-            loss_details = self.calc_val_loss(batch_handler, weight_gen_advers,
+            train_n_obs = loss_details['n_obs']
+            loss_details = self.calc_val_loss(batch_handler,
+                                              weight_gen_advers,
                                               loss_details)
+            val_n_obs = loss_details['n_obs']
 
             msg = f'Epoch {epoch} of {epochs[-1]} '
             msg += 'gen/disc train loss: {:.2e}/{:.2e} '.format(
@@ -906,11 +930,14 @@ def train(self,
 
             logger.info(msg)
 
-            lr_g = self.get_optimizer_config(self.optimizer)['learning_rate']
+            lr_g = self.get_optimizer_config(
+                self.optimizer)['learning_rate']
             lr_d = self.get_optimizer_config(
                 self.optimizer_disc)['learning_rate']
 
             extras = {
+                'train_n_obs': train_n_obs,
+                'val_n_obs': val_n_obs,
                 'weight_gen_advers': weight_gen_advers,
                 'disc_loss_bound_0': disc_loss_bounds[0],
                 'disc_loss_bound_1': disc_loss_bounds[1],
@@ -919,8 +946,8 @@ def train(self,
             }
 
             weight_gen_advers = self.update_adversarial_weights(
-                loss_details, adaptive_update_fraction, adaptive_update_bounds,
-                weight_gen_advers, train_disc)
+                loss_details, adaptive_update_fraction,
+                adaptive_update_bounds, weight_gen_advers, train_disc)
 
             stop = self.finish_epoch(epoch,
                                      epochs,
diff --git a/sup3r/models/conditional_moments.py b/sup3r/models/conditional_moments.py
index ec7559e6d..ccd41c07c 100644
--- a/sup3r/models/conditional_moments.py
+++ b/sup3r/models/conditional_moments.py
@@ -347,7 +347,8 @@ def train(self, batch_handler,
              early_stop_on=None,
              early_stop_threshold=0.005,
              early_stop_n_epoch=5,
-              multi_gpu=False):
+              multi_gpu=False,
+              tensorboard_log=True):
         """Train the model on real low res data and real high res data
 
         Parameters
@@ -388,7 +389,15 @@ def train(self, batch_handler,
            between the GPUs and the resulting gradient from each GPU will
            constitute a single gradient descent step with the nominal
            learning rate that the model was initialized with.
+        tensorboard_log : bool
+            Whether to write log file for use with tensorboard. Log data can
+            be viewed with ``tensorboard --logdir <logdir>`` where ``<logdir>``
+            is the parent directory of ``out_dir``, and pointing the browser to
+            the printed address.
""" + if tensorboard_log: + self._init_tensorboard_writer(out_dir) + self.set_norm_stats(batch_handler.means, batch_handler.stds) self.set_model_params( input_resolution=input_resolution, diff --git a/sup3r/utilities/utilities.py b/sup3r/utilities/utilities.py index 1e4c7b4a3..b0ac20a62 100644 --- a/sup3r/utilities/utilities.py +++ b/sup3r/utilities/utilities.py @@ -1,8 +1,6 @@ # -*- coding: utf-8 -*- -"""Utilities module for preparing training data - -@author: bbenton -""" +"""Miscellaneous utilities for computing features, preparing training data, +timing functions, etc """ import glob import logging @@ -10,6 +8,7 @@ import random import re import string +import time from fnmatch import fnmatch from warnings import warn @@ -20,14 +19,42 @@ from packaging import version from scipy import ndimage as nd from scipy.interpolate import RegularGridInterpolator, interp1d -from scipy.ndimage import zoom -from scipy.ndimage import gaussian_filter +from scipy.ndimage import gaussian_filter, zoom np.random.seed(42) logger = logging.getLogger(__name__) +class Timer: + """Timer class for timing and storing function call times.""" + + def __init__(self): + self.log = {} + + def __call__(self, fun, *args, **kwargs): + """Time function call and store elapsed time in self.log. + + Parameters + ---------- + fun : function + Function to time + *args : list + positional arguments for fun + **kwargs : dict + keyword arguments for fun + + Returns + ------- + output of fun + """ + t0 = time.time() + out = fun(*args, **kwargs) + t_elap = time.time() - t0 + self.log[f'elapsed:{fun.__name__}'] = t_elap + return out + + def generate_random_string(length): """Generate random string with given length. Used for naming temporary files to avoid collisions.""" diff --git a/tests/training/test_train_gan_lr_era.py b/tests/training/test_train_gan_lr_era.py index 52c1974ff..1957254c5 100644 --- a/tests/training/test_train_gan_lr_era.py +++ b/tests/training/test_train_gan_lr_era.py @@ -163,7 +163,7 @@ def test_train_st(n_epoch=3, log=False): s_enhance=3, t_enhance=4, n_batches=5, - worker_kwargs=dict(max_workers=1), + worker_kwargs={'max_workers': 1}, ) assert batch_handler.norm_workers == 1