Skip to content

Commit

Permalink
Merge pull request #29 from TUW-GEO/fix_parallel
Browse files Browse the repository at this point in the history
Fix issues with parallel processing
  • Loading branch information
wpreimes committed Jun 7, 2024
2 parents 308cc89 + fc4b8a3 commit 5a27031
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 53 deletions.
129 changes: 91 additions & 38 deletions docs/examples/ts2img.ipynb

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions src/repurpose/ts2img.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ def _write_img(
if annual_folder:
out_path = os.path.join(out_path, f"{dt.year:04}")

if not os.path.exists(out_path):
os.makedirs(out_path)
os.makedirs(out_path, exist_ok=True)

image.attrs['date_created'] = f"File created: {datetime.now()}"
image.to_netcdf(os.path.join(out_path, filename),
Expand Down Expand Up @@ -242,7 +241,7 @@ def _calc_chunk(self, timestamps, preprocess_func=None, preprocess_kwargs=None,

stack = parallel_process_async(
_convert, ITER_KWARGS, STATIC_KWARGS, n_proc=n_proc,
show_progress_bars=True, log_path=log_path,
show_progress_bars=True, log_path=log_path, backend='threading',
verbose=False, ignore_errors=self.ignore_errors)

stack = xr.combine_by_coords(stack)
Expand Down
23 changes: 11 additions & 12 deletions tests/test_ts2img.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,16 @@ def postprocess_func(stack, vars, fillvalue=0):
variables={'var0': 'var1', 'var2': 'var2'})

with tempfile.TemporaryDirectory() as path_out:
with pytest.warns(UserWarning): # expected warning about empty stack
converter.calc(
path_out, format_out='slice',
fn_template="test_{datetime}.nc", drop_empty=True,
preprocess=preprocess_func, preprocess_kwargs={'mult': 2},
postprocess=postprocess_func, postprocess_kwargs={'vars': ('var2',)},
encoding={'var1': {'dtype': 'int64', 'scale_factor': 0.0000001,
'_FillValue': -9999}, },
var_attrs={'var1': {'long_name': 'test_var1', 'units': 'm'}},
glob_attrs={'test': 'test'}, var_fillvalues={'var2': -9999},
var_dtypes={'var2': 'int32'}, n_proc=1)
converter.calc(
path_out, format_out='slice',
fn_template="test_{datetime}.nc", drop_empty=True,
preprocess=preprocess_func, preprocess_kwargs={'mult': 2},
postprocess=postprocess_func, postprocess_kwargs={'vars': ('var2',)},
encoding={'var1': {'dtype': 'int64', 'scale_factor': 0.0000001,
'_FillValue': -9999}, },
var_attrs={'var1': {'long_name': 'test_var1', 'units': 'm'}},
glob_attrs={'test': 'test'}, var_fillvalues={'var2': -9999},
var_dtypes={'var2': 'int32'}, n_proc=2)

assert len(os.listdir(os.path.join(path_out, '2020'))) == 28
assert os.path.isfile(
Expand Down Expand Up @@ -208,7 +207,7 @@ def postprocess_func(stack, **kwargs):
'var2': {'long_name': 'test_var2', 'units': 'm'}},
glob_attrs={'test': 'test2'},
var_fillvalues={'var2': -9999},
var_dtypes={'var2': 'int32'}, n_proc=1)
var_dtypes={'var2': 'int32'}, n_proc=2)

# all 10 files must exist, first two emtpy
assert len(os.listdir(os.path.join(path_out, '2020'))) == 10
Expand Down

0 comments on commit 5a27031

Please sign in to comment.