diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py
index b25698da89..3676249c0f 100644
--- a/lib/iris/_concatenate.py
+++ b/lib/iris/_concatenate.py
@@ -4,10 +4,17 @@
 # See LICENSE in the root of the repository for full licensing details.
 """Automatic concatenation of multiple cubes over one or more existing dimensions."""
 
-from collections import defaultdict, namedtuple
+from collections import namedtuple
+from collections.abc import Sequence
+import itertools
+from typing import Any, Iterable
 import warnings
 
+import dask
+import dask.array as da
+from dask.base import tokenize
 import numpy as np
+from xxhash import xxh3_64_digest
 
 from iris._lazy_data import concatenate as concatenate_arrays
 import iris.coords
@@ -281,14 +288,119 @@ class _CoordExtent(namedtuple("CoordExtent", ["points", "bounds"])):
 
     __slots__ = ()
 
 
+def _hash_array(a: da.Array | np.ndarray) -> np.int64:
+    """Calculate a hash representation of the provided array.
+
+    Calculates a 64-bit non-cryptographic hash of the provided array, using
+    the fast ``xxhash`` hashing algorithm.
+
+    Note that the computed hash depends on how the array is chunked.
+
+    Parameters
+    ----------
+    a :
+        The array to be hashed.
+
+    Returns
+    -------
+    np.int64
+        The array's hash.
+
+    """
+
+    def arrayhash(x):
+        return np.frombuffer(xxh3_64_digest(x.tobytes()), dtype=np.int64)
+
+    return da.reduction(
+        a,
+        chunk=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim),
+        combine=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim),
+        aggregate=lambda x, axis, keepdims: arrayhash(x)[0],
+        keepdims=False,
+        meta=np.empty(tuple(), dtype=np.int64),
+        dtype=np.int64,
+    )
+
+
+class _ArrayHash(namedtuple("ArrayHash", ["value", "chunks"])):
+    """Container for a hash value and the chunks used when computing it.
+
+    Parameters
+    ----------
+    value : :class:`np.int64`
+        The hash value.
+    chunks : tuple
+        The chunks the array had when the hash was computed.
+    """
+
+    __slots__ = ()
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, self.__class__):
+            return NotImplemented
+        if self.chunks != other.chunks:
+            raise ValueError(
+                "Unable to compare arrays with different chunks: "
+                f"{self.chunks} != {other.chunks}"
+            )
+        return self.value == other.value
+
+
+def array_id(array: np.ndarray | da.Array) -> str:
+    """Get a deterministic token representing `array`."""
+    if isinstance(array, np.ma.MaskedArray):
+        # Tokenizing a masked array is much slower than separately tokenizing
+        # the data and mask.
+        return tokenize((tokenize(array.data), tokenize(array.mask)))
+    return tokenize(array)
+
+
+def _compute_hashes(arrays: Iterable[np.ndarray | da.Array]) -> dict[str, _ArrayHash]:
+    """Compute hashes for the arrays that will be compared."""
+
+    def is_numerical(dtype):
+        return np.issubdtype(dtype, np.bool_) or np.issubdtype(dtype, np.number)
+
+    def group_key(a):
+        if is_numerical(a.dtype):
+            dtype = "numerical"
+        else:
+            dtype = str(a.dtype)
+        return a.shape, dtype
+
+    hashes = {}
+
+    arrays = sorted(arrays, key=group_key)
+    for _, group_iter in itertools.groupby(arrays, key=group_key):
+        group = list(group_iter)
+        # Unify dtype for numerical arrays, as the hash depends on it.
+        if is_numerical(group[0].dtype):
+            dtype = np.result_type(*group)
+            same_dtype_arrays = [a.astype(dtype) for a in group]
+        else:
+            same_dtype_arrays = group
+        # Unify chunks as the hash depends on the chunks.
+        indices = tuple(range(group[0].ndim))[::-1]
+        argpairs = [(a, indices) for a in same_dtype_arrays]
+        rechunked_arrays = da.core.unify_chunks(*itertools.chain(*argpairs))[1]
+        for array, rechunked in zip(group, rechunked_arrays):
+            hashes[array_id(array)] = (
+                _hash_array(rechunked),
+                rechunked.chunks,
+            )
+
+    hashes = dask.compute(hashes)[0]
+    return {k: _ArrayHash(*v) for k, v in hashes.items()}
+
+
 def concatenate(
-    cubes,
-    error_on_mismatch=False,
-    check_aux_coords=True,
-    check_cell_measures=True,
-    check_ancils=True,
-    check_derived_coords=True,
-):
+    cubes: Sequence[iris.cube.Cube],
+    error_on_mismatch: bool = False,
+    check_aux_coords: bool = True,
+    check_cell_measures: bool = True,
+    check_ancils: bool = True,
+    check_derived_coords: bool = True,
+) -> iris.cube.CubeList:
     """Concatenate the provided cubes over common existing dimensions.
 
     Parameters
     ----------
@@ -326,21 +438,42 @@ def concatenate(
         A :class:`iris.cube.CubeList` of concatenated :class:`iris.cube.Cube`
        instances.
 
     """
-    proto_cubes_by_name = defaultdict(list)
+    proto_cubes: list[_ProtoCube] = []
     # Initialise the nominated axis (dimension) of concatenation
     # which requires to be negotiated.
     axis = None
 
+    # Compute hashes for parallel array comparison.
+    arrays = []
+    for cube in cubes:
+        if check_aux_coords:
+            for coord in cube.aux_coords:
+                arrays.append(coord.core_points())
+                if coord.has_bounds():
+                    arrays.append(coord.core_bounds())
+        if check_derived_coords:
+            for coord in cube.derived_coords:
+                arrays.append(coord.core_points())
+                if coord.has_bounds():
+                    arrays.append(coord.core_bounds())
+        if check_cell_measures:
+            for var in cube.cell_measures():
+                arrays.append(var.core_data())
+        if check_ancils:
+            for var in cube.ancillary_variables():
+                arrays.append(var.core_data())
+
+    hashes = _compute_hashes(arrays)
+
     # Register each cube with its appropriate proto-cube.
     for cube in cubes:
-        name = cube.standard_name or cube.long_name
-        proto_cubes = proto_cubes_by_name[name]
         registered = False
 
         # Register cube with an existing proto-cube.
         for proto_cube in proto_cubes:
             registered = proto_cube.register(
                 cube,
+                hashes,
                 axis,
                 error_on_mismatch,
                 check_aux_coords,
@@ -360,13 +493,11 @@ def concatenate(
     concatenated_cubes = iris.cube.CubeList()
 
     # Emulate Python 2 behaviour.
-    def _none_sort(item):
-        return (item is not None, item)
+    def _none_sort(proto_cube):
+        return (proto_cube.name is not None, proto_cube.name)
 
-    for name in sorted(proto_cubes_by_name, key=_none_sort):
-        for proto_cube in proto_cubes_by_name[name]:
-            # Construct the concatenated cube.
-            concatenated_cubes.append(proto_cube.concatenate())
+    for proto_cube in sorted(proto_cubes, key=_none_sort):
+        concatenated_cubes.append(proto_cube.concatenate())
 
     # Perform concatenation until we've reached an equilibrium.
     count = len(concatenated_cubes)
@@ -384,7 +515,7 @@ class _CubeSignature:
 
     """
 
-    def __init__(self, cube):
+    def __init__(self, cube: iris.cube.Cube) -> None:
         """Represent the cube metadata and associated coordinate metadata.
 
         Parameters
         ----------
@@ -417,7 +548,7 @@ def __init__(self, cube):
         #
         # Collate the dimension coordinate metadata.
         #
-        for ind, coord in enumerate(self.dim_coords):
+        for coord in self.dim_coords:
             dims = cube.coord_dims(coord)
             metadata = _CoordMetaData(coord, dims)
             self.dim_metadata.append(metadata)
@@ -607,7 +738,7 @@ def match(self, other, error_on_mismatch):
 class _CoordSignature:
     """Template for identifying a specific type of :class:`iris.cube.Cube` based on its coordinates."""
 
-    def __init__(self, cube_signature):
+    def __init__(self, cube_signature: _CubeSignature) -> None:
         """Represent the coordinate metadata.
 
         Represent the coordinate metadata required to identify suitable
@@ -626,7 +757,7 @@ def __init__(self, cube_signature):
         self.derived_coords_and_dims = cube_signature.derived_coords_and_dims
         self.dim_coords = cube_signature.dim_coords
         self.dim_mapping = cube_signature.dim_mapping
-        self.dim_extents = []
+        self.dim_extents: list[_CoordExtent] = []
         self.dim_order = [
             metadata.kwargs["order"] for metadata in cube_signature.dim_metadata
         ]
@@ -635,7 +766,10 @@ def __init__(self, cube_signature):
         self._calculate_extents()
 
     @staticmethod
-    def _cmp(coord, other):
+    def _cmp(
+        coord: iris.coords.DimCoord,
+        other: iris.coords.DimCoord,
+    ) -> tuple[bool, bool]:
         """Compare the coordinates for concatenation compatibility.
 
         Returns
         -------
@@ -646,22 +780,17 @@ def _cmp(coord, other):
         """
         # A candidate axis must have non-identical coordinate points.
-        candidate_axis = not array_equal(coord.core_points(), other.core_points())
+        candidate_axis = not array_equal(coord.points, other.points)
 
-        if candidate_axis:
-            # Ensure both have equal availability of bounds.
-            result = (coord.core_bounds() is None) == (other.core_bounds() is None)
-        else:
-            if coord.core_bounds() is not None and other.core_bounds() is not None:
-                # Ensure equality of bounds.
-                result = array_equal(coord.core_bounds(), other.core_bounds())
-            else:
-                # Ensure both have equal availability of bounds.
-                result = coord.core_bounds() is None and other.core_bounds() is None
+        # Ensure both have equal availability of bounds.
+        result = coord.has_bounds() == other.has_bounds()
+        if result and not candidate_axis:
+            # Ensure equality of bounds.
+            result = array_equal(coord.bounds, other.bounds)
 
         return result, candidate_axis
 
-    def candidate_axis(self, other):
+    def candidate_axis(self, other: "_CoordSignature") -> int | None:
         """Determine the candidate axis of concatenation with the given coordinate signature.
 
         If a candidate axis is found, then the coordinate
@@ -693,13 +822,13 @@ def candidate_axis(self, other):
         # Only permit one degree of dimensional freedom when
         # determining the candidate axis of concatenation.
         if result and len(candidate_axes) == 1:
-            result = candidate_axes[0]
+            axis = candidate_axes[0]
         else:
-            result = None
+            axis = None
 
-        return result
+        return axis
 
-    def _calculate_extents(self):
+    def _calculate_extents(self) -> None:
         """Calculate the extent over each dimension coordinates points and bounds."""
         self.dim_extents = []
         for coord, order in zip(self.dim_coords, self.dim_order):
@@ -761,6 +890,12 @@ def axis(self):
         """Return the nominated dimension of concatenation."""
         return self._axis
 
+    @property
+    def name(self) -> str | None:
+        """Return the standard_name or long_name."""
+        metadata = self._cube_signature.defn
+        return metadata.standard_name or metadata.long_name
+
     def concatenate(self):
         """Concatenate all the source-cubes registered with the :class:`_ProtoCube`.
@@ -833,14 +968,15 @@ def concatenate(self):
 
     def register(
         self,
-        cube,
-        axis=None,
-        error_on_mismatch=False,
-        check_aux_coords=False,
-        check_cell_measures=False,
-        check_ancils=False,
-        check_derived_coords=False,
-    ):
+        cube: iris.cube.Cube,
+        hashes: dict[str, _ArrayHash],
+        axis: int | None = None,
+        error_on_mismatch: bool = False,
+        check_aux_coords: bool = False,
+        check_cell_measures: bool = False,
+        check_ancils: bool = False,
+        check_derived_coords: bool = False,
+    ) -> bool:
         """Determine if the given source-cube is suitable for concatenation.
 
         Determine if the given source-cube is suitable for concatenation
@@ -913,73 +1049,56 @@ def register(
             msg = f"Found cubes with overlap on concatenate axis {candidate_axis}, skipping concatenation for these cubes"
             warnings.warn(msg, category=iris.warnings.IrisUserWarning)
 
-        # Check for compatible AuxCoords.
-        if match:
-            if check_aux_coords:
-                for coord_a, coord_b in zip(
-                    self._cube_signature.aux_coords_and_dims,
-                    cube_signature.aux_coords_and_dims,
+        def get_hash(array):
+            return hashes[array_id(array)]
+
+        def get_hashes(coord):
+            result = []
+            if hasattr(coord, "core_points"):
+                result.append(get_hash(coord.core_points()))
+                if coord.has_bounds():
+                    result.append(get_hash(coord.core_bounds()))
+            else:
+                result.append(get_hash(coord.core_data()))
+            return tuple(result)
+
+        def check_coord_match(coord_type):
+            for coord_a, coord_b in zip(
+                getattr(self._cube_signature, coord_type),
+                getattr(cube_signature, coord_type),
+            ):
+                # Coordinates that span the candidate axis can differ
+                if (
+                    candidate_axis not in coord_a.dims
+                    or candidate_axis not in coord_b.dims
                 ):
-                    # AuxCoords that span the candidate axis can differ
-                    if (
-                        candidate_axis not in coord_a.dims
-                        or candidate_axis not in coord_b.dims
-                    ):
-                        if not coord_a == coord_b:
-                            match = False
+                    if coord_a.dims != coord_b.dims:
+                        return False
+                    if get_hashes(coord_a.coord) != get_hashes(coord_b.coord):
+                        return False
+            return True
+
+        # Check for compatible AuxCoords.
+        if match and check_aux_coords:
+            match = check_coord_match("aux_coords_and_dims")
 
         # Check for compatible CellMeasures.
-        if match:
-            if check_cell_measures:
-                for coord_a, coord_b in zip(
-                    self._cube_signature.cell_measures_and_dims,
-                    cube_signature.cell_measures_and_dims,
-                ):
-                    # CellMeasures that span the candidate axis can differ
-                    if (
-                        candidate_axis not in coord_a.dims
-                        or candidate_axis not in coord_b.dims
-                    ):
-                        if not coord_a == coord_b:
-                            match = False
+        if match and check_cell_measures:
+            match = check_coord_match("cell_measures_and_dims")
 
         # Check for compatible AncillaryVariables.
-        if match:
-            if check_ancils:
-                for coord_a, coord_b in zip(
-                    self._cube_signature.ancillary_variables_and_dims,
-                    cube_signature.ancillary_variables_and_dims,
-                ):
-                    # AncillaryVariables that span the candidate axis can differ
-                    if (
-                        candidate_axis not in coord_a.dims
-                        or candidate_axis not in coord_b.dims
-                    ):
-                        if not coord_a == coord_b:
-                            match = False
+        if match and check_ancils:
+            match = check_coord_match("ancillary_variables_and_dims")
 
         # Check for compatible derived coordinates.
-        if match:
-            if check_derived_coords:
-                for coord_a, coord_b in zip(
-                    self._cube_signature.derived_coords_and_dims,
-                    cube_signature.derived_coords_and_dims,
-                ):
-                    # Derived coords that span the candidate axis can differ
-                    if (
-                        candidate_axis not in coord_a.dims
-                        or candidate_axis not in coord_b.dims
-                    ):
-                        if not coord_a == coord_b:
-                            match = False
+        if match and check_derived_coords:
+            match = check_coord_match("derived_coords_and_dims")
 
         if match:
             # Register the cube as a source-cube for this proto-cube.
             self._add_skeleton(coord_signature, cube.lazy_data())
             # Declare the nominated axis of concatenation.
             self._axis = candidate_axis
-
-        if match:
             # If the protocube dimension order is constant (indicating it was
             # created from a cube with a length 1 dimension coordinate) but
             # a subsequently registered cube has a non-constant dimension
diff --git a/lib/iris/tests/unit/concatenate/test_hashing.py b/lib/iris/tests/unit/concatenate/test_hashing.py
new file mode 100644
index 0000000000..3f3483faff
--- /dev/null
+++ b/lib/iris/tests/unit/concatenate/test_hashing.py
@@ -0,0 +1,39 @@
+# Copyright Iris contributors
+#
+# This file is part of Iris and is released under the BSD license.
+# See LICENSE in the root of the repository for full licensing details.
+"""Test array hashing in :mod:`iris._concatenate`."""
+
+import dask.array as da
+import numpy as np
+import pytest
+
+from iris import _concatenate
+
+
+@pytest.mark.parametrize(
+    "a,b,eq",
+    [
+        (np.arange(2), da.arange(2), True),
+        (np.array([1], dtype=np.float32), np.array([1], dtype=bool), True),
+        (np.ma.array([1, 2], mask=[0, 1]), np.ma.array([1, 2], mask=[0, 1]), True),
+        (np.ma.array([1, 2], mask=[0, 1]), np.ma.array([1, 2], mask=[0, 0]), False),
+        (da.arange(6).reshape((2, 3)), da.arange(6, chunks=1).reshape((2, 3)), True),
+        (da.arange(20, chunks=1), da.arange(20, chunks=2), True),
+        (
+            da.ma.masked_array([1, 2], mask=[0, 1]),
+            da.ma.masked_array([1, 2], mask=[0, 1]),
+            True,
+        ),
+    ],
+)
+def test_compute_hashes(a, b, eq):
+    hashes = _concatenate._compute_hashes([a, b])
+    assert eq == (hashes[_concatenate.array_id(a)] == hashes[_concatenate.array_id(b)])
+
+
+def test_arrayhash_incompatible_chunks_raises():
+    hash1 = _concatenate._ArrayHash(1, chunks=(1, 1))
+    hash2 = _concatenate._ArrayHash(1, chunks=(2,))
+    with pytest.raises(ValueError):
+        hash1 == hash2
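
Reviewer note (not part of the patch): the sketch below illustrates the hashing scheme that `_hash_array` and `_compute_hashes` implement, a chunk-wise xxh3 tree reduction whose result depends on the chunk layout, which is why arrays are rechunked to a common layout before hashing. It is a minimal standalone version assuming only that `dask`, `numpy`, and `xxhash` are installed; the helper names `hash_chunk` and `hash_array` are illustrative, not Iris API.

import dask
import dask.array as da
import numpy as np
from xxhash import xxh3_64_digest

def hash_chunk(x):
    # xxh3_64_digest returns 8 bytes; reinterpret them as a single int64.
    return np.frombuffer(xxh3_64_digest(x.tobytes()), dtype=np.int64)

def hash_array(a):
    # Tree reduction: hash each chunk, then repeatedly hash the
    # concatenated chunk hashes down to one int64 per array.
    return da.reduction(
        a,
        chunk=lambda x, axis, keepdims: hash_chunk(x).reshape((1,) * a.ndim),
        combine=lambda x, axis, keepdims: hash_chunk(x).reshape((1,) * a.ndim),
        aggregate=lambda x, axis, keepdims: hash_chunk(x)[0],
        keepdims=False,
        meta=np.empty((), dtype=np.int64),
        dtype=np.int64,
    )

a = da.arange(20, chunks=2)
b = da.arange(20, chunks=5)
# Equal values in different chunks would hash differently, so unify the
# chunk layout first, as _compute_hashes does via da.core.unify_chunks.
a, b = da.core.unify_chunks(a, "i", b, "i")[1]
ha, hb = dask.compute(hash_array(a), hash_array(b))
assert ha == hb

Because the per-chunk hashes are ordinary dask tasks, hashing all arrays for all cubes in one dask.compute call is what makes the comparison parallel.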
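Reviewer note (not part of the patch): a small usage sketch, assuming the patched module is importable, of how `register` now compares coordinate payloads. Each coordinate is reduced to a tuple of `_ArrayHash` values for its points and optional bounds (the patch's `get_hashes`), so repeated cube-to-cube checks become dictionary lookups instead of full array comparisons. `coord_hashes` is an illustrative helper, not Iris API.

import numpy as np
from iris import _concatenate
from iris.coords import AuxCoord

pts = np.arange(3, dtype=np.float64)
bnds = np.column_stack([pts - 0.5, pts + 0.5])
coord_a = AuxCoord(pts, long_name="x", bounds=bnds)
coord_b = AuxCoord(pts.copy(), long_name="x", bounds=bnds.copy())

# Hash every array that will take part in the comparison, in one go.
arrays = []
for coord in (coord_a, coord_b):
    arrays.append(coord.core_points())
    if coord.has_bounds():
        arrays.append(coord.core_bounds())
hashes = _concatenate._compute_hashes(arrays)

def coord_hashes(coord):
    # Mirrors the patch's get_hashes: points first, then optional bounds.
    result = [hashes[_concatenate.array_id(coord.core_points())]]
    if coord.has_bounds():
        result.append(hashes[_concatenate.array_id(coord.core_bounds())])
    return tuple(result)

# Equal-valued coordinates compare equal through their hash tuples.
assert coord_hashes(coord_a) == coord_hashes(coord_b)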