Poor performance with concatenating and saving netCDF files #4448

Closed
hdyson opened this issue Dec 3, 2021 · 7 comments

@hdyson
Contributor

hdyson commented Dec 3, 2021

🐛 Bug Report

We have multiple netCDF files that need to be concatenated together and saved. Typically, this is about 80 files of ~5 GB each. The load, concatenate, and save takes multiple days until the job is killed (so we don't know whether this is an infinite loop, or whether more time would have let the task complete). Below, I've included the script that does the concatenation and save, and a script that generates sample data analogous to the real data we're working with.

How To Reproduce

Steps to reproduce the behaviour:

  1. Generate sample data with this script (you may need to adjust the destination filename constructed in save_cubes()) - this takes ~15 minutes:
#!/usr/bin/env python
"""
Generates suitable sample data to demonstrate issue where loading multiple
files, concatenating, and saving has unexpected performance.

"""

import itertools
import os

import iris
import numpy as np


def _build_coordinate(start, stop, number_of_points, name):
    coordinate = iris.coords.DimCoord(np.linspace(start, stop, number_of_points))
    coordinate.guess_bounds()
    coordinate.units = "degrees"
    coordinate.rename(name)
    return coordinate


def generate_cubes(times=11280, longitudes=2560, latitudes=1920, x_split=8, y_split=10):
    """
    Generates cubes that tesselate the globe.

    Specifically, each individual cube has all times, but contains a longitude
    range of longitudes/x_split  (and similarly for latitude), where all of
    the cubes combined cover the range -180 to +180 degrees longitude, and
    -90 to +90 degrees latitude.

    All cubes are contiguously chunked.

    longitudes needs to be multiple of x_split; latitudes needs to be a
    multiple of y_split.  These are not checked.

    """
    tc = iris.coords.DimCoord(
        np.arange(times), units="days since epoch", standard_name="time"
    )
    delta_lon = 360.0 / longitudes
    delta_lat = 180.0 / latitudes
    xc = _build_coordinate(
        -180.0 + delta_lon / 2.0, 180.0 - delta_lon / 2.0, longitudes, "longitude"
    )
    yc = _build_coordinate(
        -90.0 + delta_lat / 2.0, 90.0 - delta_lat / 2.0, latitudes, "latitude"
    )
    # Check coordinates cover globe:
    assert np.min(xc.bounds) == -180.0
    assert np.max(xc.bounds) == 180.0
    assert np.min(yc.bounds) == -90.0
    assert np.max(yc.bounds) == 90.0
    x_points_per_cube = longitudes // x_split
    y_points_per_cube = latitudes // y_split
    for x_index, y_index in itertools.product(range(x_split), range(y_split)):
        subset_x = xc[x_index * x_points_per_cube : (x_index + 1) * x_points_per_cube]
        subset_y = yc[y_index * y_points_per_cube : (y_index + 1) * y_points_per_cube]
        x_points = len(subset_x.points)
        y_points = len(subset_y.points)
        data = np.arange(times * x_points * y_points).reshape(times, x_points, y_points)
        print(data.shape)
        cube = iris.cube.Cube(data)
        cube.rename("sample_data")
        cube.add_dim_coord(tc, 0)
        cube.add_dim_coord(subset_x, 1)
        cube.add_dim_coord(subset_y, 2)
        yield cube


def save_cubes(cubes):
    """Given an iterator of cubes, saves them in sequentially numbered files."""
    for index, cube in enumerate(cubes):
        filename = os.path.join(
            os.path.expandvars("$SCRATCH"), "iris_example", f"temp_{index}.nc"
        )
        iris.save(cube, filename)


def main():
    cubes = generate_cubes()
    save_cubes(cubes)


if __name__ == "__main__":
    main()
  2. Run this script to demonstrate the actual issue (again, filenames may need to be adjusted):
import os

import iris

def main():
    filename_input = os.path.join(
        os.path.expandvars("$SCRATCH"), "iris_example", "temp_*.nc"
    )
    filename_output = os.path.join(
        os.path.expandvars("$SCRATCH"), "iris_example", "concatenated.nc"
    )
    source_cubes = iris.load(filename_input)
    cubes = iris.cube.CubeList(source_cubes)
    cubes = cubes.concatenate()
    iris.save(cubes, filename_output)


if __name__ == "__main__":
    main()

Expected behaviour

Saving to complete in a sensible time.

Environment

  • OS & Version: RHEL7
  • Iris Version: 2.3, but also observed with iris 2.4 (and possibly later - @bjlittle was investigating)

Additional context

For the real-world case that led to this example, we do have control over how the individual 80 files are saved - so if there are more sensible chunk sizes we should use for the netCDF files to work around this problem, that would be a solution for us.
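
For example, something along these lines (a sketch only: the helper name save_cubes_with_chunking is just for illustration, and it assumes the netCDF saver's chunksizes keyword, which iris.save passes through to the saver) would write each per-tile file with one time step per chunk:

import os

import iris


def save_cubes_with_chunking(cubes):
    """Illustrative sketch: save each cube with one time step per netCDF chunk."""
    for index, cube in enumerate(cubes):
        filename = os.path.join(
            os.path.expandvars("$SCRATCH"), "iris_example", f"temp_{index}.nc"
        )
        # Cube shape is (time, longitude, latitude) in the generation script above.
        _, n_lon, n_lat = cube.shape
        iris.save(cube, filename, chunksizes=(1, n_lon, n_lat))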

@bjlittle
Member

bjlittle commented Dec 3, 2021

Thanks for taking the time to report this @hdyson. Much appreciated 👍

I can confirm that the issue here isn't concatenation, it's saving.

@pp-mo It would be good to see if the changes to saving from the recent UGRID work have improved this situation.

@pp-mo
Member

pp-mo commented Dec 3, 2021

👍 Great example code! It will really help to assess performance.

Also, great timing... I can't promise anything specific, but it does sound very close to my current task, which is testing the performance of the new unstructured 'combine' operation. So I'm spending some time trying to work out what makes these types of operation go well or badly, and I think that should easily apply to this too.

@hdyson
Contributor Author

hdyson commented Dec 6, 2021

My best guess is that it's a dask/netCDF chunking issue - but that is a fairly speculative guess (and, frankly, investigating that in more detail is a bit beyond my dask knowledge). Notice that there are a lot of times in the sample data, since the real data is 30 years of daily data.
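
For reference, a quick way to see what chunking Dask has picked for the lazy data at load time (a sketch; the path and variable name match the generation script above, and core_data() returns the underlying Dask array while the cube is still lazy):

import os

import iris

filename = os.path.join(os.path.expandvars("$SCRATCH"), "iris_example", "temp_0.nc")
cube = iris.load_cube(filename, "sample_data")
lazy = cube.core_data()            # Dask array; no data is read yet
print(lazy.shape, lazy.chunksize)  # full shape vs. the per-chunk shape Dask chose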

@pp-mo
Member

pp-mo commented Feb 9, 2022

Just linking related issues: #4572 might help.

@hdyson
Contributor Author

hdyson commented Feb 9, 2022

So, if I'm interpreting #4572 and our earlier conversation correctly, the issue here is that what we want to happen is:

for each time:
    for each tile:
        load tile data for current time
    concatenate all tiles for current time
    append concatenated data for current time to output file

whereas what is actually going on under the hood is:

for each time:
    for each tile:
        load tile data for all times
        slice out current time
    concatenate all tiles for current time
    append concatenated data for current time to output file

(I initially wrote this with the slice after the concatenate - I don't think that can be the case, though, since it would imply loading all of the data at once, which would exceed the memory allocation, and the job would be killed long before the 3-day timeout.)

Hence there's a lot of unnecessary I/O. And the user-controlled chunking operations being added in #4572 should mean that we can do the load for each time in turn, rather than loading all the times from all the tiles for each time?
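
As a cross-check on that picture, the on-disk layout of the per-tile files can be inspected directly with the netCDF4 library (a sketch; the path and variable name are taken from the generation script above):

import os

import netCDF4

filename = os.path.join(os.path.expandvars("$SCRATCH"), "iris_example", "temp_0.nc")
with netCDF4.Dataset(filename) as dataset:
    variable = dataset.variables["sample_data"]
    # chunking() returns 'contiguous' for contiguous storage, or a list of
    # per-dimension chunk sizes for chunked storage.
    print(variable.chunking())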

@hdyson
Contributor Author

hdyson commented Apr 1, 2022

Following advice from @pp-mo, rechunking the data so that the innermost dimension is a single chunk seems to resolve this issue for our use case. Adding two lines to the main() from the original comment:

def concatenate_and_save():
    filename_input = os.path.join(
        os.path.expandvars("$SCRATCH"), "iris_example", "temp_*.nc"
    )
    filename_output = os.path.join(
        os.path.expandvars("$SCRATCH"), "iris_example", "concatenated.nc"
    )
    source_cubes = iris.load(filename_input)
    cubes = iris.cube.CubeList(source_cubes)
    cubes = cubes.concatenate()

    # These are the additional two lines:
    for cube in cubes:
        cube.data = cube.core_data().rechunk({0: "auto", 1: "auto", 2: -1})

    iris.save(cubes, filename_output)

is sufficient to get the save time down to around 1 hour. For a concatenation of ~0.5 TB of data in 80 files to a single file of ~0.5 TB, that doesn't seem excessive to me.
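
For anyone applying the same workaround, one way to sanity-check the layout before triggering the save is to look at the Dask chunking after the rechunk (a sketch, reusing the cubes variable from the snippet above):

for cube in cubes:
    chunks = cube.core_data().chunks  # per-dimension tuples of block sizes
    blocks_per_dim = [len(dim_blocks) for dim_blocks in chunks]
    # With the rechunk above, the last entry should be 1: the innermost
    # dimension is now a single chunk.
    print(cube.name(), blocks_per_dim)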

I think this gives us a workaround we can use in ANTS, so I'm happy to close this issue. If more thorough testing reveals that this isn't sufficient, I can re-open it. It would be great if the eventual solution in #4572 either removed the need for this workaround, or at least didn't break it 😃

hdyson closed this as completed Apr 1, 2022
@pp-mo
Member

pp-mo commented Mar 27, 2024

Also noted: comparing runs without and with the above rechunk fix, @hdyson finds:

  • Iris 2.3: without the rechunk it runs for hours, times out, and uses very little memory; with the rechunk it takes ~2 hours and ~2 GB.
  • Iris 3.7.1: without the rechunk it likewise fails and times out; with the rechunk it takes ~7 minutes (!) and ~60 GB.

All runs were on 1 CPU / 2 cores only (with the numbers of workers/CPUs controlled for both numpy and Dask).
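
For context, a sketch of one way to pin both numpy and Dask to a couple of threads (the exact mechanism used for these runs isn't stated here):

import os

# Limit the BLAS/OpenMP threads numpy uses; this must be set before numpy is imported.
os.environ["OMP_NUM_THREADS"] = "2"

import dask

# Limit Dask's local threaded scheduler to two workers.
dask.config.set(scheduler="threads", num_workers=2)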

We don't really know what the time cost is due to, but this is on SPICE, where there is no swapfile space, so a job should simply fail if it exceeds memory - so it can't be that.

Interest here from 2 points :
