Skip to content

Commit

Permalink
Merge pull request #27 from wpreimes/smosl2
Browse files Browse the repository at this point in the history
Updates to repurpose (img2ts) for performant conversion of SMOS L2 swath data
  • Loading branch information
wpreimes committed May 25, 2024
2 parents 36eed1b + 53f4ace commit 308cc89
Show file tree
Hide file tree
Showing 7 changed files with 572 additions and 296 deletions.
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies:
- pip:
- pygeogrids
- pynetcf>=0.5.0
- cadati
- more_itertools
- smecv_grid
- tqdm
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ install_requires =
tqdm
more_itertools
joblib
cadati
# The usage of test_requires is discouraged, see `Dependency Management` docs
# tests_require = pytest; pytest-cov
# Require a specific Python version, e.g. Python 2.7 or >= 3.4
Expand Down
630 changes: 398 additions & 232 deletions src/repurpose/img2ts.py

Large diffs are not rendered by default.

29 changes: 23 additions & 6 deletions src/repurpose/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class ImageBaseConnection:
This protects against processing gaps due to e.g. temporary network issues.
"""

def __init__(self, reader, max_retries=99, retry_delay_s=1):
def __init__(self, reader, max_retries=99, retry_delay_s=1,
attr_read='read', attr_path='path', attr_grid='grid'):
"""
Parameters
----------
Expand All @@ -44,25 +44,39 @@ def __init__(self, reader, max_retries=99, retry_delay_s=1):
fails.
retry_delay_s: int, optional (default: 1)
Number of seconds to wait after each failed retry.
attr_read: str, optional (default: 'read')
Name of method to call to read an image. Will add a method of
the same name to this wrapper.
attr_path: str, optional (default: 'path')
Name of the reader attribute to access the data path
attr_grid: str, optional (default: 'grid')
Name of the reader attribute to access the grid definition
"""
self.reader = reader
self.max_retries = max_retries
self.retry_delay_s = retry_delay_s

self.attr_read = attr_read
self.attr_path = attr_path
self.attr_grid = attr_grid

self.filelist = self._gen_filelist()

setattr(self, self.attr_read, self._read) # map read method to ._read

@property
def grid(self):
return self.reader.grid
return getattr(self.reader, self.attr_grid)

def tstamps_for_daterange(self, *args, **kwargs):
return self.reader.tstamps_for_daterange(*args, **kwargs)

def _gen_filelist(self) -> list:
flist = glob(os.path.join(self.reader.path, '**'), recursive=True)
path = getattr(self.reader, self.attr_path)
flist = glob(os.path.join(path, '**'), recursive=True)
return flist

def read(self, timestamp, **kwargs):
def _read(self, timestamp, **kwargs):
retry = 0
img = None
error = None
Expand All @@ -72,7 +86,7 @@ def read(self, timestamp, **kwargs):
try:
if filename is None:
filename = self.reader._build_filename(timestamp)
img = self.reader.read(timestamp, **kwargs)
img = getattr(self.reader, self.attr_read)(timestamp, **kwargs)
except Exception as e:
logging.error(f"Error reading file (try {retry+1}) "
f"at {timestamp}: {e}. "
Expand Down Expand Up @@ -384,6 +398,9 @@ def parallel_process_async(
handler.close()
handlers.clear()

del ITER_KWARGS
del STATIC_KWARGS

if len(results) == 0:
return None
else:
Expand Down
26 changes: 18 additions & 8 deletions src/repurpose/ts2img.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ class Ts2Img:
loglevel: str, optional (default: 'WARNING')
Logging level.
Must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'.
ignore_errors: bool, optional (default: False)
Instead of raising an exception, log errors and continue the
process. E.g. to skip individual corrupt files.
"""

# Some variables are generated internally and cannot be used.
Expand All @@ -153,12 +156,15 @@ class Ts2Img:

def __init__(self, ts_reader, img_grid, timestamps,
variables=None, read_function='read',
max_dist=18000, time_collocation=True, loglevel="WARNING"):
max_dist=18000, time_collocation=True, loglevel="WARNING",
ignore_errors=False):

self.ts_reader = ts_reader
self.img_grid: CellGrid = Regular3dimImageStack._eval_grid(img_grid)
self.timestamps = timestamps

self.ignore_errors = ignore_errors

if variables is not None:
if not isinstance(variables, dict):
variables = {v: v for v in variables}
Expand Down Expand Up @@ -237,7 +243,7 @@ def _calc_chunk(self, timestamps, preprocess_func=None, preprocess_kwargs=None,
stack = parallel_process_async(
_convert, ITER_KWARGS, STATIC_KWARGS, n_proc=n_proc,
show_progress_bars=True, log_path=log_path,
verbose=False, ignore_errors=True)
verbose=False, ignore_errors=self.ignore_errors)

stack = xr.combine_by_coords(stack)

Expand Down Expand Up @@ -510,16 +516,20 @@ def store_netcdf_images(self, path_out, fn_template=f"{datetime}.nc",
del self.stack
self.stack = None

ITER_KWARGS = {'image': images, 'filename': filenames, 'dt': dts}
STATIC_KWARGS = {'out_path': path_out, 'annual_folder': annual_folder,
'encoding': encoding}

_ = parallel_process_async(
FUNC=_write_img,
ITER_KWARGS={'image': images, 'filename': filenames,
'dt': dts},
STATIC_KWARGS={'out_path': path_out,
'annual_folder': annual_folder,
'encoding': encoding},
ITER_KWARGS=ITER_KWARGS,
STATIC_KWARGS=STATIC_KWARGS,
n_proc=n_proc,
show_progress_bars=True,
verbose=False,
loglevel=self.loglevel,
ignore_errors=True,
ignore_errors=self.ignore_errors,
)

del ITER_KWARGS, STATIC_KWARGS, images, _

2 changes: 0 additions & 2 deletions tests/test_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ def test_add_3d_data_via_ts(self):
assert ds['var1'].values.dtype == 'float32'
assert ds['var2'].values.dtype == 'int8'

print(ds['timedelta_seconds'].values)
print(self.offsets)
assert np.array_equal(ds['timedelta_seconds'].values,
self.offsets.astype(np.float32))

Expand Down
Loading

0 comments on commit 308cc89

Please sign in to comment.