Parallel concatenate #5926

Open · wants to merge 11 commits into main
Add support for comparing different data types
bouweandela committed Apr 26, 2024
commit 6e883da02c7cd0360fa357a8ca0e87de105e9c71
93 changes: 47 additions & 46 deletions lib/iris/_concatenate.py
Contributor

Would it be possible to fall back on 'normal equality' when hash equality is False?

Sorry to add more work to this, but I've been having some offline conversations with @bjlittle and @pp-mo about equality in general and we're concerned about Iris' strictness. The changes here would make Iris more strict than it is already.

We are therefore keen to use hashing as a way to confirm equality quickly and efficiently, while still retaining the chance for more lenient comparisons (see the sketch after this comment) such as:

  • Allowing NaN (example).
  • Potentially allowing for floating point differences in future (thanks to @larsbarring for insights).

If this would harm the performance gains you are looking for then we would be open to configurable behaviour in concatenate().
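For concreteness, a minimal sketch (not Iris code) of the two-tier comparison suggested above: trust matching hashes as a fast path, and only fall back to a slower, more lenient elementwise check when they differ. The function arrays_equal, its arguments, and the tolerance choice are all hypothetical:

    import numpy as np

    def arrays_equal(a: np.ndarray, b: np.ndarray, hash_a: int, hash_b: int) -> bool:
        """Hypothetical two-tier comparison: hash fast path, lenient fallback."""
        if hash_a == hash_b:
            # Equal hashes imply (with near certainty) equal bytes.
            return True
        if a.shape != b.shape:
            return False
        # Lenient fallback: tolerate NaNs and small floating point differences.
        return bool(np.allclose(a, b, equal_nan=True))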

Member Author

Thanks for providing me with feedback! ✨

I've been having some offline conversations with @bjlittle and @pp-mo about equality in general and we're concerned about Iris' strictness.

I agree with this. In ESMValCore we have implemented many workarounds for this to make life easier for our users.

The changes here would make Iris more strict than it is already.

As far as I'm aware, this pull request does not make any changes to Iris behaviour.
Would you have an example so I can understand when this happens?

I even made the hash comparison work for arrays of different dtypes because I initially expected that to be allowed, but it turns out that the current implementation of concatenate does not allow even that, so I could take it out again. Alternatively, we could keep it in case you are interested in being more lenient about this kind of difference in the future.
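As a simplified standalone illustration of that leniency (hashing raw bytes only, without the shape prefix and chunk handling from this pull request): casting a group of arrays to a common dtype with np.result_type before hashing makes, for example, float32 and bool arrays holding equal values hash identically:

    import numpy as np
    from xxhash import xxh3_64

    a = np.array([1], dtype=np.float32)
    b = np.array([1], dtype=bool)
    common = np.result_type(a, b)  # float32 in this case
    assert (
        xxh3_64(a.astype(common).tobytes()).digest()
        == xxh3_64(b.astype(common).tobytes()).digest()
    )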

Allowing NaN

Arrays containing NaNs compare equal with the hashing implementation; I added a test demonstrating this in 2540fea.
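A simplified standalone illustration of why hashing behaves this way (hashing raw bytes directly, outside Iris): two arrays built from the same NaN constant contain identical bytes, so their hashes match even though elementwise comparison says they differ:

    import numpy as np
    from xxhash import xxh3_64

    a = np.array([1.0, np.nan])
    b = np.array([1.0, np.nan])
    print((a == b).all())  # False: NaN != NaN elementwise
    print(xxh3_64(a.tobytes()).digest() == xxh3_64(b.tobytes()).digest())  # True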

Would it be possible to fall back on 'normal equality' when hash equality is False?

Yes, it would be easy to add the additional comparison here:

iris/lib/iris/_concatenate.py, lines 1077 to 1078 in 3bfea80:

    if get_hashes(coord_a.coord) != get_hashes(coord_b.coord):
        return False

However, with the current strict implementation of coordinate comparison there would be no point in doing so, because the result would be the same. I'm not too concerned about the performance impact: in our use case we expect the input to be compatible enough that the concatenation produces a single cube, so the extra comparison would only run in exceptional cases where something is wrong with the input data.
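Sketched out, such a fallback at the quoted site could look roughly like this (hypothetical; coord_a, coord_b and get_hashes as in this pull request, with the coordinate's rich comparison as the slow path):

    if get_hashes(coord_a.coord) != get_hashes(coord_b.coord):
        # Hashes differ: fall back to the slower rich comparison. With the
        # current strict coordinate equality this gives the same verdict,
        # but a more lenient __eq__ could accept e.g. NaNs in the future.
        if coord_a.coord != coord_b.coord:
            return False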

@@ -290,8 +290,7 @@ def _hash_array(a: da.Array | np.ndarray) -> np.int64:
     """Calculate a hash representation of the provided array.
 
     Calculates a 64-bit non-cryptographic hash of the provided array, using
-    the extremely fast ``xxhash`` hashing algorithm, and returns the hexdigest
-    string representation of the hash.
+    the fast ``xxhash`` hashing algorithm.
 
     Note that the computed hash depends on how the array is chunked.
 
@@ -303,12 +302,13 @@ def _hash_array(a: da.Array | np.ndarray) -> np.int64:
     Returns
     -------
     np.int64
-        The string hexadecimal representation of the item's 64-bit hash.
+        The array's hash.
 
     """
 
     def arrayhash(x):
-        value = xxh3_64(x.data.tobytes())
+        value = xxh3_64(np.array(x.shape, dtype=np.uint).tobytes())
+        value.update(x.data.tobytes())
         if is_masked_data(x):
             value.update(x.mask.tobytes())
         return np.frombuffer(value.digest(), dtype=np.int64)
@@ -335,47 +335,34 @@ def __eq__(self, other: "_ArrayHash") -> bool:
         return self.value == other.value
 
 
-def _compute_hashes(
-    cubes: Iterable[iris.cube.Cube],
-    check_aux_coords: bool,
-    check_derived_coords: bool,
-    check_cell_measures: bool,
-    check_ancils: bool,
-) -> dict[str, _ArrayHash]:
+def _compute_hashes(arrays: Iterable[np.ndarray | da.Array]) -> dict[str, _ArrayHash]:
     """Compute hashes for the arrays that will be compared."""
-    arrays = []
-    for cube in cubes:
-        if check_aux_coords:
-            for coord in cube.aux_coords:
-                arrays.append(coord.core_points())
-                if coord.has_bounds():
-                    arrays.append(coord.core_bounds())
-        if check_derived_coords:
-            for coord in cube.derived_coords:
-                arrays.append(coord.core_points())
-                if coord.has_bounds():
-                    arrays.append(coord.core_bounds())
-        if check_cell_measures:
-            for var in cube.cell_measures():
-                arrays.append(var.core_data())
-        if check_ancils:
-            for var in cube.ancillary_variables():
-                arrays.append(var.core_data())
-
     hashes = {}
 
-    def get_shape(a):
-        return a.shape
+    def is_numerical(dtype):
+        return np.issubdtype(dtype, np.bool_) or np.issubdtype(dtype, np.number)
+
+    def group_key(a):
+        if is_numerical(a.dtype):
+            dtype = "numerical"
+        else:
+            dtype = str(a.dtype)
+        return a.shape, dtype
 
-    arrays.sort(key=get_shape)
-    for _, group in itertools.groupby(arrays, key=get_shape):
+    arrays = sorted(arrays, key=group_key)
+    for _, group in itertools.groupby(arrays, key=group_key):
         group = list(group)
-        # TODO: Unify dtype as the hash depends on the dtype
+        # Unify dtype for numerical arrays, as the hash depends on it
+        if is_numerical(group[0].dtype):
+            dtype = np.result_type(*group)
+            same_dtype_arrays = [a.astype(dtype) for a in group]
+        else:
+            same_dtype_arrays = group
         # Unify chunks as the hash depends on the chunks.
         indices = tuple(range(group[0].ndim))[::-1]
-        argpairs = [(a, indices) for a in group]
-        _, rechunked_group = da.core.unify_chunks(*itertools.chain(*argpairs))
-        for array, rechunked in zip(group, rechunked_group):
+        argpairs = [(a, indices) for a in same_dtype_arrays]
+        rechunked_arrays = da.core.unify_chunks(*itertools.chain(*argpairs))[1]
+        for array, rechunked in zip(group, rechunked_arrays):
             hashes[dask.base.tokenize(array)] = (
                 _hash_array(rechunked),
                 rechunked.chunks,
@@ -435,15 +422,29 @@ def concatenate(
     # which requires to be negotiated.
     axis = None
 
-    # Register each cube with its appropriate proto-cube.
-    hashes = _compute_hashes(
-        cubes,
-        check_aux_coords=check_aux_coords,
-        check_cell_measures=check_cell_measures,
-        check_ancils=check_ancils,
-        check_derived_coords=check_derived_coords,
-    )
+    # Compute hashes for parallel array comparison.
+    arrays = []
+    for cube in cubes:
+        if check_aux_coords:
+            for coord in cube.aux_coords:
+                arrays.append(coord.core_points())
+                if coord.has_bounds():
+                    arrays.append(coord.core_bounds())
+        if check_derived_coords:
+            for coord in cube.derived_coords:
+                arrays.append(coord.core_points())
+                if coord.has_bounds():
+                    arrays.append(coord.core_bounds())
+        if check_cell_measures:
+            for var in cube.cell_measures():
+                arrays.append(var.core_data())
+        if check_ancils:
+            for var in cube.ancillary_variables():
+                arrays.append(var.core_data())
+
+    hashes = _compute_hashes(arrays)
 
+    # Register each cube with its appropriate proto-cube.
     for cube in cubes:
         registered = False
 
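An aside on the chunk unification above: per the arrayhash helper in this diff, each block is hashed with its shape prefixed, which is why the computed hash depends on chunking (as the docstring notes); equal arrays chunked differently must therefore be rechunked to a common chunking before hashing. A standalone sketch of the da.core.unify_chunks call used in _compute_hashes:

    import itertools

    import dask.array as da

    a = da.arange(4, chunks=2)
    b = da.arange(4, chunks=4)

    # unify_chunks takes alternating (array, indices) arguments and returns
    # the common chunking plus the rechunked arrays.
    indices = tuple(range(a.ndim))[::-1]
    argpairs = [(x, indices) for x in (a, b)]
    rechunked = da.core.unify_chunks(*itertools.chain(*argpairs))[1]
    assert rechunked[0].chunks == rechunked[1].chunks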
21 changes: 21 additions & 0 deletions lib/iris/tests/unit/concatenate/test_hashing.py
@@ -0,0 +1,21 @@
+import dask.array as da
+from dask.base import tokenize
+import numpy as np
+import pytest
+
+from iris import _concatenate
+
+
+@pytest.mark.parametrize(
+    "a,b,eq",
+    [
+        (np.arange(2), da.arange(2), True),
+        (np.array([1], dtype=np.float32), np.array([1], dtype=bool), True),
+        (np.array([1]), np.array([[1]]), False),
+        (np.ma.array([1, 2], mask=[0, 1]), np.ma.array([1, 2], mask=[0, 1]), True),
+        (da.arange(2, chunks=1), da.arange(2, chunks=2), True),
+    ],
+)
+def test_compute_hashes(a, b, eq):
+    hashes = _concatenate._compute_hashes([a, b])
+    assert eq == (hashes[tokenize(a)].value == hashes[tokenize(b)].value)