Update wrapper for parallel processing to handle log files
wpreimes committed May 2, 2024
1 parent f6c2345 commit 999d597
Showing 1 changed file with 41 additions and 25 deletions.
66 changes: 41 additions & 25 deletions src/repurpose/process.py
@@ -15,6 +15,7 @@
# os.environ['MKL_DYNAMIC'] = 'FALSE'
# os.environ['OPENBLAS_NUM_THREADS'] = '1'

import traceback
import numpy as np
from tqdm import tqdm
import logging
@@ -154,32 +155,37 @@ def print_progress(self):
self._pbar.n = self.n_completed_tasks
self._pbar.refresh()

def configure_worker_logger(log_queue, log_level):
worker_logger = logging.getLogger('worker')
def configure_worker_logger(log_queue, log_level, name):
worker_logger = logging.getLogger(name)
if not worker_logger.hasHandlers():
h = QueueHandler(log_queue)
worker_logger.addHandler(h)
worker_logger.setLevel(log_level)
return worker_logger

def run_with_error_handling(FUNC,
ignore_errors=False, log_queue=None, log_level="WARNING",
**kwargs) -> Any:
ignore_errors=False,
log_queue=None,
log_level="WARNING",
logger_name=None,
**kwargs) -> Any:

if log_queue is not None:
logger = configure_worker_logger(log_queue, log_level)
logger_name = logger.name
kwargs['logger_name'] = logger_name
logger = configure_worker_logger(log_queue, log_level, logger_name)
else:
logger = logging.getLogger()
# normal logger
logger = logging.getLogger(logger_name)

r = None

try:
r = FUNC(**kwargs)
except Exception as e:
if ignore_errors:
logger.error(f"Error: {e}")
logger.error(f"The following ERROR was raised in the parallelized "
f"function `{FUNC.__name__}` but was ignored due to "
f"the chosen settings: "
f"{traceback.format_exc()}")
else:
raise e
return r
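For context on the hunk above: `configure_worker_logger` attaches a QueueHandler so that worker processes ship their log records back to the parent process instead of emitting them locally. A minimal, self-contained sketch of that pattern follows; `worker_task` and the logger name "demo" are made-up for illustration and are not part of this commit:

```python
# Standalone sketch of the QueueHandler/QueueListener pattern used by
# configure_worker_logger; worker_task and "demo" are illustrative names.
import logging
import multiprocessing as mp
from logging.handlers import QueueHandler, QueueListener

def worker_task(log_queue):
    logger = logging.getLogger("demo")
    if not logger.hasHandlers():
        # Records are not emitted here; they are forwarded to the parent.
        logger.addHandler(QueueHandler(log_queue))
    logger.setLevel(logging.INFO)
    logger.info("hello from worker pid %s", mp.current_process().pid)

if __name__ == "__main__":
    q = mp.Manager().Queue()
    # The listener runs in the parent and re-emits queued records
    # on the parent's real handlers.
    listener = QueueListener(q, logging.StreamHandler())
    listener.start()
    p = mp.Process(target=worker_task, args=(q,))
    p.start()
    p.join()
    listener.stop()
```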
@@ -195,6 +201,7 @@ def parallel_process_async(
log_path=None,
log_filename=None,
loglevel="WARNING",
logger_name=None,
verbose=False,
progress_bar_label="Processed",
backend="loky",
@@ -239,8 +246,15 @@
Name of the logfile to create in `log_path`. If None is chosen, a name
is created automatically. If `log_path` is None, this has no effect.
loglevel: str, optional (default: "WARNING")
Log level to use for logging. Must be one of
Which level should be logged. Must be one of
["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"].
logger_name: str, optional (default: None)
The name to assign to the logger that can be accessed inside FUNC
for logging. If not given, the root logger is used, e.g.
```
logger = logging.getLogger(<logger_name>)
logger.error("Some error message")
```
verbose: bool, optional (default: False)
Print all logging messages to stdout, useful for debugging.
progress_bar_label: str, optional (default: "Processed")
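As the new `logger_name` docstring entry above suggests, a task function can pick up the configured logger by name. A hypothetical FUNC might look like the following sketch; the function `convert_cell` and the name "my_logger" are invented for illustration:

```python
import logging

# Hypothetical task function for use with parallel_process_async; the
# name "my_logger" must match the logger_name passed to the wrapper.
def convert_cell(cell):
    logger = logging.getLogger("my_logger")
    logger.warning("something noteworthy happened in cell %s", cell)
    return cell
```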
@@ -258,16 +272,15 @@
values are found.
"""
if activate_logging:
logger = logging.getLogger()
logger = logging.getLogger(logger_name)
logger.setLevel(loglevel.upper())

if STATIC_KWARGS is None:
STATIC_KWARGS = dict()

if verbose:
# in this case we also print ALL log messages
streamHandler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.setLevel('DEBUG')
logger.addHandler(streamHandler)

@@ -280,14 +293,14 @@
log_file = None

if log_file:
# in this case the logger should write to file
os.makedirs(os.path.dirname(log_file), exist_ok=True)
logging.basicConfig(
filename=str(log_file),
level=loglevel.upper(),
format="%(levelname)s %(asctime)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
force=True,
)
filehandler = logging.FileHandler(log_file)
filehandler.setFormatter(logging.Formatter(
"%(levelname)s %(asctime)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
))
logger.addHandler(filehandler)
else:
logger = None
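The switch above from `logging.basicConfig(..., force=True)` to an explicit FileHandler means file output is attached to one named logger, rather than forcibly reconfiguring the root logger for the whole process. A standalone sketch of the new approach, with an illustrative logger name and path:

```python
import logging

# Attach a FileHandler to a single named logger instead of calling
# logging.basicConfig, which would reconfigure the root logger globally.
logger = logging.getLogger("example")          # illustrative name
handler = logging.FileHandler("example.log")   # illustrative path
handler.setFormatter(logging.Formatter(
    "%(levelname)s %(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
))
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
logger.warning("this goes to example.log only")
```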

@@ -316,26 +329,28 @@
process_kwargs.append(kws)

if n_proc == 1:
logging.info("Processing metadata with {} process.".format(n_proc))
results = []
if show_progress_bars:
pbar = tqdm(total=len(process_kwargs), desc=progress_bar_label)
else:
pbar = None

for kwargs in process_kwargs:
r = run_with_error_handling(FUNC, ignore_errors, **kwargs)
r = run_with_error_handling(FUNC, ignore_errors,
logger_name=logger_name,
**kwargs)
if r is not None:
results.append(r)
if pbar is not None:
pbar.update()
else:
logging.info(f"Processing metadata with {n_proc} processes.")
if logger is not None:
log_level = logger.getEffectiveLevel()
m = Manager()
q = m.Queue()
listener = QueueListener(q, *logger.handlers)
listener = QueueListener(q, *logger.handlers,
respect_handler_level=True)
listener.start()
log_level = logger.getEffectiveLevel()
else:
q = None
log_level = None
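The newly passed `respect_handler_level=True` makes the QueueListener check each queued record against every handler's own level, instead of emitting all records on all handlers. A small single-process demonstration of the effect; the queue, handler, and logger name here are illustrative:

```python
import logging
import queue
from logging.handlers import QueueHandler, QueueListener

q = queue.Queue()
console = logging.StreamHandler()
console.setLevel(logging.ERROR)  # this handler only wants ERROR and above

listener = QueueListener(q, console, respect_handler_level=True)
listener.start()

logger = logging.getLogger("level_demo")
logger.setLevel(logging.DEBUG)
logger.addHandler(QueueHandler(q))

logger.info("suppressed: below the handler's ERROR level")
logger.error("emitted: meets the handler's level")
listener.stop()
```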
@@ -355,6 +370,7 @@
FUNC, ignore_errors,
log_queue=q,
log_level=log_level,
logger_name=logger_name,
**kwargs)
for kwargs in process_kwargs)
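Putting the pieces together, a call against the updated wrapper might look as follows. This is a hedged sketch: `double` is a made-up task function, and ITER_KWARGS is assumed from the full signature, which this diff only shows in part:

```python
import logging
from repurpose.process import parallel_process_async

# Made-up task function; it logs through the logger name that the
# wrapper configures for both the parent and the worker processes.
def double(x):
    logging.getLogger("demo").info("doubling %s", x)
    return x * 2

results = parallel_process_async(
    double,
    ITER_KWARGS={"x": [1, 2, 3]},  # assumed parameter, not shown in this diff
    n_proc=2,
    loglevel="INFO",
    logger_name="demo",
)
```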

