Skip to content

Commit

Permalink
Fix issue where parallel reading fails
Browse files: browse the repository at this point in the history
  • Loading branch information
wpreimes committed Jan 15, 2024
1 parent 301c134 commit 230e698
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
12 changes: 5 additions & 7 deletions src/repurpose/img2ts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import warnings
import platform
from repurpose.process import parallel_process_async, idx_chunks
import pynetcf.time_series as nc
from pygeogrids.grids import CellGrid
Expand All @@ -12,7 +10,6 @@
import pygeogrids.netcdf as grid2nc
import pandas as pd
from pygeobase.object_base import Image
from multiprocessing import Manager


class Img2TsError(Exception):
Expand Down Expand Up @@ -228,10 +225,9 @@ def _read_image(self, date, target_grid):
'fill_values': self.r_fill_values,
}

try:
image = self.imgin.read(date, **self.input_kwargs)
except IOError as e:
logging.error("I/O error({0}): {1}".format(e.errno, e.strerror))
image = self.imgin.read(date, **self.input_kwargs)

if image is None:
return None

logging.info(f"Read image with constant time stamp. "
Expand Down Expand Up @@ -505,6 +501,7 @@ def calc(self):
STATIC_KWARGS=STATIC_KWARGS,
log_path=os.path.join(self.outputpath, '000_log'),
loglevel="INFO",
ignore_errors=True,
n_proc=self.n_proc,
show_progress_bars=False,
)
Expand Down Expand Up @@ -561,6 +558,7 @@ def img_bulk(self):
show_progress_bars=False,
log_path=os.path.join(self.outputpath, '000_log'),
loglevel="INFO",
ignore_errors=True,
n_proc=self.n_proc,
)

Expand Down
30 changes: 21 additions & 9 deletions src/repurpose/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ImageBaseConnection:
This protects against processing gaps due to e.g. temporary network issues.
"""

def __init__(self, reader, max_retries=20, retry_delay_s=5):
def __init__(self, reader, max_retries=20, retry_delay_s=10):
"""
Parameters
----------
Expand Down Expand Up @@ -65,20 +65,32 @@ def _gen_filelist(self):
return glob(os.path.join(self.reader.path, '**'), recursive=True)

def read(self, timestamp, **kwargs):
filename = self.reader._build_filename(timestamp)
retries = 0
img = None
error = None
while filename in self.filelist and retries <= self.max_retries:
while img is None and retries <= self.max_retries:
filename = None
try:
return self.reader.read(timestamp, **kwargs)
filename = self.reader._build_filename(timestamp)
img = self.reader.read(timestamp, **kwargs)
except Exception as e:
error = e
logging.error(f"Error reading file {filename}: {error}")
time.sleep(self.retry_delay_s)
if filename is not None:
if filename not in self.filelist:
break
else:
img = None
error = e
time.sleep(self.retry_delay_s)

retries += 1

raise IOError(f"Could not read file at {timestamp} after "
f"{self.max_retries} retries: {error}")
if img is None:
raise IOError(f"Reading file {filename} failed even after "
f"{retries} retries: {error}")
else:
logging.info(f"Success reading {filename} after {retries} "
f"retries")
return img


def rootdir() -> Path:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_img2ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_img2ts_nonortho_daily_no_resampling():

img2ts = Img2Ts(ds_in,
outputpath, start, end, imgbuffer=10,
input_grid=input_grid, n_proc=1)
input_grid=input_grid, n_proc=2)
img2ts.calc()

ts_should_base = pd.date_range(start, end, freq='D')
Expand Down Expand Up @@ -198,7 +198,7 @@ def test_img2ts_ortho_daily_no_resampling():
ds_in = TestMultiTemporalImageDatasetDaily(
cls=TestOrthogonalImageDataset)
img2ts = Img2Ts(ds_in, outputpath, start, end, imgbuffer=20,
input_grid=input_grid, n_proc=1)
input_grid=input_grid, n_proc=2)

ts_should = np.concatenate([np.arange(5, 29, dtype=float),
np.arange(1, 32, dtype=float),
Expand Down

0 comments on commit 230e698

Please sign in to comment.