Lazy netcdf saves #5191

Merged
merged 92 commits on Apr 21, 2023
Changes from 1 commit
cd7fa42
Basic functional lazy saving.
pp-mo Oct 21, 2022
1f32800
Simplify function signature which upsets Sphinx.
pp-mo Oct 21, 2022
e0f980f
Non-lazy saves return nothing.
pp-mo Oct 21, 2022
67b96cf
Now fixed to enable use with process/distributed scheduling.
pp-mo Oct 23, 2022
8cdbc9b
Remove dask.utils.SerializableLock, which I think was a mistake.
pp-mo Mar 3, 2023
8723f24
Make DefferedSaveWrapper use _thread_safe_nc.
pp-mo Mar 8, 2023
d19a87f
Fixes for non-lazy save.
pp-mo Mar 9, 2023
45e0e60
Avoid saver error when no deferred writes.
pp-mo Mar 10, 2023
6a83200
Reorganise locking code, ready for shareable locks.
pp-mo Mar 10, 2023
78a9346
Remove optional usage of 'filelock' for lazy saves.
pp-mo Mar 10, 2023
31b12f7
Document dask-specific locking; implement differently for threads or …
pp-mo Mar 10, 2023
99b4f41
Minor fix for unit-tests.
pp-mo Mar 10, 2023
5d0a707
Pin libnetcdf to avoid problems -- see #5187.
pp-mo Mar 10, 2023
431036f
Minor test fix.
pp-mo Mar 10, 2023
dc368d9
Move DeferredSaveWrapper into _thread_safe_nc; replicate the NetCDFDa…
pp-mo Mar 13, 2023
6756a46
Update lib/iris/fileformats/netcdf/saver.py
pp-mo Mar 16, 2023
80b4b6c
Update lib/iris/fileformats/netcdf/_dask_locks.py
pp-mo Mar 16, 2023
78a8716
Update lib/iris/fileformats/netcdf/saver.py
pp-mo Mar 16, 2023
47bb08b
Small rename + reformat.
pp-mo Mar 17, 2023
0ece09a
Remove Saver lazy option; all lazy saves are delayed; factor out fill…
pp-mo Mar 18, 2023
940f544
Merge branch 'main' into lazy_save_2
pp-mo Mar 18, 2023
eb97130
Merge remote-tracking branch 'upstream/main' into lazy_save_2
pp-mo Mar 20, 2023
4596081
Repurposed 'test__FillValueMaskCheckAndStoreTarget' to 'test__data_fi…
pp-mo Mar 20, 2023
ad49fbe
Disable (temporary) saver debug printouts.
pp-mo Mar 20, 2023
b29c927
Fix test problems; Saver automatically completes to preserve existing…
pp-mo Mar 20, 2023
8f10281
Fix docstring error.
pp-mo Mar 20, 2023
6a564d9
Fix spurious error in old saver test.
pp-mo Mar 20, 2023
2fb4d6c
Fix Saver docstring.
pp-mo Mar 20, 2023
c84bfdc
More robust exit for NetCDFWriteProxy operation.
pp-mo Mar 20, 2023
5b78085
Fix doctests by making the Saver example functional.
pp-mo Mar 21, 2023
478332e
Improve docstrings; unify terminology; simplify non-lazy save call.
pp-mo Mar 23, 2023
34f154c
Moved netcdf cell-method handling into nc_load_rules.helpers, and var…
pp-mo Mar 27, 2023
d3744ba
Merge branch 'latest' into lazy_save_2
pp-mo Mar 27, 2023
9673ea0
Fix lockfiles and Makefile process.
pp-mo Mar 27, 2023
bcbcbc8
Add unit tests for routine _fillvalue_report().
pp-mo Mar 27, 2023
05c04a1
Remove debug-only code.
pp-mo Mar 27, 2023
679ea47
Added tests for what the save function does with the 'compute' keyword.
pp-mo Mar 28, 2023
70ec9dd
Fix mock-specific problems, small tidy.
pp-mo Mar 28, 2023
28a4674
Restructure hierarchy of tests.unit.fileformats.netcdf
pp-mo Mar 29, 2023
67f4b2b
Tidy test docstrings.
pp-mo Mar 29, 2023
ebec72f
Correct test import.
pp-mo Mar 29, 2023
1f5b904
Avoid incorrect checking of byte data, and a numpy deprecation warning.
pp-mo Mar 29, 2023
5045c9f
Alter parameter names to make test reports clearer.
pp-mo Mar 29, 2023
393407a
Test basic behaviour of _lazy_stream_data; make 'Saver._delayed_write…
pp-mo Mar 29, 2023
518360b
Add integration tests, and distributed dependency.
pp-mo Mar 30, 2023
5c9931f
Docstring fixes.
pp-mo Mar 31, 2023
7daee68
Documentation section and whatsnew entry.
pp-mo Apr 4, 2023
97474f9
Merge branch 'main' into lazy_save_2
pp-mo Apr 4, 2023
64c7251
Various fixes to whatsnew, docstrings and docs.
pp-mo Apr 4, 2023
75043f9
Minor review changes, fix doctest.
pp-mo Apr 11, 2023
445fbe2
Arrange tests + results to organise by package-name alone.
pp-mo Apr 11, 2023
09cb22e
Review changes.
pp-mo Apr 11, 2023
3445f58
Review changes.
pp-mo Apr 12, 2023
cb1e1f7
Enhance tests + debug.
pp-mo Apr 12, 2023
1c81cee
Support scheduler type 'single-threaded'; allow retries on delayed-sa…
pp-mo Apr 13, 2023
370837b
Improve test.
pp-mo Apr 13, 2023
2f5f3c2
Adding a whatsnew entry for 5224 (#5234)
HGWright Apr 4, 2023
a55c6f2
Replacing numpy legacy printing with array2string and remaking result…
HGWright Apr 4, 2023
4914e99
adding a whatsnew entry
HGWright Apr 4, 2023
bd642cd
configure codecov
HGWright Apr 4, 2023
bc5bdd1
remove results creation commit from blame
HGWright Apr 4, 2023
301e59e
fixing whatsnew entry
HGWright Apr 4, 2023
7b3044d
Bump scitools/workflows from 2023.04.1 to 2023.04.2 (#5236)
dependabot[bot] Apr 5, 2023
02f2b66
Use real array for data of of small netCDF variables. (#5229)
pp-mo Apr 6, 2023
a7e0689
Handle derived coordinates correctly in `concatenate` (#5096)
schlunma Apr 12, 2023
c4e8bbb
clarity on whatsnew entry contributors (#5240)
bjlittle Apr 12, 2023
e6661b8
Modernize and simplify iris.analysis._Groupby (#5015)
bouweandela Apr 12, 2023
afbdbbd
Finalises Lazy Data documentation (#5137)
ESadek-MO Apr 12, 2023
b8bb753
Fixes to _discontiguity_in_bounds (attempt 2) (#4975)
stephenworsley Apr 12, 2023
97cc149
update ci locks location (#5228)
bjlittle Apr 13, 2023
f14a321
Updated environment lockfiles (#5211)
scitools-ci[bot] Apr 13, 2023
f7a0b87
Increase retries.
pp-mo Apr 13, 2023
69ddd9d
Change debug to show which elements failed.
pp-mo Apr 13, 2023
8235d60
update cf standard units (#5244)
ESadek-MO Apr 13, 2023
724c6d2
libnetcdf <4.9 pin (#5242)
trexfeathers Apr 13, 2023
4f50dc7
Avoid possible same-file crossover between tests.
pp-mo Apr 13, 2023
0da68cf
Ensure all-different testfiles; load all vars lazy.
pp-mo Apr 13, 2023
e8b7bfd
Revert changes to testing framework.
pp-mo Apr 13, 2023
ad48caf
Remove repeated line from requirements/py*.yml (?merge error), and re…
pp-mo Apr 13, 2023
291b587
Revert some more debug changes.
pp-mo Apr 13, 2023
b2260ef
Merge branch 'latest' into lazy_save_2
pp-mo Apr 13, 2023
33a7d86
Reorganise test for better code clarity.
pp-mo Apr 14, 2023
db6932d
Use public 'Dataset.isopen()' instead of '._isopen'.
pp-mo Apr 14, 2023
631e001
Create output files in unique temporary directories.
pp-mo Apr 14, 2023
2869f97
Tests for fileformats.netcdf._dask_locks.
pp-mo Apr 14, 2023
419727b
Merge branch 'latest' into lazy_save_2
pp-mo Apr 21, 2023
2f4458b
Fix attribution names.
pp-mo Apr 21, 2023
88b7a2a
Merge branch 'latest' into lazy_save_2
pp-mo Apr 21, 2023
98a20e7
Fixed new py311 lockfile.
pp-mo Apr 21, 2023
bbc1167
Fix typos spotted by codespell.
pp-mo Apr 21, 2023
ed38e43
Add distributed test dep for python 3.11
pp-mo Apr 21, 2023
54ec0f8
Fix lockfile for python 3.11
pp-mo Apr 21, 2023
Review changes.
pp-mo committed Apr 11, 2023
commit 09cb22e8bf40375c60cab2c62d89157d6f3cac79
14 changes: 7 additions & 7 deletions docs/src/userguide/real_and_lazy_data.rst
@@ -240,18 +240,18 @@ for more information on setting dask processing options.

.. _delayed_netcdf_save:

-Delayed netCDF Saving
+Delayed NetCDF Saving
---------------------

-When saving data to netCDF files, it is possible to *delay* writing lazy content to the
+When saving data to NetCDF files, it is possible to *delay* writing lazy content to the
output file, to be performed by `Dask <https://docs.dask.org/en/stable/>`_ later,
thus enabling parallel save operations.

This works in the following way :
-1. an :func:`iris.save` call is made, with a netcdf file output and the additional
+1. an :func:`iris.save` call is made, with a NetCDF file output and the additional
keyword ``compute=False``.
-This is currently *only* available when saving to netCDF, so it is documented in
-the netCDF file format API. See : :func:`iris.fileformats.netcdf.save`.
+This is currently *only* available when saving to NetCDF, so it is documented in
+the Iris NetCDF file format API. See: :func:`iris.fileformats.netcdf.save`.

2. the call creates the output file, but does not fill in variables' data, where
the data is a lazy array in the Iris object. Instead, these variables are
@@ -270,6 +270,6 @@ from shared lazy input data, these can be computed in parallel efficiently by Da
can do.

.. note::
-This feature does **not** enable parallel writes to the *same* netCDF output file.
+This feature does **not** enable parallel writes to the *same* NetCDF output file.
That can only be done on certain operating systems, with a specially configured
-build of the netCDF C library, and is not supported by Iris at present.
+build of the NetCDF C library, and is not supported by Iris at present.
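The workflow this docs change describes — create the file and its metadata immediately, postpone the lazy variable writes, then trigger them later via `compute()` — can be sketched in plain Python. All names below are illustrative stand-ins, not the Iris implementation (which returns a real `dask.delayed.Delayed` and guards the writes with per-file locks):

```python
# Minimal sketch of the "delayed write" pattern, with placeholder names.
class DelayedCompletion:
    """Stands in for the dask Delayed object that iris.save returns."""

    def __init__(self, deferred_writes):
        self._deferred_writes = deferred_writes

    def compute(self):
        # Perform all postponed writes; return any warnings raised (none here).
        for target, data in self._deferred_writes:
            target.update(data)
        return []


def save(cube_data, target, compute=True):
    # The output "file" (a dict stand-in) is structured immediately...
    target["metadata_written"] = True
    if compute:
        target.update(cube_data)  # immediate write; nothing returned
        return None
    # ...but in delayed mode, filling in the variable data is postponed.
    return DelayedCompletion([(target, cube_data)])


file_stub = {}
result = save({"var": [1, 2, 3]}, file_stub, compute=False)
print("var" in file_stub)  # False: lazy data not yet written
result.compute()
print(file_stub["var"])  # [1, 2, 3]
```

Because each `DelayedCompletion` is independent, several of them (from separate output files) could be computed together in one parallel graph — the property the docs text highlights.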
5 changes: 3 additions & 2 deletions docs/src/whatsnew/latest.rst
@@ -53,8 +53,9 @@ This document explains the changes made to Iris for this release
enable lazy computation of rotated wind vector components (:issue:`4934`,
:pull:`4972`)

-#. `@pp-mo`_ supported delayed saving of lazy data, when writing to the netCDF file
-format. See : :ref:`delayed netCDF saves <delayed_netcdf_save>`.
+#. `@pp-mo`_ and `@lbdreyer`_ supported delayed saving of lazy data, when writing to
+the netCDF file format. See : :ref:`delayed netCDF saves <delayed_netcdf_save>`.
+Also with significant input from `@fnattino`_.
(:pull:`5191`)


8 changes: 6 additions & 2 deletions lib/iris/fileformats/netcdf/saver.py
@@ -308,6 +308,10 @@ def _data_fillvalue_check(arraylib, data, check_value):
return is_masked, contains_value


class SaverFillValueWarning(UserWarning):
pass


def _fillvalue_report(fill_info, is_masked, contains_fill_value, warn=False):
"""
From the given information, work out whether there was a possible or actual
@@ -336,7 +340,7 @@ def _fillvalue_report(fill_info, is_masked, contains_fill_value, warn=False):
is_byte_data = fill_info.dtype.itemsize == 1
result = None
if is_byte_data and is_masked and user_value is None:
-result = UserWarning(
+result = SaverFillValueWarning(
f"CF var '{varname}' contains byte data with masked points, but "
"no fill_value keyword was given. As saved, these "
"points will read back as valid values. To save as "
@@ -345,7 +349,7 @@
"keyword during saving, otherwise use ncedit/equivalent."
)
elif contains_fill_value:
-result = UserWarning(
+result = SaverFillValueWarning(
f"CF var '{varname}' contains unmasked data points equal to the "
f"fill-value, {check_value}. As saved, these points will read back "
"as missing data. To save these as normal values, "
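The point of this change — introducing `SaverFillValueWarning` rather than warning with a plain `UserWarning` — is that callers can filter, record, or escalate just the saver's fill-value warnings. A stdlib-only sketch of that usage (the warning texts and helper name here are illustrative):

```python
import warnings


class SaverFillValueWarning(UserWarning):
    """Dedicated category, as added to iris.fileformats.netcdf.saver above."""


def fillvalue_report(collides):
    # Illustrative stand-in for _fillvalue_report's warning path.
    if collides:
        warnings.warn(
            "unmasked points equal to the fill-value", SaverFillValueWarning
        )


# Escalate only this category to an error, leaving other warnings alone.
with warnings.catch_warnings():
    warnings.simplefilter("error", category=SaverFillValueWarning)
    try:
        fillvalue_report(True)
        escalated = False
    except SaverFillValueWarning:
        escalated = True
print(escalated)  # True

# A plain UserWarning is not matched by an isinstance check on the subclass,
# which is what the updated test_fill_warnings filter below relies on.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warnings.warn("unrelated", UserWarning)
    fill_only = [w for w in caught if isinstance(w.message, SaverFillValueWarning)]
print(len(fill_only))  # 0
```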
1 change: 0 additions & 1 deletion lib/iris/io/__init__.py
@@ -473,7 +473,6 @@ def save(source, target, saver=None, **kwargs):
# Force append=True for the tail cubes. Don't modify the incoming
# kwargs.
kwargs = kwargs.copy()
-result = []
for i, cube in enumerate(source):
if i != 0:
kwargs["append"] = True
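The loop touched here saves a sequence of cubes into one file by forcing `append=True` for every cube after the first, on a copy of the caller's kwargs (the removed `result = []` was dead code). The pattern, sketched with a hypothetical `do_save` callback in place of the real saver:

```python
def save_sequence(sources, target, do_save, **kwargs):
    # Copy so the caller's kwargs dict is never mutated, as the Iris comment notes.
    kwargs = kwargs.copy()
    for i, item in enumerate(sources):
        if i != 0:
            # Every item after the first must append to the existing file.
            kwargs["append"] = True
        do_save(item, target, **kwargs)


calls = []
save_sequence(
    ["cube_a", "cube_b", "cube_c"],
    "out.nc",
    lambda item, target, **kw: calls.append((item, kw.get("append", False))),
)
print(calls)  # [('cube_a', False), ('cube_b', True), ('cube_c', True)]
```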
51 changes: 46 additions & 5 deletions lib/iris/tests/integration/netcdf/test_delayed_save.py
@@ -18,6 +18,7 @@

import iris
from iris.fileformats.netcdf._thread_safe_nc import default_fillvals
from iris.fileformats.netcdf.saver import SaverFillValueWarning
import iris.tests
from iris.tests.stock import realistic_4d

@@ -39,6 +40,7 @@ def make_testcube(
include_lazy_content=True,
ensure_fillvalue_collision=False,
data_is_maskedbytes=False,
include_extra_coordlikes=False,
):
cube = realistic_4d()

@@ -80,6 +82,24 @@ def fix_array(array):
cube.data = fix_array(cube.data)
auxcoord = cube.coord("surface_altitude")
auxcoord.points = fix_array(auxcoord.points)

if include_extra_coordlikes:
# Also concoct + attach an ancillary variable and a cell-measure, so we can
# check that they behave the same as coordinates.
ancil_dims = [0, 2]
cm_dims = [0, 3]
ancil_shape = [cube.shape[idim] for idim in ancil_dims]
cm_shape = [cube.shape[idim] for idim in cm_dims]
from iris.coords import AncillaryVariable, CellMeasure

ancil = AncillaryVariable(
fix_array(np.zeros(ancil_shape)), long_name="sample_ancil"
)
cube.add_ancillary_variable(ancil, ancil_dims)
cm = CellMeasure(
fix_array(np.zeros(cm_shape)), long_name="sample_cm"
)
cube.add_cell_measure(cm, cm_dims)
return cube

def test_realfile_loadsave_equivalence(self, save_is_delayed, output_path):
@@ -106,8 +126,8 @@ def test_realfile_loadsave_equivalence(self, save_is_delayed, output_path):
reloaded_cubes = iris.load(output_path)
reloaded_cubes = sorted(reloaded_cubes, key=lambda cube: cube.name())
assert reloaded_cubes == original_cubes
-# NOTE: it might be nicer to do assertCDL, but I', not sure how to access that
-# from pytest-style test code ?
+# NOTE: it might be nicer to use assertCDL, but unfortunately importing
+# unitest.TestCase seems to lose us the ability to use fixtures.

@staticmethod
def getmask(cube_or_coord):
@@ -122,8 +142,11 @@ def test_time_of_writing(self, save_is_delayed, output_path):
# Check when lazy data is actually written :
# - in 'immediate' mode, on initial file write
# - in 'delayed' mode, only when delayed-write is executed.
-original_cube = self.make_testcube()
+original_cube = self.make_testcube(include_extra_coordlikes=True)
assert original_cube.has_lazy_data()
assert original_cube.coord("surface_altitude").has_lazy_points()
assert original_cube.cell_measure("sample_cm").has_lazy_data()
assert original_cube.ancillary_variable("sample_ancil").has_lazy_data()

result = iris.save(
original_cube, output_path, compute=not save_is_delayed
@@ -139,20 +162,34 @@ def test_time_of_writing(self, save_is_delayed, output_path):
# If 'delayed', the lazy content should all be masked, otherwise none of it.
data_mask = self.getmask(readback_cube)
coord_mask = self.getmask(readback_cube.coord("surface_altitude"))
ancil_mask = self.getmask(
readback_cube.ancillary_variable("sample_ancil")
)
cm_mask = self.getmask(readback_cube.cell_measure("sample_cm"))
if save_is_delayed:
assert np.all(data_mask)
assert np.all(coord_mask)
assert np.all(ancil_mask)
assert np.all(cm_mask)
else:
assert np.all(~data_mask)
assert np.all(~coord_mask)
assert np.all(~ancil_mask)
assert np.all(~cm_mask)

if save_is_delayed:
result.compute()
# Re-fetch the arrays. The data is **no longer masked**.
data_mask = self.getmask(readback_cube)
coord_mask = self.getmask(readback_cube.coord("surface_altitude"))
ancil_mask = self.getmask(
readback_cube.ancillary_variable("sample_ancil")
)
cm_mask = self.getmask(readback_cube.cell_measure("sample_cm"))
assert np.all(~data_mask)
assert np.all(~coord_mask)
assert np.all(~ancil_mask)
assert np.all(~cm_mask)

@pytest.mark.parametrize(
"warning_type", ["WarnMaskedBytes", "WarnFillvalueCollision"]
@@ -181,12 +218,16 @@ def test_fill_warnings(self, warning_type, output_path, save_is_delayed):
result = iris.save(cube, output_path, compute=not save_is_delayed)

if not save_is_delayed:
-result_warnings = [log.message for log in logged_warnings]
+result_warnings = [
+log.message
+for log in logged_warnings
+if isinstance(log.message, SaverFillValueWarning)
+]
else:
assert len(logged_warnings) == 0
# Complete the operation now
# NOTE: warnings should not be *issued* here, instead they are returned.
-warnings.simplefilter("error")
+warnings.simplefilter("error", category=SaverFillValueWarning)
result_warnings = result.compute()

# Either way, we should now have 2 similar warnings.
@@ -26,7 +26,7 @@ class Test__lazy_stream_data:
@staticmethod
@pytest.fixture(autouse=True)
def saver_patch():
-# Install patches, so we can creata a Saver without opening a real output file.
+# Install patches, so we can create a Saver without opening a real output file.
# Mock just enough of Dataset behaviour to allow a 'Saver.complete()' call.
mock_dataset = mock.MagicMock(_isopen=False)
mock_dataset_class = mock.Mock(return_value=mock_dataset)
@@ -63,9 +63,9 @@ def saver(compute) -> Saver:
)

@staticmethod
-def mock_var(shape, dtype=np.float32):
+def mock_var(shape):
# Create a test cf_var object
-return mock.MagicMock(shape=tuple(shape), dtype=np.dtype(dtype))
+return mock.MagicMock(shape=tuple(shape), dtype=np.dtype(np.float32))

def test_data_save(self, compute, data_is_lazy):
"""Real data is transferred immediately, lazy data creates a delayed write."""
@@ -81,9 +81,9 @@ def test_data_save(self, compute, data_is_lazy):
assert cf_var.__setitem__.call_count == (0 if data_is_lazy else 1)
assert len(saver._delayed_writes) == (1 if data_is_lazy else 0)
if data_is_lazy:
-result_data, result__write, fill_info = saver._delayed_writes[0]
+result_data, result_writer, fill_info = saver._delayed_writes[0]
assert result_data is data
-assert isinstance(result__write, nc_threadsafe.NetCDFWriteProxy)
+assert isinstance(result_writer, nc_threadsafe.NetCDFWriteProxy)
assert isinstance(fill_info, _FillvalueCheckInfo)
else:
cf_var.__setitem__.assert_called_once_with(slice(None), data)
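The mock-based technique in this test file — a `MagicMock` standing in for a netCDF variable so write calls can be asserted without opening a real file — works standalone. The function below is an illustrative stand-in for the saver's behaviour, not the actual `Saver._lazy_stream_data` internals:

```python
from unittest import mock


def stream_data(cf_var, data, data_is_lazy, delayed_writes):
    # Illustrative stand-in: real data is written to the variable at once,
    # lazy data is queued onto a deferred-writes list for later.
    if data_is_lazy:
        delayed_writes.append((data, cf_var))
    else:
        cf_var[slice(None)] = data


# A MagicMock records the __setitem__ call, just like the patched cf_var
# objects in the tests above.
cf_var = mock.MagicMock(shape=(3, 4), dtype="float32")
delayed = []

stream_data(cf_var, [1.0, 2.0], data_is_lazy=False, delayed_writes=delayed)
cf_var.__setitem__.assert_called_once_with(slice(None), [1.0, 2.0])
print(len(delayed))  # 0

stream_data(cf_var, "pretend-lazy-array", data_is_lazy=True, delayed_writes=delayed)
print(len(delayed))  # 1
```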
8 changes: 4 additions & 4 deletions lib/iris/tests/unit/fileformats/netcdf/saver/test_save.py
@@ -4,6 +4,10 @@
# See COPYING and COPYING.LESSER in the root of the repository for full
# licensing details.
"""Unit tests for the :func:`iris.fileformats.netcdf.save` function."""
+# Import iris.tests first so that some things can be initialised before
+# importing anything else.
+import iris.tests as tests  # isort:skip

from pathlib import Path
from shutil import rmtree
from tempfile import mkdtemp
@@ -25,10 +29,6 @@
from iris.tests.stock import lat_lon_cube
from iris.tests.stock.mesh import sample_mesh_cube

-# Import iris.tests first so that some things can be initialised before
-# importing anything else.
-import iris.tests as tests  # isort:skip


class Test_conventions(tests.IrisTest):
def setUp(self):