
Lazy netcdf saves #5191

Merged: 92 commits, Apr 21, 2023
Changes from 15 commits

Commits (92)
cd7fa42
Basic functional lazy saving.
pp-mo Oct 21, 2022
1f32800
Simplify function signature which upsets Sphinx.
pp-mo Oct 21, 2022
e0f980f
Non-lazy saves return nothing.
pp-mo Oct 21, 2022
67b96cf
Now fixed to enable use with process/distributed scheduling.
pp-mo Oct 23, 2022
8cdbc9b
Remove dask.utils.SerializableLock, which I think was a mistake.
pp-mo Mar 3, 2023
8723f24
Make DefferedSaveWrapper use _thread_safe_nc.
pp-mo Mar 8, 2023
d19a87f
Fixes for non-lazy save.
pp-mo Mar 9, 2023
45e0e60
Avoid saver error when no deferred writes.
pp-mo Mar 10, 2023
6a83200
Reorganise locking code, ready for shareable locks.
pp-mo Mar 10, 2023
78a9346
Remove optional usage of 'filelock' for lazy saves.
pp-mo Mar 10, 2023
31b12f7
Document dask-specific locking; implement differently for threads or …
pp-mo Mar 10, 2023
99b4f41
Minor fix for unit-tests.
pp-mo Mar 10, 2023
5d0a707
Pin libnetcdf to avoid problems -- see #5187.
pp-mo Mar 10, 2023
431036f
Minor test fix.
pp-mo Mar 10, 2023
dc368d9
Move DeferredSaveWrapper into _thread_safe_nc; replicate the NetCDFDa…
pp-mo Mar 13, 2023
6756a46
Update lib/iris/fileformats/netcdf/saver.py
pp-mo Mar 16, 2023
80b4b6c
Update lib/iris/fileformats/netcdf/_dask_locks.py
pp-mo Mar 16, 2023
78a8716
Update lib/iris/fileformats/netcdf/saver.py
pp-mo Mar 16, 2023
47bb08b
Small rename + reformat.
pp-mo Mar 17, 2023
0ece09a
Remove Saver lazy option; all lazy saves are delayed; factor out fill…
pp-mo Mar 18, 2023
940f544
Merge branch 'main' into lazy_save_2
pp-mo Mar 18, 2023
eb97130
Merge remote-tracking branch 'upstream/main' into lazy_save_2
pp-mo Mar 20, 2023
4596081
Repurposed 'test__FillValueMaskCheckAndStoreTarget' to 'test__data_fi…
pp-mo Mar 20, 2023
ad49fbe
Disable (temporary) saver debug printouts.
pp-mo Mar 20, 2023
b29c927
Fix test problems; Saver automatically completes to preserve existing…
pp-mo Mar 20, 2023
8f10281
Fix docstring error.
pp-mo Mar 20, 2023
6a564d9
Fix spurious error in old saver test.
pp-mo Mar 20, 2023
2fb4d6c
Fix Saver docstring.
pp-mo Mar 20, 2023
c84bfdc
More robust exit for NetCDFWriteProxy operation.
pp-mo Mar 20, 2023
5b78085
Fix doctests by making the Saver example functional.
pp-mo Mar 21, 2023
478332e
Improve docstrings; unify terminology; simplify non-lazy save call.
pp-mo Mar 23, 2023
34f154c
Moved netcdf cell-method handling into nc_load_rules.helpers, and var…
pp-mo Mar 27, 2023
d3744ba
Merge branch 'latest' into lazy_save_2
pp-mo Mar 27, 2023
9673ea0
Fix lockfiles and Makefile process.
pp-mo Mar 27, 2023
bcbcbc8
Add unit tests for routine _fillvalue_report().
pp-mo Mar 27, 2023
05c04a1
Remove debug-only code.
pp-mo Mar 27, 2023
679ea47
Added tests for what the save function does with the 'compute' keyword.
pp-mo Mar 28, 2023
70ec9dd
Fix mock-specific problems, small tidy.
pp-mo Mar 28, 2023
28a4674
Restructure hierarchy of tests.unit.fileformats.netcdf
pp-mo Mar 29, 2023
67f4b2b
Tidy test docstrings.
pp-mo Mar 29, 2023
ebec72f
Correct test import.
pp-mo Mar 29, 2023
1f5b904
Avoid incorrect checking of byte data, and a numpy deprecation warning.
pp-mo Mar 29, 2023
5045c9f
Alter parameter names to make test reports clearer.
pp-mo Mar 29, 2023
393407a
Test basic behaviour of _lazy_stream_data; make 'Saver._delayed_write…
pp-mo Mar 29, 2023
518360b
Add integration tests, and distributed dependency.
pp-mo Mar 30, 2023
5c9931f
Docstring fixes.
pp-mo Mar 31, 2023
7daee68
Documentation section and whatsnew entry.
pp-mo Apr 4, 2023
97474f9
Merge branch 'main' into lazy_save_2
pp-mo Apr 4, 2023
64c7251
Various fixes to whatsnew, docstrings and docs.
pp-mo Apr 4, 2023
75043f9
Minor review changes, fix doctest.
pp-mo Apr 11, 2023
445fbe2
Arrange tests + results to organise by package-name alone.
pp-mo Apr 11, 2023
09cb22e
Review changes.
pp-mo Apr 11, 2023
3445f58
Review changes.
pp-mo Apr 12, 2023
cb1e1f7
Enhance tests + debug.
pp-mo Apr 12, 2023
1c81cee
Support scheduler type 'single-threaded'; allow retries on delayed-sa…
pp-mo Apr 13, 2023
370837b
Improve test.
pp-mo Apr 13, 2023
2f5f3c2
Adding a whatsnew entry for 5224 (#5234)
HGWright Apr 4, 2023
a55c6f2
Replacing numpy legacy printing with array2string and remaking result…
HGWright Apr 4, 2023
4914e99
adding a whatsnew entry
HGWright Apr 4, 2023
bd642cd
configure codecov
HGWright Apr 4, 2023
bc5bdd1
remove results creation commit from blame
HGWright Apr 4, 2023
301e59e
fixing whatsnew entry
HGWright Apr 4, 2023
7b3044d
Bump scitools/workflows from 2023.04.1 to 2023.04.2 (#5236)
dependabot[bot] Apr 5, 2023
02f2b66
Use real array for data of of small netCDF variables. (#5229)
pp-mo Apr 6, 2023
a7e0689
Handle derived coordinates correctly in `concatenate` (#5096)
schlunma Apr 12, 2023
c4e8bbb
clarity on whatsnew entry contributors (#5240)
bjlittle Apr 12, 2023
e6661b8
Modernize and simplify iris.analysis._Groupby (#5015)
bouweandela Apr 12, 2023
afbdbbd
Finalises Lazy Data documentation (#5137)
ESadek-MO Apr 12, 2023
b8bb753
Fixes to _discontiguity_in_bounds (attempt 2) (#4975)
stephenworsley Apr 12, 2023
97cc149
update ci locks location (#5228)
bjlittle Apr 13, 2023
f14a321
Updated environment lockfiles (#5211)
scitools-ci[bot] Apr 13, 2023
f7a0b87
Increase retries.
pp-mo Apr 13, 2023
69ddd9d
Change debug to show which elements failed.
pp-mo Apr 13, 2023
8235d60
update cf standard units (#5244)
ESadek-MO Apr 13, 2023
724c6d2
libnetcdf <4.9 pin (#5242)
trexfeathers Apr 13, 2023
4f50dc7
Avoid possible same-file crossover between tests.
pp-mo Apr 13, 2023
0da68cf
Ensure all-different testfiles; load all vars lazy.
pp-mo Apr 13, 2023
e8b7bfd
Revert changes to testing framework.
pp-mo Apr 13, 2023
ad48caf
Remove repeated line from requirements/py*.yml (?merge error), and re…
pp-mo Apr 13, 2023
291b587
Revert some more debug changes.
pp-mo Apr 13, 2023
b2260ef
Merge branch 'latest' into lazy_save_2
pp-mo Apr 13, 2023
33a7d86
Reorganise test for better code clarity.
pp-mo Apr 14, 2023
db6932d
Use public 'Dataset.isopen()' instead of '._isopen'.
pp-mo Apr 14, 2023
631e001
Create output files in unique temporary directories.
pp-mo Apr 14, 2023
2869f97
Tests for fileformats.netcdf._dask_locks.
pp-mo Apr 14, 2023
419727b
Merge branch 'latest' into lazy_save_2
pp-mo Apr 21, 2023
2f4458b
Fix attribution names.
pp-mo Apr 21, 2023
88b7a2a
Merge branch 'latest' into lazy_save_2
pp-mo Apr 21, 2023
98a20e7
Fixed new py311 lockfile.
pp-mo Apr 21, 2023
bbc1167
Fix typos spotted by codespell.
pp-mo Apr 21, 2023
ed38e43
Add distributed test dep for python 3.11
pp-mo Apr 21, 2023
54ec0f8
Fix lockfile for python 3.11
pp-mo Apr 21, 2023
131 changes: 131 additions & 0 deletions lib/iris/fileformats/netcdf/_dask_locks.py
@@ -0,0 +1,131 @@
# Copyright Iris contributors
#
# This file is part of Iris and is released under the LGPL license.
# See COPYING and COPYING.LESSER in the root of the repository for full
# licensing details.
"""
Module containing code to create locks enabling dask workers to co-operate.

This matter is complicated by needing different solutions for different dask scheduler
types, i.e. local 'threads' scheduler, local 'processes' or distributed.

In any case, an "iris.fileformats.netcdf.saver.Saver" object contains a netCDF4.Dataset
targeting an output file, and creates a Saver.lock object to serialise write-accesses
to the file from dask tasks: all dask-task file writes go via an
"iris.fileformats.netcdf.saver.DeferredSaveWrapper" object, which also holds a reference
to the Saver.lock, and uses it to prevent workers from fouling each other.
For each chunk written, the DeferredSaveWrapper acquires the common per-file lock;
opens a Dataset on the file; performs a write to the relevant variable; closes the
Dataset and then releases the lock.

For a threaded scheduler, the Saver.lock is a simple threading.Lock(). The workers
(threads) execute tasks which contain a DeferredSaveWrapper, as above. All of those
contain the common lock, and this is simply **the same object** for all workers, since
they share an address space.

For a distributed scheduler, the Saver.lock is a `distributed.Lock()` which is
identified with the output filepath. This is distributed to the workers by
serialising the task function arguments, which will include the DeferredSaveWrapper.
A worker behaves like a process, though it may execute on a remote machine. When a
distributed.Lock is deserialised to reconstruct the worker task, this creates an object
that communicates with the scheduler. These objects behave as a single common lock,
as they all have the same string 'identity', and the scheduler provides the
inter-process communication needed to make them mutually exclude each other.

It is also *conceivable* that multiple processes could write to the same file in
parallel, if the operating system supports it. However, this also requires that the
libnetcdf C library is built with the parallel-access option, which is not common.
With an "ordinary" libnetcdf build, a process which attempts to open for writing a file
which is _already_ open for writing simply raises an access error.
In any case, the Iris netcdf saver does not support this mode of operation at present.

We don't currently support a local "processes" type scheduler. If we did, the
behaviour should be very similar to a distributed scheduler. It would need to use some
other serialisable shared-lock solution in place of 'distributed.Lock', which requires
a distributed scheduler to function.

"""
import threading

import dask.array
import dask.base
import dask.multiprocessing
import dask.threaded


def dask_scheduler_is_distributed():
"""Return whether a distributed.Client is active."""
# NOTE: this replicates logic in `dask.base.get_scheduler` : if a distributed client
# has been created + is still active, then the default scheduler will always be
# "distributed".
is_distributed = False
# NOTE: must still work when 'distributed' is not available.
try:
import distributed

client = distributed.get_client()
is_distributed = client is not None
except (ImportError, ValueError):
pass
return is_distributed


def get_dask_array_scheduler_type():
"""
Work out what type of scheduler an array.compute*() will use.

Returns one of 'distributed', 'threads' or 'processes'.
The return value is a valid argument for dask.config.set(scheduler=<type>).
This cannot distinguish between distributed local and remote clusters -- both of
those simply return 'distributed'.

NOTE: this takes account of how dask is *currently* configured. It will be wrong
if the config changes before the compute actually occurs.

"""
if dask_scheduler_is_distributed():
result = "distributed"
else:
# Call 'get_scheduler', which respects the config settings, but pass an array
# so we default to the default scheduler for that type of object.
trial_dask_array = dask.array.zeros(1)
get_function = dask.base.get_scheduler(collections=[trial_dask_array])
# Detect the ones which we recognise.
if get_function == dask.threaded.get:
result = "threads"
elif get_function == dask.multiprocessing.get:
result = "processes"
else:
msg = f"Dask default scheduler for arrays is unrecognised : {get_function}"
raise ValueError(msg)

return result


def get_worker_lock(identity: str):
"""
Return a mutex Lock which can be shared by multiple Dask workers.

The type of Lock generated depends on the dask scheduler type, which must therefore
be set up before this is called.

"""
scheduler_type = get_dask_array_scheduler_type()
if scheduler_type == "threads":
# N.B. the "identity" string is never used in this case, as the same actual
# lock object is used by all workers.
lock = threading.Lock()
elif scheduler_type == "distributed":
from dask.distributed import Lock as DistributedLock

lock = DistributedLock(identity)
else:
msg = (
"The configured dask array scheduler type is "
f'"{scheduler_type}", '
"which is not supported by the Iris netcdf saver."
)
raise ValueError(msg)

# NOTE: not supporting 'processes' scheduler, for now.
return lock
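
For orientation, here is a minimal usage sketch of the two helpers above (hypothetical, not part of the diff; the module is private and the filepath is invented):

```python
# A minimal sketch: query the active dask scheduler type and obtain the
# matching per-file lock, as Saver.__init__ does for its output file.
from iris.fileformats.netcdf import _dask_locks

scheduler = _dask_locks.get_dask_array_scheduler_type()
# -> "threads" by default, or "distributed" when a distributed.Client is active.

lock = _dask_locks.get_worker_lock("/tmp/output.nc")
# -> a threading.Lock, or a distributed.Lock keyed on the filepath string.
print(scheduler, type(lock))
```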
38 changes: 38 additions & 0 deletions lib/iris/fileformats/netcdf/_thread_safe_nc.py
@@ -340,3 +340,41 @@ def __getstate__(self):
def __setstate__(self, state):
for key, value in state.items():
setattr(self, key, value)


class DeferredSaveWrapper:
"""
The "opposite" of a NetCDFDataProxy: an object mimicking the data access of a
netCDF4.Variable, but to which data is to be *written*.

It encapsulates the netcdf file and variable which are actually to be written to.
The file is re-opened for each data chunk written, then closed again afterwards.
TODO: could be improved with a caching scheme, but this just about works.
"""

def __init__(self, filepath, cf_var, file_write_lock):
self.path = filepath
self.varname = cf_var.name
self.lock = file_write_lock

def __setitem__(self, keys, array_data):
# Write to the variable.
# First acquire a file-specific lock for all workers writing to this file.
self.lock.acquire()
# Open the file for writing + write to the specific file variable.
# Exactly as above in NetCDFDataProxy: a DatasetWrapper causes problems with
# invalid IDs and the netCDF4 library, for so-far unknown reasons.
# Instead, use _GLOBAL_NETCDF4_LOCK, and netCDF4 _directly_.
with _GLOBAL_NETCDF4_LOCK:
dataset = None
try:
dataset = netCDF4.Dataset(self.path, "r+")
var = dataset.variables[self.varname]
var[keys] = array_data
finally:
if dataset:
dataset.close()
self.lock.release()

def __repr__(self):
return f"<{self.__class__.__name__} path={self.path!r} var={self.varname!r}>"
112 changes: 97 additions & 15 deletions lib/iris/fileformats/netcdf/saver.py
@@ -23,6 +23,7 @@
import warnings

import cf_units
import dask
import dask.array as da
import numpy as np
import numpy.ma as ma
@@ -44,7 +45,7 @@
from iris.coords import AncillaryVariable, AuxCoord, CellMeasure, DimCoord
import iris.exceptions
import iris.fileformats.cf
from iris.fileformats.netcdf import _thread_safe_nc
from iris.fileformats.netcdf import _dask_locks, _thread_safe_nc
import iris.io
import iris.util

@@ -499,7 +500,7 @@ def __setitem__(self, keys, arr):
class Saver:
"""A manager for saving netcdf files."""

def __init__(self, filename, netcdf_format):
def __init__(self, filename, netcdf_format, compute=True):
"""
A manager for saving netcdf files.

@@ -512,6 +513,15 @@ def __init__(self, filename, netcdf_format):
Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC',
'NETCDF3_CLASSIC' or 'NETCDF3_64BIT'. Default is 'NETCDF4' format.

* compute (bool):
If True, the Saver performs normal 'synchronous' data writes, where data
is streamed directly into file variables during the save operation.
If False, the file is created as normal, but computation and streaming of
any lazy array content is instead deferred to :class:`dask.delayed` objects,
which are held in a list in the saver's 'deferred_writes' property.
The relevant file variables are created empty, and the write can
subsequently be completed later by computing these deferred writes.

Returns:
None.

@@ -548,18 +558,28 @@ def __init__(self, filename, netcdf_format):
self._mesh_dims = {}
#: A dictionary, mapping formula terms to owner cf variable name
self._formula_terms_cache = {}
#: Target filepath
self.filepath = os.path.abspath(filename)
#: Whether lazy saving.
self.lazy_saves = not compute
#: A list of deferred writes for lazy saving : each is a (source, target) pair
self.deferred_writes = []
# N.B. the file-write-lock *type* actually depends on the dask scheduler type.
#: A per-file write lock to prevent dask attempting overlapping writes.
self.file_write_lock = _dask_locks.get_worker_lock(self.filepath)
#: NetCDF dataset
self._dataset = None
try:
self._dataset = _thread_safe_nc.DatasetWrapper(
filename, mode="w", format=netcdf_format
self.filepath, mode="w", format=netcdf_format
)
except RuntimeError:
dir_name = os.path.dirname(filename)
dir_name = os.path.dirname(self.filepath)
if not os.path.isdir(dir_name):
msg = "No such file or directory: {}".format(dir_name)
raise IOError(msg)
if not os.access(dir_name, os.R_OK | os.W_OK):
msg = "Permission denied: {}".format(filename)
msg = "Permission denied: {}".format(self.filepath)
raise IOError(msg)
else:
raise
@@ -2444,8 +2464,7 @@ def _increment_name(self, varname):

return "{}_{}".format(varname, num)

@staticmethod
def _lazy_stream_data(data, fill_value, fill_warn, cf_var):
def _lazy_stream_data(self, data, fill_value, fill_warn, cf_var):
if hasattr(data, "shape") and data.shape == (1,) + cf_var.shape:
# (Don't do this check for string data).
# Reduce dimensionality where the data array has an extra dimension
@@ -2455,16 +2474,37 @@ def _lazy_stream_data(data, fill_value, fill_warn, cf_var):
data = data.squeeze(axis=0)

if is_lazy_data(data):
if self.lazy_saves:
# deferred lazy streaming
def store(data, cf_var, fill_value):
# Create a data-writeable object that we can stream into, which
# encapsulates the file to be opened + variable to be written.
writeable_var_wrapper = (
_thread_safe_nc.DeferredSaveWrapper(
self.filepath, cf_var, self.file_write_lock
)
)
# Add to the list of deferred writes, used in _deferred_save().
self.deferred_writes.append((data, writeable_var_wrapper))
# NOTE: in this case, no checking of fill-value violations so just
# return dummy values for this.
# TODO: just for now -- can probably make this work later
is_masked, contains_value = False, False
return is_masked, contains_value

def store(data, cf_var, fill_value):
# Store lazy data and check whether it is masked and contains
# the fill value
target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
da.store([data], [target])
return target.is_masked, target.contains_value
else:
# Immediate streaming store : check mask+fill as we go.
def store(data, cf_var, fill_value):
# Store lazy data and check whether it is masked and contains
# the fill value
target = _FillValueMaskCheckAndStoreTarget(
cf_var, fill_value
)
da.store([data], [target], lock=False)
return target.is_masked, target.contains_value

else:

# Real data is always written directly, i.e. not via lazy save.
def store(data, cf_var, fill_value):
cf_var[:] = data
is_masked = np.ma.is_masked(data)
@@ -2513,6 +2553,28 @@ def store(data, cf_var, fill_value):
)
warnings.warn(msg.format(cf_var.name, fill_value))

def _deferred_save(self):
"""
Create a 'delayed' to trigger file completion for lazy saves.

This contains all the deferred writes, which complete the file by filling out
the data of variables initially created empty.

"""
if self.deferred_writes:
# Create a single delayed da.store operation to complete the file.
sources, targets = zip(*self.deferred_writes)
result = da.store(sources, targets, compute=False, lock=False)
else:
# Return a delayed anyway, just for usage consistency.
@dask.delayed
def no_op():
return None

result = no_op()

return result


def save(
cube,
@@ -2530,6 +2592,7 @@ def save(
least_significant_digit=None,
packing=None,
fill_value=None,
compute=True,
):
"""
Save cube(s) to a netCDF file, given the cube and the filename.
@@ -2652,6 +2715,14 @@ def save(
`:class:`iris.cube.CubeList`, or a single element, and each element of
this argument will be applied to each cube separately.

* compute (bool):
When False, create the output file but defer writing any lazy array content to
its variables, such as (lazy) data and aux-coord points and bounds.
Instead return a :class:`dask.delayed` which, when computed, will compute all
the lazy content and stream it to complete the file.
Several such data saves can be performed in parallel, by passing a list of them
into a :func:`dask.compute` call.

Returns:
None.

@@ -2752,7 +2823,9 @@ def is_valid_packspec(p):
raise ValueError(msg)

# Initialise Manager for saving
with Saver(filename, netcdf_format) as sman:
# N.B. FOR NOW -- we are cheating and making all saves compute=False, as otherwise
# non-lazy saves do *not* work with the distributed scheduler.
with Saver(filename, netcdf_format, compute=False) as sman:
# Iterate through the cubelist.
for cube, packspec, fill_value in zip(cubes, packspecs, fill_values):
sman.write(
Expand Down Expand Up @@ -2797,3 +2870,12 @@ def is_valid_packspec(p):

# Add conventions attribute.
sman.update_global_attributes(Conventions=conventions)

# For now, not using Saver(compute=True) as it doesn't work with distributed or
# process workers (only threaded).
result = sman._deferred_save()
if compute:
result = result.compute()
result = None

return result
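
As described in the new 'compute' docstring, the user-facing pattern is a save call that returns a dask delayed, several of which can be completed in one parallel computation. A hypothetical sketch (filenames invented, assuming iris.save forwards the keyword to the netCDF saver as usual):

```python
# A minimal sketch of the lazy-save workflow enabled by compute=False.
import dask
import iris

cubes_a = iris.load("input_a.nc")
cubes_b = iris.load("input_b.nc")

# Each save creates its output file immediately, but defers streaming of any
# lazy data, returning a dask delayed object instead of None.
delayed_a = iris.save(cubes_a, "output_a.nc", compute=False)
delayed_b = iris.save(cubes_b, "output_b.nc", compute=False)

# Complete both files in a single parallel computation.
dask.compute(delayed_a, delayed_b)
```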