
Support lazy saving #4190

Closed
bouweandela opened this issue Jun 11, 2021 · 13 comments · Fixed by #5191
Labels: Feature: ESMValTool · Peloton 🚴‍♂️

Comments

bouweandela (Member) commented Jun 11, 2021

✨ Feature Request

The iris.save function only supports saving a single file at a time, and it is not lazy. However, the dask.array.store function that backs the NetCDF saver supports delayed saving. For our use case, it would be computationally more efficient to have this supported by iris.save. Would it be possible to allow a compute=False option in the current iris.save function, so that instead of saving immediately it returns a dask.delayed.Delayed object that can be computed at a later time?

Alternatively, a save function in iris that saves a list of cubes to a list of files, one cube per file (similar to da.store, but operating on cubes), would also work for us.
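A minimal sketch of the requested call pattern (hypothetical: this shows the proposed interface, not an existing iris option):

import dask
import iris

cube = iris.load_cube("input.nc")  # hypothetical input file
# Proposed: compute=False would return a dask.delayed.Delayed instead of writing now.
delayed = iris.save(cube, "output.nc", compute=False)
# ... collect further delayed saves, then execute them all in one shared computation:
dask.compute(delayed)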

Motivation

In our case, multi model statistics in ESMValTool, we are interested in computing statistics (e.g. mean, median) over a number of climate models (cubes). Before we can compute those statistics, we need to load the data from disk and regrid the cubes to the same horizontal grid (and optionally to the same vertical levels). Then we merge all cubes into a single cube with a 'model' dimension and collapse along that dimension using e.g. iris.analysis.MEAN to compute the mean.

We want to store both the regridded input cubes and the cube(s) containing the statistics, each cube in its own netCDF file according to the CMIP/CMOR conventions. Because iris.save only allows saving a single cube to a single file and is executed immediately, the load and regrid needs to be executed (1 + the number of statistics) times. Support for delayed saving (or for saving a list of cubes to a matching list of files) would save computational time, because each regridded chunk could be re-used for computing every statistic (as well as for storing the regridded cube), so it would only need to be loaded and regridded once.

Additional context

Example script that shows the use case

This is an example script that demonstrates our workflow and how we could use the requested save function to speed up the multi-model statistics computation. Note that the script uses lazy multi-model statistics, which are still in development in ESMValGroup/ESMValCore#968.

import os
import sys

import dask
import dask.array as da
import iris
from netCDF4 import Dataset

from esmvalcore.preprocessor import multi_model_statistics, regrid


def save(cube, target, compute):
    """Save the data from a 3D cube to file using da.store."""
    # Create the output file with matching dimensions and a single variable.
    dataset = Dataset(target, "w")
    dataset.createDimension("time", cube.shape[0])
    dataset.createDimension("lat", cube.shape[1])
    dataset.createDimension("lon", cube.shape[2])
    dataset.createVariable("var", "f4", ("time", "lat", "lon"))

    # With compute=False, da.store returns a dask.delayed.Delayed instead of
    # writing immediately. Note the dataset is deliberately left open here:
    # it must stay open until the delayed store is computed.
    return da.store(cube.core_data(), dataset["var"], compute=compute)


def main(in_filenames):
    """Compute multi-model statistics over the input files."""
    target_grid = "1x1"
    cubes = {}
    for in_filename in in_filenames:
        cube = iris.load_cube(in_filename)
        cube = regrid(cube, target_grid, scheme="linear")
        out_filename = os.path.basename(in_filename)
        cubes[out_filename] = cube

    statistics = multi_model_statistics(cubes.values(), "overlap", ["mean", "std_dev"])
    for statistic, cube in statistics.items():
        out_filename = statistic + ".nc"
        cubes[out_filename] = cube

    results = []
    for out_filename, cube in cubes.items():
        result = save(cube, out_filename, compute=False)
        results.append(result)

    dask.compute(results)

    # for out_filename, cube in cubes.items():
    #     iris.save(cube, out_filename)


if __name__ == "__main__":
    # This script takes a list of netCDF files containing 3D variables as arguments
    main(sys.argv[1:])
rcomer (Member) commented Jun 11, 2021

I did not know such functionality existed, but I can already think of an example in our operational post-processing that this might help:

We load a large cube from many files (forecast_reference_time, year, realization, latitude, longitude), then loop through years to make leave-one-out subcubes (i.e. n_year subcubes containing n_year - 1 years each). For each subcube we calculate percentiles and save. Since the subsetting is lazy, the process would go back to the many files for each pass through the loop, which was somewhat inefficient. I recently added a context manager to save and reload the large cube to/from a temporary file, which sped things up considerably.

If I've understood, lazy saving would make my context manager redundant - is that right? Assuming we also get #3901 in.
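For reference, a minimal sketch of that kind of save-and-reload context manager (the helper name and details here are hypothetical, not the operational code):

import tempfile
from contextlib import contextmanager
from pathlib import Path

import iris


@contextmanager
def cached_on_disk(cube):
    """Yield a copy of `cube` backed by a single temporary NetCDF file."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = Path(tmpdir) / "cache.nc"
        iris.save(cube, str(path))  # realise once, into one local file
        # Subcubes taken from the reloaded cube now read from this single
        # file instead of going back to the many original source files.
        yield iris.load_cube(str(path))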

rcomer (Member) commented Jun 11, 2021

I also have code that calculates some statistic and both saves it to file and creates a plot. It took longer than it should have because the statistic got calculated twice: once for the file and once for the plot.

bouweandela (Member Author) commented

If I've understood, lazy saving would make my context manager redundant - is that right?

Yes, I think so.

I also have code that calculates some statistic and both saves it to file and creates a plot.

This would only improve if there was some way to save the plot lazily in addition to lazy saving of the NetCDF file. However, if the data is small by the time you're plotting it (after all, how much information fits in a single plot, no more than the number of pixels, right?) then you could realize the data before saving to avoid the double calculation.
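A minimal sketch of that workaround (input file and statistic are hypothetical): touching cube.data realises the small result once, so the save and the plot both reuse the realised array:

import iris
import iris.quickplot as qplt
import matplotlib.pyplot as plt

cube = iris.load_cube("input.nc")  # hypothetical lazy input
stat = cube.collapsed("time", iris.analysis.MEAN)  # still lazy
stat.data  # realise once; the cube now holds a real array
iris.save(stat, "stat.nc")  # no recomputation here
qplt.pcolormesh(stat)  # ... nor here
plt.savefig("stat.png")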

rcomer (Member) commented Jun 15, 2021

you could realize the data before saving to avoid the double calculation

Thanks, this is what I am currently doing (since I realised why it was taking so long).

pp-mo self-assigned this Jun 16, 2021
pp-mo added this to Backlog in Peloton via automation Jun 16, 2021
pp-mo (Member) commented Jun 17, 2021

I'm looking into this, but I'm rather struggling to see how the lazy solutions would work in practice.

In @rcomer's examples, the calculations require the data, which is available in a lazy form, but the only way to avoid reading the data multiple times is to perform all the calculations in parallel. You can do that with 'co_realize_cubes', in case you didn't know? (A sketch follows below.)
N.B. this approach assumes that the calculation results are small enough to all be realised, even though the inputs are too large.
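A minimal sketch of co-realising several lazy results in one shared computation (note that co_realize_cubes lives in the private iris._lazy_data module; the file pattern here is hypothetical):

import iris
from iris._lazy_data import co_realize_cubes

cubes = iris.load("model_*.nc")  # hypothetical source files with shared inputs
stats = [cube.collapsed("time", iris.analysis.MEAN) for cube in cubes]  # all lazy
co_realize_cubes(*stats)  # one combined compute; common dependencies evaluated once
# Each cube in `stats` now holds realised data.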

The case for "lazy saves" sounds a bit different, but I guess the same is true : you can only avoid re-calculation if you get Dask to do it all at once. In my experience, this doesn't actually always work as you might expect / hope !
( Just anecdotally, I have found that, at least as regards memory usage, da.store seems more frugal than da.compute in many cases. )

The 'save to individual files' option sounds easier to achieve, but again if the sources are lazy with a common computation factor (the regridding, in this case), you can only improve if it is all done in parallel. So, that would also mean "lazy saves" (using deferreds, as you have described), at least "inside" the ordinary save operation.
@bouweandela how would saving to individual files help in itself ?

rcomer (Member) commented Jun 17, 2021

You can do that with 'co_realize_cubes', in case you didn't know?

I did not, thanks!

pp-mo (Member) commented Jun 17, 2021

P.S. I looked into supporting a 'deferred save' within Iris save operations once before, but we decided we did not have a killer need for it.

If we support this, there is also a question of how we control the open/close of the datasets. This is especially relevant if we are saving e.g. multiple statistics to the same file (not so much if they save to different ones).
It may be easier and quicker, or even necessary, to allow "save" to work with provided, open files or netCDF4.Datasets, instead of just filepaths.
Of course there is a lot of file "structure" to be written as well as just the data, including other array-containing components (coordinates). This can in principle be separated from the main "data array" writing. The possibilities are rather complicated.

rcomer added the Peloton 🚴‍♂️ label Aug 25, 2021
pp-mo (Member) commented Apr 30, 2022

Hi @bouweandela.
We at last have a bit more time here to consider the more complex outstanding issues and requests. So now, just wanting to confirm that this is still wanted, in some form?

If so, we really need to discuss what the interface might look like.

I would expect iris.save to return a set of 'dask.delayed' objects, analogous to the results of da.store(arrays, vars, compute=False) (which is doubtless how we would implement it).
The problem is that, while we can fairly easily produce a 'delayed' to write results into a cube-data variable, it's not so clear at what point the file dimensions and variables are created, nor at what point subsidiary components are written -- i.e. those other (potentially large) variables for which we already use the 'store' operation: aux-coords, ancillary-variables and cell-measures.

What seems most natural to me is ...

  • call as save(cubes, filename, compute=False) (or similar)
  • this creates the dataset (file) and all variables, but defers writing the content of the potentially large multidimensional variables: i.e. cube data, aux-coords, cell-measures, ancillaries
  • 'save' returns one 'dask.delayed' per cube
  • each 'delayed.compute()' fills in the data for one cube (see the sketch below)
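A minimal sketch of how that might look from the caller's side (a hypothetical interface, nothing implemented):

import dask
import iris

delayeds = iris.save(cubes, "output.nc", compute=False)  # one Delayed per cube
# The file, dimensions and variables now exist, but the large variable
# contents are still unwritten.
dask.compute(*delayeds)  # fill in the data for all cubes in one shared computation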

The "simple" part of that is that it returns only one 'delayed' per saved cube, which is easy to interpret.
However, practically it really isn't that simple, because each 'delayed' does more than one thing, and potentially must combine the compute operation for several different variables.
There is also a complication regarding when shared variables get written, e.g. an aux-coord shared between several cubes (*) .

The alternative would be to return a delayed per target variable, but these would then need to be identified somehow, i.e. referred back to the Iris objects.
Maybe a simple dict mapping the actual iris object (cube, coord, cell-measure etc) to a 'delayed' -- even then, for a coord, we probably need to provide separate points and bounds objects.

We should probably also consider possible (future) use with parallel-netcdf implementations, which I personally know ~0 about!

@bouweandela I'd be interested to hear your ideas on what is useful in such an API, and how you anticipate using it.

pp-mo (Member) commented Apr 30, 2022

(*) ref from previous
Extra note:
There are also some awkward questions here about the sharing of AuxCoords (and similar components) between cubes.

  • It is possible for 2 cubes to both contain the exact same AuxCoord, in which case the saver code will only create one target variable ✔️
  • The saver will also create only one target variable when two cubes have different AuxCoords which test equal -- but in that case, it must compare them to see whether the new (target) variable was already created.
  • If you have loaded two cubes from netcdf data-variables which share an auxiliary coordinate, each will have its own AuxCoord -- though both will contain lazy arrays which refer to the same file variable ❓ (see the sketch below)
  • But I fear that, if you then save those cubes, the save code will ask for the data twice, and compare those two arrays ❌

This is basically a problem caused by Iris' obsession with copying things! See #3172
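A minimal sketch of that identity-versus-equality distinction (file, variable and coordinate names are hypothetical):

import iris

cube_a = iris.load_cube("file.nc", "var_a")
cube_b = iris.load_cube("file.nc", "var_b")
coord_a = cube_a.coord("surface_altitude")
coord_b = cube_b.coord("surface_altitude")
print(coord_a is coord_b)  # False: loading gives each cube its own AuxCoord copy
print(coord_a == coord_b)  # True, but the comparison realises both lazy arrays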

bouweandela (Member Author) commented Jun 28, 2022

Thank you for getting back to this issue @pp-mo! My apologies it took me so long to respond. I did not have time to work on this, but I found some funding and will have more time from now on.

I'd be interested to hear your ideas on what is useful in such an API, and how you anticipate using it.

Here is a (heavily simplified) example of how I would like to use this:

import sys
from pathlib import Path

import dask.distributed
import dask_jobqueue
import dask
import iris
import xarray

def compute_example(in_file, out_file):
    cube = iris.load_cube(in_file)
    # Simplistic example computation
    cube = cube.collapsed(['latitude', 'longitude'], iris.analysis.MEAN)
    result = iris.save(cube, out_file, compute=False)
    # This already works:
    # ds = xarray.DataArray.from_iris(cube)
    # result = ds.to_netcdf(out_file, compute=False)
    return result

def main(in_files):
    # start a dask cluster, e.g.
    # cluster = dask.distributed.LocalCluster(processes=False)
    # or when on Jasmin/Levante/etc:
    cluster = dask_jobqueue.SLURMCluster(
        cores=8,
        memory='24GiB',
    )
    cluster.scale(10)
    # or a cluster in the cloud once we are able to use zarr

    print(cluster.dashboard_link)

    client = dask.distributed.Client(cluster)

    results = []
    for i, in_file in enumerate(in_files):
        out_file = Path(f'result{i}.nc')
        result = compute_example(in_file, out_file)
        results.append(result)

    dask.compute(results)


if __name__ == '__main__':
    main(sys.argv[1:])

The script above takes a list of NetCDF files as input, loads the data, computes a simple statistic, and then writes the results to NetCDF files again.

A lazy saving feature like this is already available in the xarray.Dataset.to_netcdf method. Unfortunately, all the coordinate bounds are lost when using it, but the data looks OK.

Our real use case would be ESMValCore, where we typically have hundreds of input NetCDF files that we run some computations on and then we write the results again to NetCDF files.

A simplified example of our current workflow is:

import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

import iris


def compute_example(in_file, out_file):
    cube = iris.load_cube(in_file)
    cube = cube.collapsed(["latitude", "longitude"], iris.analysis.MEAN)
    iris.save(cube, out_file)


def main(in_files):

    with ProcessPoolExecutor() as executor:
        futures = {}
        for i, in_file in enumerate(in_files):
            out_file = Path(f"result{i}.nc")
            future = executor.submit(compute_example, in_file, out_file)
            futures[future] = in_file

        for future in as_completed(futures):
            print("Saved ", futures[future])
            future.result()


if __name__ == "__main__":
    main(sys.argv[1:])

This is not the correct way to parallelize using dask, but at the moment I see no other option with iris.

bouweandela (Member Author) commented

We mostly work with CMIP-like data, so the NetCDF files we work with are typically relatively simple:

  • Files only contain a single variable
  • Cell measures and ancillary coordinates are stored in separate files

so all the complicated cases you mention do not apply to our use case.

pp-mo (Member) commented Jun 29, 2022

Here is a (heavily simplified) example of how I would like to use this:

Thanks @bouweandela this is really useful progress.

I can see what is done here, but I'm still a bit vague as to how this actually helps with control/performance
-- i.e. what is the key difference from just letting dask "do its thing" with all these saves?
(E.g. maybe sharing between the results, computing in parallel? I believe that can be achieved anyway with dask bags.)

I believe we have a remote discussion meeting this Friday (July 1st)? So we can hopefully have a dialog about this there.

BTW I'm also currently thinking that the "netcdf-xarray-wrapper" approach I've been investigating might have value here too.

bouweandela (Member Author) commented

I can see what is done here, but I'm still a bit vague as to how this actually helps with control/performance
-- i.e. what is the key difference from just letting dask "do its thing" with all these saves?

There are two issues that we run into with the current approach really:

  1. It is slow
  2. In some cases it requires more memory than what is available in a single computer

Regarding point 1: I'm not sure exactly why this is, but I have some ideas. For example, most file systems support parallel writes, and I suspect the current iris.save function is not making optimal use of that capability. If there is a bottleneck at writing the output, it stalls the whole computation and leads to limited parallelism for the computation.

Regarding point 2: this may actually get worse when we build one big complicated graph and then try to compute it. However, there is some hope that using a distributed scheduler would help, because it makes it possible to access more memory than there is in a single node. Distributed schedulers also seem to be the recommended option, so they might be smarter about how they manage memory use, but I would need to try it to see whether that is actually the case.

(E.g. maybe sharing between the results, computing in parallel? I believe that can be achieved anyway with dask bags.)

Indeed sharing between the results and computing in parallel are things we hope to achieve. Do you have a suggestion of how this could be implemented with dask bags?

BTW I'm also currently thinking that the "netcdf-xarray-wrapper" approach I've been investigating might also possibly have value here.

That certainly looks interesting, I will have a look.

I believe we have a remote discussion meeting this Friday (July 1st) ?
so we can hopefully have a dialog about this there.

Thanks for the informative meeting and see you today for another one!

Peloton automation moved this from Backlog to Done Apr 21, 2023