Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel concatenate #5926

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
Clean up a bit
  • Loading branch information
bouweandela committed Apr 25, 2024
commit 35facc78df5ae781de01a1b6cc8e9f115dee86bf
183 changes: 109 additions & 74 deletions lib/iris/_concatenate.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to fall back on 'normal equality' when hash equality is False?

Sorry to add more work to this, but I've been having some offline conversations with @bjlittle and @pp-mo about equality in general and we're concerned about Iris' strictness. The changes here would make Iris more strict than it is already.

We are therefore keen to use hashing as a way to confirm equality quickly and efficiently, while still retaining the chance for more lenient comparisons such as:

  • Allowing NaN (example).
  • Potentially allowing for floating point differences in future (thanks to @larsbarring for insights).

If this would harm the performance gains you are looking for then we would be open to configurable behaviour in concatenate().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for providing me with feedback! ✨

I've been having some offline conversations with @bjlittle and @pp-mo about equality in general and we're concerned about Iris' strictness.

I agree with this. In ESMValCore we have implemented many workarounds for this to make the life of our users easier.

The changes here would make Iris more strict than it is already.

As far as I'm aware, this pull request does not make any changes to Iris behaviour.
Would you have an example so I can understand when this happens?

I even made the hash comparison work for arrays of different dtypes because I initially expected that that would be allowed, but it turns out that even that is not allowed by the current implementation of concatenate, so I could take that out again. Or we can keep it in case you would be interested in being more lenient w.r.t. this kind of differences in the future.

Allowing NaN

Arrays containing NaNs compare equal with the hashing implementation, I added a test to demonstrate it in 2540fea.

Would it be possible to fall back on 'normal equality' when hash equality is False?

Yes, it would be easy to add the additional comparison here:

iris/lib/iris/_concatenate.py

Lines 1077 to 1078 in 3bfea80

if get_hashes(coord_a.coord) != get_hashes(coord_b.coord):
return False

however, with the current strict implementation of coordinate comparison, there would be no point in doing so because the result would be the same. I'm not too concerned about the performance impact because in our use case, we expect the input to be compatible enough such that the result of the concatenation is a single cube, so the extra comparison would only happen in exceptional cases when there is something wrong with the input data.

Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

from collections import namedtuple
import itertools
from typing import Iterable
import warnings

import dask
import dask.array as da
from dask.base import tokenize
import numpy as np
from xxhash import xxh3_64

Expand Down Expand Up @@ -39,35 +39,6 @@
_INCREASING = 1


def hash_array(a: da.Array | np.ndarray) -> np.int64:
def arrayhash(x):
value = xxh3_64(x.data.tobytes())
if is_masked_data(x):
value.update(x.mask.tobytes())
return np.frombuffer(value.digest(), dtype=np.int64)

return da.reduction(
a,
chunk=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim),
combine=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim),
aggregate=lambda x, axis, keepdims: arrayhash(x)[0],
keepdims=False,
meta=np.empty(tuple(), dtype=np.int64),
dtype=np.int64,
)


class ArrayHash:
def __init__(self, value: np.int64, chunks: tuple) -> None:
self.value = value
self.chunks = chunks

def __eq__(self, other: "ArrayHash") -> bool:
if self.chunks != other.chunks:
raise ValueError("Unable to compare arrays with different chunks.")
return self.value == other.value


class _CoordAndDims(namedtuple("CoordAndDims", ["coord", "dims"])):
"""Container for a coordinate and the associated data dimension(s).

Expand Down Expand Up @@ -315,6 +286,105 @@ class _CoordExtent(namedtuple("CoordExtent", ["points", "bounds"])):
__slots__ = ()


def _hash_array(a: da.Array | np.ndarray) -> np.int64:
"""Calculate a hash representation of the provided array.

Calculates a 64-bit non-cryptographic hash of the provided array, using
the extremely fast ``xxhash`` hashing algorithm, and returns the hexdigest
string representation of the hash.

Note that the computed hash depends on how the array is chunked.

Parameters
----------
a :
The array that requires to have its hexdigest calculated.

Returns
-------
np.int64
The string hexadecimal representation of the item's 64-bit hash.

"""

def arrayhash(x):
value = xxh3_64(x.data.tobytes())
if is_masked_data(x):
value.update(x.mask.tobytes())
return np.frombuffer(value.digest(), dtype=np.int64)

return da.reduction(
a,
chunk=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim),
combine=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim),
aggregate=lambda x, axis, keepdims: arrayhash(x)[0],
keepdims=False,
meta=np.empty(tuple(), dtype=np.int64),
dtype=np.int64,
)


class _ArrayHash:
def __init__(self, value: np.int64, chunks: tuple) -> None:
self.value = value
self.chunks = chunks

def __eq__(self, other: "_ArrayHash") -> bool:
if self.chunks != other.chunks:
raise ValueError("Unable to compare arrays with different chunks.")
return self.value == other.value


def _compute_hashes(
cubes: Iterable[iris.cube.Cube],
check_aux_coords: bool,
check_derived_coords: bool,
check_cell_measures: bool,
check_ancils: bool,
) -> dict[str, _ArrayHash]:
"""Compute hashes for the arrays that will be compared."""
arrays = []
for cube in cubes:
if check_aux_coords:
for coord in cube.aux_coords:
arrays.append(coord.core_points())
if coord.has_bounds():
arrays.append(coord.core_bounds())
if check_derived_coords:
for coord in cube.derived_coords:
arrays.append(coord.core_points())
if coord.has_bounds():
arrays.append(coord.core_bounds())
if check_cell_measures:
for var in cube.cell_measures():
arrays.append(var.core_data())
if check_ancils:
for var in cube.ancillary_variables():
arrays.append(var.core_data())

hashes = {}

def get_shape(a):
return a.shape

arrays.sort(key=get_shape)
for _, group in itertools.groupby(arrays, key=get_shape):
group = list(group)
# TODO: Unify dtype as the hash depends on the dtype
# Unify chunks as the hash depends on the chunks.
indices = tuple(range(group[0].ndim))[::-1]
argpairs = [(a, indices) for a in group]
_, rechunked_group = da.core.unify_chunks(*itertools.chain(*argpairs))
for array, rechunked in zip(group, rechunked_group):
hashes[dask.base.tokenize(array)] = (
_hash_array(rechunked),
rechunked.chunks,
)

result = dask.compute(hashes)[0]
return {k: _ArrayHash(*v) for k, v in result.items()}


def concatenate(
cubes,
error_on_mismatch=False,
Expand Down Expand Up @@ -366,48 +436,13 @@ def concatenate(
axis = None

# Register each cube with its appropriate proto-cube.
arrays = []

# 1 collect list of arrays
for cube in cubes:
if check_aux_coords:
for coord in cube.aux_coords:
arrays.append(coord.core_points())
if coord.has_bounds():
arrays.append(coord.core_bounds())
if check_derived_coords:
for coord in cube.derived_coords:
arrays.append(coord.core_points())
if coord.has_bounds():
arrays.append(coord.core_bounds())
if check_cell_measures:
for var in cube.cell_measures():
arrays.append(var.core_data())
if check_ancils:
for var in cube.ancillary_variables():
arrays.append(var.core_data())

# 2 unify chunks of arrays that have matching shape
hashes = {}

def grouper(a):
return a.shape

arrays.sort(key=grouper)
for _, group in itertools.groupby(arrays, key=grouper):
group = list(group)
indices = tuple(range(group[0].ndim))[::-1]
argpairs = [(a, indices) for a in group]
_, rechunked_group = da.core.unify_chunks(*itertools.chain(*argpairs))
for array, rechunked in zip(group, rechunked_group):
hashes[dask.base.tokenize(array)] = (
hash_array(rechunked),
rechunked.chunks,
)

# 3 compute hashes
(hashes,) = dask.compute(hashes)
hashes = {k: ArrayHash(*v) for k, v in hashes.items()}
hashes = _compute_hashes(
cubes,
check_aux_coords=check_aux_coords,
check_cell_measures=check_cell_measures,
check_ancils=check_ancils,
check_derived_coords=check_derived_coords,
)

for cube in cubes:
registered = False
Expand Down Expand Up @@ -995,7 +1030,7 @@ def register(
warnings.warn(msg, category=iris.warnings.IrisUserWarning)

def get_hash(array):
return hashes[tokenize(array)]
return hashes[dask.base.tokenize(array)]

def get_hashes(coord):
result = []
Expand All @@ -1012,7 +1047,7 @@ def check_coord_match(coord_type):
getattr(self._cube_signature, coord_type),
getattr(cube_signature, coord_type),
):
# AuxCoords that span the candidate axis can differ
# Coordinates that span the candidate axis can differ
if (
candidate_axis not in coord_a.dims
or candidate_axis not in coord_b.dims
Expand Down