From f9e0106bcc3ce41cbb40f1a90a9fae7763048198 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Thu, 18 Apr 2024 10:25:13 +0200 Subject: [PATCH 1/7] Simplify concatenate --- lib/iris/_concatenate.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py index 8d929c2af2..be953f3437 100644 --- a/lib/iris/_concatenate.py +++ b/lib/iris/_concatenate.py @@ -4,7 +4,7 @@ # See LICENSE in the root of the repository for full licensing details. """Automatic concatenation of multiple cubes over one or more existing dimensions.""" -from collections import defaultdict, namedtuple +from collections import namedtuple import warnings import dask.array as da @@ -326,15 +326,13 @@ def concatenate( A :class:`iris.cube.CubeList` of concatenated :class:`iris.cube.Cube` instances. """ - proto_cubes_by_name = defaultdict(list) + proto_cubes = [] # Initialise the nominated axis (dimension) of concatenation # which requires to be negotiated. axis = None # Register each cube with its appropriate proto-cube. for cube in cubes: - name = cube.standard_name or cube.long_name - proto_cubes = proto_cubes_by_name[name] registered = False # Register cube with an existing proto-cube. @@ -360,13 +358,11 @@ def concatenate( concatenated_cubes = iris.cube.CubeList() # Emulate Python 2 behaviour. - def _none_sort(item): - return (item is not None, item) + def _none_sort(proto_cube): + return (proto_cube.name is not None, proto_cube.name) - for name in sorted(proto_cubes_by_name, key=_none_sort): - for proto_cube in proto_cubes_by_name[name]: - # Construct the concatenated cube. - concatenated_cubes.append(proto_cube.concatenate()) + for proto_cube in sorted(proto_cubes, key=_none_sort): + concatenated_cubes.append(proto_cube.concatenate()) # Perform concatenation until we've reached an equilibrium. count = len(concatenated_cubes) @@ -761,6 +757,12 @@ def axis(self): """Return the nominated dimension of concatenation.""" return self._axis + @property + def name(self) -> str | None: + """Return the standard_name or long name.""" + metadata = self._cube_signature.defn + return metadata.standard_name or metadata.long_name + def concatenate(self): """Concatenate all the source-cubes registered with the :class:`_ProtoCube`. 
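A minimal sketch, not part of the patch, of the ordering behaviour the new `_none_sort` key relies on: Python compares key tuples element-wise, and `(name is not None, name)` evaluates to `(False, None)` for an unnamed proto-cube, so unnamed entries sort ahead of all named ones without `None` ever being ordered against a string. The names below are illustrative.

    def _none_sort_key(name):
        # (False, None) for unnamed entries sorts before (True, "<name>").
        return (name is not None, name)

    names = ["air_temperature", None, "precipitation_flux"]
    assert sorted(names, key=_none_sort_key) == [
        None,
        "air_temperature",
        "precipitation_flux",
    ]

In the patch itself the key is computed from `proto_cube.name`, the new property returning the cube signature's `standard_name` or `long_name`, which removes the need to pre-group cubes by name in a `defaultdict`.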
From 6b013529c2580e8183c48ca91dc5658c8d88186e Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Tue, 23 Apr 2024 15:52:42 +0200 Subject: [PATCH 2/7] First attempt at parallel concatenate --- lib/iris/_concatenate.py | 174 ++++++++++++++++++++++++++------------- lib/iris/_lazy_data.py | 11 ++- 2 files changed, 125 insertions(+), 60 deletions(-) diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py index be953f3437..824ec66406 100644 --- a/lib/iris/_concatenate.py +++ b/lib/iris/_concatenate.py @@ -5,11 +5,16 @@ """Automatic concatenation of multiple cubes over one or more existing dimensions.""" from collections import namedtuple +import itertools import warnings +import dask import dask.array as da +from dask.base import tokenize import numpy as np +from xxhash import xxh3_64 +from iris._lazy_data import is_masked_data import iris.coords import iris.cube import iris.exceptions @@ -34,6 +39,35 @@ _INCREASING = 1 +def hash_array(a: da.Array | np.ndarray) -> np.int64: + def arrayhash(x): + value = xxh3_64(x.data.tobytes()) + if is_masked_data(x): + value.update(x.mask.tobytes()) + return np.frombuffer(value.digest(), dtype=np.int64) + + return da.reduction( + a, + chunk=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim), + combine=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim), + aggregate=lambda x, axis, keepdims: arrayhash(x)[0], + keepdims=False, + meta=np.empty(tuple(), dtype=np.int64), + dtype=np.int64, + ) + + +class ArrayHash: + def __init__(self, value: np.int64, chunks: tuple) -> None: + self.value = value + self.chunks = chunks + + def __eq__(self, other: "ArrayHash") -> bool: + if self.chunks != other.chunks: + raise ValueError("Unable to compare arrays with different chunks.") + return self.value == other.value + + class _CoordAndDims(namedtuple("CoordAndDims", ["coord", "dims"])): """Container for a coordinate and the associated data dimension(s). @@ -332,6 +366,49 @@ def concatenate( axis = None # Register each cube with its appropriate proto-cube. 
+ arrays = [] + + # 1 collect list of arrays + for cube in cubes: + if check_aux_coords: + for coord in cube.aux_coords: + arrays.append(coord.core_points()) + if coord.has_bounds(): + arrays.append(coord.core_bounds()) + if check_derived_coords: + for coord in cube.derived_coords: + arrays.append(coord.core_points()) + if coord.has_bounds(): + arrays.append(coord.core_bounds()) + if check_cell_measures: + for var in cube.cell_measures(): + arrays.append(var.core_data()) + if check_ancils: + for var in cube.ancillary_variables(): + arrays.append(var.core_data()) + + # 2 unify chunks of arrays that have matching shape + hashes = {} + + def grouper(a): + return a.shape + + arrays.sort(key=grouper) + for _, group in itertools.groupby(arrays, key=grouper): + group = list(group) + indices = tuple(range(group[0].ndim))[::-1] + argpairs = [(a, indices) for a in group] + _, rechunked_group = da.core.unify_chunks(*itertools.chain(*argpairs)) + for array, rechunked in zip(group, rechunked_group): + hashes[dask.base.tokenize(array)] = ( + hash_array(rechunked), + rechunked.chunks, + ) + + # 3 compute hashes + (hashes,) = dask.compute(hashes) + hashes = {k: ArrayHash(*v) for k, v in hashes.items()} + for cube in cubes: registered = False @@ -339,6 +416,7 @@ def concatenate( for proto_cube in proto_cubes: registered = proto_cube.register( cube, + hashes, axis, error_on_mismatch, check_aux_coords, @@ -380,7 +458,7 @@ class _CubeSignature: """ - def __init__(self, cube): + def __init__(self, cube: iris.cube.Cube) -> None: """Represent the cube metadata and associated coordinate metadata. Parameters @@ -413,7 +491,7 @@ def __init__(self, cube): # # Collate the dimension coordinate metadata. # - for ind, coord in enumerate(self.dim_coords): + for coord in self.dim_coords: dims = cube.coord_dims(coord) metadata = _CoordMetaData(coord, dims) self.dim_metadata.append(metadata) @@ -836,6 +914,7 @@ def concatenate(self): def register( self, cube, + hashes, axis=None, error_on_mismatch=False, check_aux_coords=False, @@ -915,73 +994,56 @@ def register( msg = f"Found cubes with overlap on concatenate axis {candidate_axis}, skipping concatenation for these cubes" warnings.warn(msg, category=iris.warnings.IrisUserWarning) - # Check for compatible AuxCoords. - if match: - if check_aux_coords: - for coord_a, coord_b in zip( - self._cube_signature.aux_coords_and_dims, - cube_signature.aux_coords_and_dims, + def get_hash(array): + return hashes[tokenize(array)] + + def get_hashes(coord): + result = [] + if hasattr(coord, "core_points"): + result.append(get_hash(coord.core_points())) + if coord.has_bounds(): + result.append(get_hash(coord.core_bounds())) + else: + result.append(get_hash(coord.core_data())) + return tuple(result) + + def check_coord_match(coord_type): + for coord_a, coord_b in zip( + getattr(self._cube_signature, coord_type), + getattr(cube_signature, coord_type), + ): + # AuxCoords that span the candidate axis can differ + if ( + candidate_axis not in coord_a.dims + or candidate_axis not in coord_b.dims ): - # AuxCoords that span the candidate axis can differ - if ( - candidate_axis not in coord_a.dims - or candidate_axis not in coord_b.dims - ): - if not coord_a == coord_b: - match = False + if coord_a.dims != coord_b.dims: + return False + if get_hashes(coord_a.coord) != get_hashes(coord_b.coord): + return False + return True + + # Check for compatible AuxCoords. + if match and check_aux_coords: + match = check_coord_match("aux_coords_and_dims") # Check for compatible CellMeasures. 
- if match: - if check_cell_measures: - for coord_a, coord_b in zip( - self._cube_signature.cell_measures_and_dims, - cube_signature.cell_measures_and_dims, - ): - # CellMeasures that span the candidate axis can differ - if ( - candidate_axis not in coord_a.dims - or candidate_axis not in coord_b.dims - ): - if not coord_a == coord_b: - match = False + if match and check_cell_measures: + match = check_coord_match("cell_measures_and_dims") # Check for compatible AncillaryVariables. - if match: - if check_ancils: - for coord_a, coord_b in zip( - self._cube_signature.ancillary_variables_and_dims, - cube_signature.ancillary_variables_and_dims, - ): - # AncillaryVariables that span the candidate axis can differ - if ( - candidate_axis not in coord_a.dims - or candidate_axis not in coord_b.dims - ): - if not coord_a == coord_b: - match = False + if match and check_ancils: + match = check_coord_match("ancillary_variables_and_dims") # Check for compatible derived coordinates. - if match: - if check_derived_coords: - for coord_a, coord_b in zip( - self._cube_signature.derived_coords_and_dims, - cube_signature.derived_coords_and_dims, - ): - # Derived coords that span the candidate axis can differ - if ( - candidate_axis not in coord_a.dims - or candidate_axis not in coord_b.dims - ): - if not coord_a == coord_b: - match = False + if match and check_derived_coords: + match = check_coord_match("derived_coords_and_dims") if match: # Register the cube as a source-cube for this proto-cube. self._add_skeleton(coord_signature, cube.lazy_data()) # Declare the nominated axis of concatenation. self._axis = candidate_axis - - if match: # If the protocube dimension order is constant (indicating it was # created from a cube with a length 1 dimension coordinate) but # a subsequently registered cube has a non-constant dimension diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 40984248d1..66701871c0 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -34,11 +34,14 @@ def is_lazy_data(data): """Return whether the argument is an Iris 'lazy' data array. At present, this means simply a :class:`dask.array.Array`. - We determine this by checking for a "compute" property. """ - result = hasattr(data, "compute") - return result + return isinstance(data, da.Array) + + +def is_masked_data(a): + """Determine whether the argument is a masked array.""" + return isinstance(da.utils.meta_from_array(a), np.ma.MaskedArray) def is_lazy_masked_data(data): @@ -48,7 +51,7 @@ def is_lazy_masked_data(data): underlying array is of masked type. Otherwise return False. 
""" - return is_lazy_data(data) and ma.isMA(da.utils.meta_from_array(data)) + return is_lazy_data(data) and is_masked_data(data) @lru_cache From 35facc78df5ae781de01a1b6cc8e9f115dee86bf Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Thu, 25 Apr 2024 17:57:48 +0200 Subject: [PATCH 3/7] Clean up a bit --- lib/iris/_concatenate.py | 183 +++++++++++++++++++++++---------------- 1 file changed, 109 insertions(+), 74 deletions(-) diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py index 824ec66406..41ac806be7 100644 --- a/lib/iris/_concatenate.py +++ b/lib/iris/_concatenate.py @@ -6,11 +6,11 @@ from collections import namedtuple import itertools +from typing import Iterable import warnings import dask import dask.array as da -from dask.base import tokenize import numpy as np from xxhash import xxh3_64 @@ -39,35 +39,6 @@ _INCREASING = 1 -def hash_array(a: da.Array | np.ndarray) -> np.int64: - def arrayhash(x): - value = xxh3_64(x.data.tobytes()) - if is_masked_data(x): - value.update(x.mask.tobytes()) - return np.frombuffer(value.digest(), dtype=np.int64) - - return da.reduction( - a, - chunk=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim), - combine=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim), - aggregate=lambda x, axis, keepdims: arrayhash(x)[0], - keepdims=False, - meta=np.empty(tuple(), dtype=np.int64), - dtype=np.int64, - ) - - -class ArrayHash: - def __init__(self, value: np.int64, chunks: tuple) -> None: - self.value = value - self.chunks = chunks - - def __eq__(self, other: "ArrayHash") -> bool: - if self.chunks != other.chunks: - raise ValueError("Unable to compare arrays with different chunks.") - return self.value == other.value - - class _CoordAndDims(namedtuple("CoordAndDims", ["coord", "dims"])): """Container for a coordinate and the associated data dimension(s). @@ -315,6 +286,105 @@ class _CoordExtent(namedtuple("CoordExtent", ["points", "bounds"])): __slots__ = () +def _hash_array(a: da.Array | np.ndarray) -> np.int64: + """Calculate a hash representation of the provided array. + + Calculates a 64-bit non-cryptographic hash of the provided array, using + the extremely fast ``xxhash`` hashing algorithm, and returns the hexdigest + string representation of the hash. + + Note that the computed hash depends on how the array is chunked. + + Parameters + ---------- + a : + The array that requires to have its hexdigest calculated. + + Returns + ------- + np.int64 + The string hexadecimal representation of the item's 64-bit hash. 
+ + """ + + def arrayhash(x): + value = xxh3_64(x.data.tobytes()) + if is_masked_data(x): + value.update(x.mask.tobytes()) + return np.frombuffer(value.digest(), dtype=np.int64) + + return da.reduction( + a, + chunk=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim), + combine=lambda x, axis, keepdims: arrayhash(x).reshape((1,) * a.ndim), + aggregate=lambda x, axis, keepdims: arrayhash(x)[0], + keepdims=False, + meta=np.empty(tuple(), dtype=np.int64), + dtype=np.int64, + ) + + +class _ArrayHash: + def __init__(self, value: np.int64, chunks: tuple) -> None: + self.value = value + self.chunks = chunks + + def __eq__(self, other: "_ArrayHash") -> bool: + if self.chunks != other.chunks: + raise ValueError("Unable to compare arrays with different chunks.") + return self.value == other.value + + +def _compute_hashes( + cubes: Iterable[iris.cube.Cube], + check_aux_coords: bool, + check_derived_coords: bool, + check_cell_measures: bool, + check_ancils: bool, +) -> dict[str, _ArrayHash]: + """Compute hashes for the arrays that will be compared.""" + arrays = [] + for cube in cubes: + if check_aux_coords: + for coord in cube.aux_coords: + arrays.append(coord.core_points()) + if coord.has_bounds(): + arrays.append(coord.core_bounds()) + if check_derived_coords: + for coord in cube.derived_coords: + arrays.append(coord.core_points()) + if coord.has_bounds(): + arrays.append(coord.core_bounds()) + if check_cell_measures: + for var in cube.cell_measures(): + arrays.append(var.core_data()) + if check_ancils: + for var in cube.ancillary_variables(): + arrays.append(var.core_data()) + + hashes = {} + + def get_shape(a): + return a.shape + + arrays.sort(key=get_shape) + for _, group in itertools.groupby(arrays, key=get_shape): + group = list(group) + # TODO: Unify dtype as the hash depends on the dtype + # Unify chunks as the hash depends on the chunks. + indices = tuple(range(group[0].ndim))[::-1] + argpairs = [(a, indices) for a in group] + _, rechunked_group = da.core.unify_chunks(*itertools.chain(*argpairs)) + for array, rechunked in zip(group, rechunked_group): + hashes[dask.base.tokenize(array)] = ( + _hash_array(rechunked), + rechunked.chunks, + ) + + result = dask.compute(hashes)[0] + return {k: _ArrayHash(*v) for k, v in result.items()} + + def concatenate( cubes, error_on_mismatch=False, @@ -366,48 +436,13 @@ def concatenate( axis = None # Register each cube with its appropriate proto-cube. 
- arrays = [] - - # 1 collect list of arrays - for cube in cubes: - if check_aux_coords: - for coord in cube.aux_coords: - arrays.append(coord.core_points()) - if coord.has_bounds(): - arrays.append(coord.core_bounds()) - if check_derived_coords: - for coord in cube.derived_coords: - arrays.append(coord.core_points()) - if coord.has_bounds(): - arrays.append(coord.core_bounds()) - if check_cell_measures: - for var in cube.cell_measures(): - arrays.append(var.core_data()) - if check_ancils: - for var in cube.ancillary_variables(): - arrays.append(var.core_data()) - - # 2 unify chunks of arrays that have matching shape - hashes = {} - - def grouper(a): - return a.shape - - arrays.sort(key=grouper) - for _, group in itertools.groupby(arrays, key=grouper): - group = list(group) - indices = tuple(range(group[0].ndim))[::-1] - argpairs = [(a, indices) for a in group] - _, rechunked_group = da.core.unify_chunks(*itertools.chain(*argpairs)) - for array, rechunked in zip(group, rechunked_group): - hashes[dask.base.tokenize(array)] = ( - hash_array(rechunked), - rechunked.chunks, - ) - - # 3 compute hashes - (hashes,) = dask.compute(hashes) - hashes = {k: ArrayHash(*v) for k, v in hashes.items()} + hashes = _compute_hashes( + cubes, + check_aux_coords=check_aux_coords, + check_cell_measures=check_cell_measures, + check_ancils=check_ancils, + check_derived_coords=check_derived_coords, + ) for cube in cubes: registered = False @@ -995,7 +1030,7 @@ def register( warnings.warn(msg, category=iris.warnings.IrisUserWarning) def get_hash(array): - return hashes[tokenize(array)] + return hashes[dask.base.tokenize(array)] def get_hashes(coord): result = [] @@ -1012,7 +1047,7 @@ def check_coord_match(coord_type): getattr(self._cube_signature, coord_type), getattr(cube_signature, coord_type), ): - # AuxCoords that span the candidate axis can differ + # Coordinates that span the candidate axis can differ if ( candidate_axis not in coord_a.dims or candidate_axis not in coord_b.dims From 6e883da02c7cd0360fa357a8ca0e87de105e9c71 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Fri, 26 Apr 2024 12:51:51 +0200 Subject: [PATCH 4/7] Add support for comparing different data types --- lib/iris/_concatenate.py | 93 ++++++++++--------- .../tests/unit/concatenate/test_hashing.py | 21 +++++ 2 files changed, 68 insertions(+), 46 deletions(-) create mode 100644 lib/iris/tests/unit/concatenate/test_hashing.py diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py index 41ac806be7..ae7d4e4cc1 100644 --- a/lib/iris/_concatenate.py +++ b/lib/iris/_concatenate.py @@ -290,8 +290,7 @@ def _hash_array(a: da.Array | np.ndarray) -> np.int64: """Calculate a hash representation of the provided array. Calculates a 64-bit non-cryptographic hash of the provided array, using - the extremely fast ``xxhash`` hashing algorithm, and returns the hexdigest - string representation of the hash. + the fast ``xxhash`` hashing algorithm. Note that the computed hash depends on how the array is chunked. @@ -303,12 +302,13 @@ def _hash_array(a: da.Array | np.ndarray) -> np.int64: Returns ------- np.int64 - The string hexadecimal representation of the item's 64-bit hash. + The array's hash. 
""" def arrayhash(x): - value = xxh3_64(x.data.tobytes()) + value = xxh3_64(np.array(x.shape, dtype=np.uint).tobytes()) + value.update(x.data.tobytes()) if is_masked_data(x): value.update(x.mask.tobytes()) return np.frombuffer(value.digest(), dtype=np.int64) @@ -335,47 +335,34 @@ def __eq__(self, other: "_ArrayHash") -> bool: return self.value == other.value -def _compute_hashes( - cubes: Iterable[iris.cube.Cube], - check_aux_coords: bool, - check_derived_coords: bool, - check_cell_measures: bool, - check_ancils: bool, -) -> dict[str, _ArrayHash]: +def _compute_hashes(arrays: Iterable[np.ndarray | da.Array]) -> dict[str, _ArrayHash]: """Compute hashes for the arrays that will be compared.""" - arrays = [] - for cube in cubes: - if check_aux_coords: - for coord in cube.aux_coords: - arrays.append(coord.core_points()) - if coord.has_bounds(): - arrays.append(coord.core_bounds()) - if check_derived_coords: - for coord in cube.derived_coords: - arrays.append(coord.core_points()) - if coord.has_bounds(): - arrays.append(coord.core_bounds()) - if check_cell_measures: - for var in cube.cell_measures(): - arrays.append(var.core_data()) - if check_ancils: - for var in cube.ancillary_variables(): - arrays.append(var.core_data()) - hashes = {} - def get_shape(a): - return a.shape + def is_numerical(dtype): + return np.issubdtype(dtype, np.bool_) or np.issubdtype(dtype, np.number) - arrays.sort(key=get_shape) - for _, group in itertools.groupby(arrays, key=get_shape): + def group_key(a): + if is_numerical(a.dtype): + dtype = "numerical" + else: + dtype = str(a.dtype) + return a.shape, dtype + + arrays = sorted(arrays, key=group_key) + for _, group in itertools.groupby(arrays, key=group_key): group = list(group) - # TODO: Unify dtype as the hash depends on the dtype + # Unify dtype for numerical arrays, as the hash depends on it + if is_numerical(group[0].dtype): + dtype = np.result_type(*group) + same_dtype_arrays = [a.astype(dtype) for a in group] + else: + same_dtype_arrays = group # Unify chunks as the hash depends on the chunks. indices = tuple(range(group[0].ndim))[::-1] - argpairs = [(a, indices) for a in group] - _, rechunked_group = da.core.unify_chunks(*itertools.chain(*argpairs)) - for array, rechunked in zip(group, rechunked_group): + argpairs = [(a, indices) for a in same_dtype_arrays] + rechunked_arrays = da.core.unify_chunks(*itertools.chain(*argpairs))[1] + for array, rechunked in zip(group, rechunked_arrays): hashes[dask.base.tokenize(array)] = ( _hash_array(rechunked), rechunked.chunks, @@ -435,15 +422,29 @@ def concatenate( # which requires to be negotiated. axis = None - # Register each cube with its appropriate proto-cube. - hashes = _compute_hashes( - cubes, - check_aux_coords=check_aux_coords, - check_cell_measures=check_cell_measures, - check_ancils=check_ancils, - check_derived_coords=check_derived_coords, - ) + # Compute hashes for parallel array comparison. + arrays = [] + for cube in cubes: + if check_aux_coords: + for coord in cube.aux_coords: + arrays.append(coord.core_points()) + if coord.has_bounds(): + arrays.append(coord.core_bounds()) + if check_derived_coords: + for coord in cube.derived_coords: + arrays.append(coord.core_points()) + if coord.has_bounds(): + arrays.append(coord.core_bounds()) + if check_cell_measures: + for var in cube.cell_measures(): + arrays.append(var.core_data()) + if check_ancils: + for var in cube.ancillary_variables(): + arrays.append(var.core_data()) + hashes = _compute_hashes(arrays) + + # Register each cube with its appropriate proto-cube. 
for cube in cubes: registered = False diff --git a/lib/iris/tests/unit/concatenate/test_hashing.py b/lib/iris/tests/unit/concatenate/test_hashing.py new file mode 100644 index 0000000000..609661b5f7 --- /dev/null +++ b/lib/iris/tests/unit/concatenate/test_hashing.py @@ -0,0 +1,21 @@ +import dask.array as da +from dask.base import tokenize +import numpy as np +import pytest + +from iris import _concatenate + + +@pytest.mark.parametrize( + "a,b,eq", + [ + (np.arange(2), da.arange(2), True), + (np.array([1], dtype=np.float32), np.array([1], dtype=bool), True), + (np.array([1]), np.array([[1]]), False), + (np.ma.array([1, 2], mask=[0, 1]), np.ma.array([1, 2], mask=[0, 1]), True), + (da.arange(2, chunks=1), da.arange(2, chunks=2), True), + ], +) +def test_compute_hashes(a, b, eq): + hashes = _concatenate._compute_hashes([a, b]) + assert eq == (hashes[tokenize(a)].value == hashes[tokenize(b)].value) From add3f66dfc49aff91587906cb634edec2c1fe9be Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Wed, 15 May 2024 09:34:31 +0200 Subject: [PATCH 5/7] Undo unnecessary change --- lib/iris/_lazy_data.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 14d70f77fc..b1f8e9aa85 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -35,9 +35,11 @@ def is_lazy_data(data): """Return whether the argument is an Iris 'lazy' data array. At present, this means simply a :class:`dask.array.Array`. + We determine this by checking for a "compute" property. """ - return isinstance(data, da.Array) + result = hasattr(data, "compute") + return result def is_masked_data(data: np.ndarray | da.Array) -> bool: From eb0b340a221bd1468c3de62ded5fe4afbea45fb6 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Wed, 15 May 2024 09:37:09 +0200 Subject: [PATCH 6/7] More tests --- .../tests/unit/concatenate/test_hashing.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/lib/iris/tests/unit/concatenate/test_hashing.py b/lib/iris/tests/unit/concatenate/test_hashing.py index 161355f09c..ee166011c9 100644 --- a/lib/iris/tests/unit/concatenate/test_hashing.py +++ b/lib/iris/tests/unit/concatenate/test_hashing.py @@ -17,11 +17,24 @@ [ (np.arange(2), da.arange(2), True), (np.array([1], dtype=np.float32), np.array([1], dtype=bool), True), - (np.array([1]), np.array([[1]]), False), (np.ma.array([1, 2], mask=[0, 1]), np.ma.array([1, 2], mask=[0, 1]), True), + (np.ma.array([1, 2], mask=[0, 1]), np.ma.array([1, 2], mask=[0, 0]), False), - (da.arange(2, chunks=1), da.arange(2, chunks=2), True), + (da.arange(6).reshape((2, 3)), da.arange(6, chunks=1).reshape((2, 3)), True), + (da.arange(20, chunks=1), da.arange(20, chunks=2), True), + ( + da.ma.masked_array([1, 2], mask=[0, 1]), + da.ma.masked_array([1, 2], mask=[0, 1]), + True, + ), ], ) def test_compute_hashes(a, b, eq): hashes = _concatenate._compute_hashes([a, b]) - assert eq == (hashes[tokenize(a)].value == hashes[tokenize(b)].value) + assert eq == (hashes[tokenize(a)] == hashes[tokenize(b)]) + + +def test_arrayhash_incompatible_chunks_raises(): + hash1 = _concatenate._ArrayHash(1, chunks=(1, 1)) + hash2 = _concatenate._ArrayHash(1, chunks=(2,)) + with pytest.raises(ValueError): + hash1 == hash2 From 24615a05b9e98b225ed9b92213a01ece7aed4f36 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Wed, 15 May 2024 10:10:27 +0200 Subject: [PATCH 7/7] Use faster lookup --- lib/iris/_concatenate.py | 132 ++++++++++-------- .../tests/unit/concatenate/test_hashing.py | 3 +- 2 files
changed, 76 insertions(+), 59 deletions(-) diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py index a1b006ff33..3676249c0f 100644 --- a/lib/iris/_concatenate.py +++ b/lib/iris/_concatenate.py @@ -5,17 +5,18 @@ """Automatic concatenation of multiple cubes over one or more existing dimensions.""" from collections import namedtuple +from collections.abc import Sequence import itertools -from typing import Iterable +from typing import Any, Iterable import warnings import dask import dask.array as da +from dask.base import tokenize import numpy as np -from xxhash import xxh3_64 +from xxhash import xxh3_64_digest from iris._lazy_data import concatenate as concatenate_arrays -from iris._lazy_data import is_masked_data import iris.coords import iris.cube import iris.exceptions @@ -308,11 +309,7 @@ def _hash_array(a: da.Array | np.ndarray) -> np.int64: """ def arrayhash(x): - value = xxh3_64(np.array(x.shape, dtype=np.uint).tobytes()) - value.update(x.data.tobytes()) - if is_masked_data(x): - value.update(x.mask.tobytes()) - return np.frombuffer(value.digest(), dtype=np.int64) + return np.frombuffer(xxh3_64_digest(x.tobytes()), dtype=np.int64) return da.reduction( a, @@ -325,20 +322,41 @@ def arrayhash(x): ) -class _ArrayHash: - def __init__(self, value: np.int64, chunks: tuple) -> None: - self.value = value - self.chunks = chunks +class _ArrayHash(namedtuple("ArrayHash", ["value", "chunks"])): + """Container for a hash value and the chunks used when computing it. - def __eq__(self, other: "_ArrayHash") -> bool: + Parameters + ---------- + value : :class:`np.int64` + The hash value. + chunks : tuple + The chunks the array had when the hash was computed. + """ + + __slots__ = () + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, self.__class__): + return NotImplemented if self.chunks != other.chunks: - raise ValueError("Unable to compare arrays with different chunks.") + raise ValueError( + "Unable to compare arrays with different chunks: " + f"{self.chunks} != {other.chunks}" + ) return self.value == other.value +def array_id(array: np.ndarray | da.Array) -> str: + """Get a deterministic token representing `array`.""" + if isinstance(array, np.ma.MaskedArray): + # Tokenizing a masked array is much slower than separately tokenizing + # the data and mask. 
+ return tokenize((tokenize(array.data), tokenize(array.mask))) + return tokenize(array) + + def _compute_hashes(arrays: Iterable[np.ndarray | da.Array]) -> dict[str, _ArrayHash]: """Compute hashes for the arrays that will be compared.""" - hashes = {} def is_numerical(dtype): return np.issubdtype(dtype, np.bool_) or np.issubdtype(dtype, np.number) @@ -350,9 +368,11 @@ def group_key(a): dtype = str(a.dtype) return a.shape, dtype + hashes = {} + arrays = sorted(arrays, key=group_key) - for _, group in itertools.groupby(arrays, key=group_key): - group = list(group) + for _, group_iter in itertools.groupby(arrays, key=group_key): + group = list(group_iter) # Unify dtype for numerical arrays, as the hash depends on it if is_numerical(group[0].dtype): dtype = np.result_type(*group) @@ -364,23 +384,23 @@ def group_key(a): argpairs = [(a, indices) for a in same_dtype_arrays] rechunked_arrays = da.core.unify_chunks(*itertools.chain(*argpairs))[1] for array, rechunked in zip(group, rechunked_arrays): - hashes[dask.base.tokenize(array)] = ( + hashes[array_id(array)] = ( _hash_array(rechunked), rechunked.chunks, ) - result = dask.compute(hashes)[0] - return {k: _ArrayHash(*v) for k, v in result.items()} + hashes = dask.compute(hashes)[0] + return {k: _ArrayHash(*v) for k, v in hashes.items()} def concatenate( - cubes, - error_on_mismatch=False, - check_aux_coords=True, - check_cell_measures=True, - check_ancils=True, - check_derived_coords=True, -): + cubes: Sequence[iris.cube.Cube], + error_on_mismatch: bool = False, + check_aux_coords: bool = True, + check_cell_measures: bool = True, + check_ancils: bool = True, + check_derived_coords: bool = True, +) -> iris.cube.CubeList: """Concatenate the provided cubes over common existing dimensions. Parameters @@ -418,7 +438,7 @@ def concatenate( A :class:`iris.cube.CubeList` of concatenated :class:`iris.cube.Cube` instances. """ - proto_cubes = [] + proto_cubes: list[_ProtoCube] = [] # Initialise the nominated axis (dimension) of concatenation # which requires to be negotiated. axis = None @@ -718,7 +738,7 @@ def match(self, other, error_on_mismatch): class _CoordSignature: """Template for identifying a specific type of :class:`iris.cube.Cube` based on its coordinates.""" - def __init__(self, cube_signature): + def __init__(self, cube_signature: _CubeSignature) -> None: """Represent the coordinate metadata. Represent the coordinate metadata required to identify suitable @@ -737,7 +757,7 @@ def __init__(self, cube_signature): self.derived_coords_and_dims = cube_signature.derived_coords_and_dims self.dim_coords = cube_signature.dim_coords self.dim_mapping = cube_signature.dim_mapping - self.dim_extents = [] + self.dim_extents: list[_CoordExtent] = [] self.dim_order = [ metadata.kwargs["order"] for metadata in cube_signature.dim_metadata ] @@ -746,7 +766,10 @@ def __init__(self, cube_signature): self._calculate_extents() @staticmethod - def _cmp(coord, other): + def _cmp( + coord: iris.coords.DimCoord, + other: iris.coords.DimCoord, + ) -> tuple[bool, bool]: """Compare the coordinates for concatenation compatibility. Returns @@ -757,22 +780,17 @@ def _cmp(coord, other): """ # A candidate axis must have non-identical coordinate points. - candidate_axis = not array_equal(coord.core_points(), other.core_points()) + candidate_axis = not array_equal(coord.points, other.points) - if candidate_axis: - # Ensure both have equal availability of bounds. 
- result = (coord.core_bounds() is None) == (other.core_bounds() is None) - else: - if coord.core_bounds() is not None and other.core_bounds() is not None: - # Ensure equality of bounds. - result = array_equal(coord.core_bounds(), other.core_bounds()) - else: - # Ensure both have equal availability of bounds. - result = coord.core_bounds() is None and other.core_bounds() is None + # Ensure both have equal availability of bounds. + result = coord.has_bounds() == other.has_bounds() + if result and not candidate_axis: + # Ensure equality of bounds. + result = array_equal(coord.bounds, other.bounds) return result, candidate_axis - def candidate_axis(self, other): + def candidate_axis(self, other: "_CoordSignature") -> int | None: """Determine the candidate axis of concatenation with the given coordinate signature. If a candidate axis is found, then the coordinate @@ -804,13 +822,13 @@ def candidate_axis(self, other): # Only permit one degree of dimensional freedom when # determining the candidate axis of concatenation. if result and len(candidate_axes) == 1: - result = candidate_axes[0] + axis = candidate_axes[0] else: - result = None + axis = None - return result + return axis - def _calculate_extents(self): + def _calculate_extents(self) -> None: """Calculate the extent over each dimension coordinates points and bounds.""" self.dim_extents = [] for coord, order in zip(self.dim_coords, self.dim_order): @@ -950,15 +968,15 @@ def concatenate(self): def register( self, - cube, - hashes, - axis=None, - error_on_mismatch=False, - check_aux_coords=False, - check_cell_measures=False, - check_ancils=False, - check_derived_coords=False, - ): + cube: iris.cube.Cube, + hashes: dict[str, _ArrayHash], + axis: int | None = None, + error_on_mismatch: bool = False, + check_aux_coords: bool = False, + check_cell_measures: bool = False, + check_ancils: bool = False, + check_derived_coords: bool = False, + ) -> bool: """Determine if the given source-cube is suitable for concatenation. Determine if the given source-cube is suitable for concatenation @@ -1032,7 +1050,7 @@ def register( warnings.warn(msg, category=iris.warnings.IrisUserWarning) def get_hash(array): - return hashes[dask.base.tokenize(array)] + return hashes[array_id(array)] def get_hashes(coord): result = [] diff --git a/lib/iris/tests/unit/concatenate/test_hashing.py b/lib/iris/tests/unit/concatenate/test_hashing.py index ee166011c9..3f3483faff 100644 --- a/lib/iris/tests/unit/concatenate/test_hashing.py +++ b/lib/iris/tests/unit/concatenate/test_hashing.py @@ -5,7 +5,6 @@ """Test array hashing in :mod:`iris._concatenate`.""" import dask.array as da -from dask.base import tokenize import numpy as np import pytest @@ -30,7 +29,7 @@ ) def test_compute_hashes(a, b, eq): hashes = _concatenate._compute_hashes([a, b]) - assert eq == (hashes[tokenize(a)] == hashes[tokenize(b)]) + assert eq == (hashes[_concatenate.array_id(a)] == hashes[_concatenate.array_id(b)]) def test_arrayhash_incompatible_chunks_raises():
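Taken together, the series replaces element-wise comparison of auxiliary coordinates, cell measures, ancillary variables, and derived coordinates with a single up-front hashing pass over all candidate arrays. A minimal usage sketch of the final behaviour, assuming the series is applied and `xxhash` is installed; it exercises the same private helpers as `test_hashing.py`:

    import dask.array as da
    import numpy as np

    from iris import _concatenate

    # Arrays with equal values hash equal regardless of laziness, chunking,
    # or numerical dtype: arrays of the same shape are cast to a common dtype
    # and rechunked to common chunks before hashing.
    a = np.arange(6, dtype=np.float32).reshape((2, 3))
    b = da.arange(6, chunks=1).reshape((2, 3))

    hashes = _concatenate._compute_hashes([a, b])
    assert hashes[_concatenate.array_id(a)] == hashes[_concatenate.array_id(b)]

Because `_compute_hashes` assembles one graph for all arrays and evaluates it with a single `dask.compute` call, the per-chunk `xxh3_64` digests can be computed in parallel, and every subsequent `register` call then reduces to dictionary lookups keyed by `array_id` rather than loading and comparing array values pairwise.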