
Support multiprocessing of operations on cubes through decomposition and recombination #4752

Open
wjbenfold opened this issue May 19, 2022 · 10 comments



wjbenfold commented May 19, 2022

✨ Feature Request

Iris could offer a clear, tested, robust method for multiprocessing cubes by splitting them into pieces, multiprocessing the pieces and then stitching them back together.

Motivation

While working with a team who use Iris in code aimed at doing exactly this, it became clear that this isn't a use case specific to their problem but something more general.

Additional context

Requirements:

  • Allow the user to specify how the splitting will happen (number of pieces, max data size and/or other sensible ways).
  • Handle halos (where a calculation in a given tile relies on original data from neighbouring tiles), with user specification of how wide the halo needs to be.
  • Multiprocess in a fashion that respects constraints on the number of CPUs that the process can claim, without an explosion of threads/processes from each process.

Ideas:

  • We could offer a context manager that could be used to wrap operations applied to a cube (a rough sketch of the general split/process/stitch pattern follows this list).
  • Temporary saving to disk might help for rechunking and passing files between processes.
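As an illustration, here is a minimal sketch of that split/process/stitch pattern applied to a plain NumPy array, with an explicit bound on the number of worker processes as required above. The helper name process_in_tiles is hypothetical; a real Iris facility would operate on cubes, handle halos and preserve metadata.

```python
# Hypothetical sketch only: none of these names exist in Iris today.
# It illustrates the requested "split -> process -> stitch" pattern while
# capping the number of worker processes.
from concurrent.futures import ProcessPoolExecutor

import numpy as np


def process_in_tiles(data, func, n_tiles=4, max_workers=2):
    """Split `data` along its first axis, apply `func` to each piece in a
    separate process, then stitch the results back together."""
    pieces = np.array_split(data, n_tiles, axis=0)
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(func, pieces))
    return np.concatenate(results, axis=0)


def double(block):
    return block * 2


if __name__ == "__main__":
    data = np.arange(20.0).reshape(4, 5)
    print(process_in_tiles(data, double, n_tiles=2, max_workers=2))
```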

zklaus commented May 20, 2022

It seems most of the goals here would best be achieved by better supporting the dask.distributed scheduler. This would not only allow for parallelism on one node, as multiprocessing does, but even across nodes. This is already possible to a considerable extent, with a better I/O layer being the main stumbling block.
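For example, because Iris holds lazy cube data as Dask arrays, simply creating a dask.distributed Client already routes lazy cube computations through its worker processes; a minimal sketch follows (the file path is a placeholder and assumes a cube with a time coordinate):

```python
# Sketch: creating a dask.distributed Client makes it the default Dask
# scheduler, so realising lazy Iris data runs on its worker processes.
from dask.distributed import Client

import iris

if __name__ == "__main__":
    client = Client(n_workers=4, threads_per_worker=1)  # local cluster
    cube = iris.load_cube("air_temperature.nc")  # placeholder path
    result = cube.collapsed("time", iris.analysis.MEAN)  # still lazy
    print(result.data)  # realisation runs on the distributed workers
    client.close()
```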


pp-mo commented May 20, 2022

@zklaus most of the goals here would best be achieved by better supporting the dask.distributed scheduler.

I think that is not really the point of this ticket.
The point here is, simply, that we want to help provide some complex operations on cubes, where the relevant calculations on arrays are already possible via Dask.

So, we'd like a generic operation that can take a cube function and parallelise the operation as stated.

Simple element-wise cube calculations can be achieved using 'map_blocks', and plenty of users have already done that themselves. As it's not very tricky, we haven't yet moved to provide any generic facility to convert a cube function in that way.
But we have now identified a need for overlap processing, as done in Dask with 'map_overlap', which is rather trickier to manage, so now seems like a good time to develop a special facility.
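A minimal sketch of that existing element-wise pattern, assuming a cube with lazy data; scale_blocks is an arbitrary example function and the file path is a placeholder:

```python
# Sketch of the pattern users already apply by hand: pull out the cube's
# lazy Dask array, run map_blocks over it, and rebuild a cube.
import dask.array as da

import iris


def scale_blocks(block):
    # Simple element-wise operation applied independently to each chunk.
    return block * 2.0


cube = iris.load_cube("air_temperature.nc")  # placeholder path
lazy = cube.core_data()  # the cube's Dask array (assuming the data is lazy)
new_data = da.map_blocks(scale_blocks, lazy, dtype=lazy.dtype)
result = cube.copy(data=new_data)  # same metadata, new (still lazy) data
```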

(IMHO: the possible saving of intermediate results to disk, and even the 'splitting' (like a rechunk), are maybe separable operations, and not really core to the main task here.)


zklaus commented May 20, 2022

Perhaps I was thrown off by the use of multiprocessing. So to be clear, you don't mean multiprocessing?


pp-mo commented May 20, 2022

you don't mean multiprocessing?

I don't think so.
@wjbenfold ??

wjbenfold commented

I meant "multiprocessing" in the sense of "having work done by multiple processes", with no intention of implying a specific library


pp-mo commented May 20, 2022

Another point:

It seems logical to me that this is best provided as a "wrapper" to Dask's map_overlap.
Since that provides a lot of detailed controls, it makes very good sense to simply borrow, thinly wrap and/or pass through those keywords, and lean entirely on the Dask terminology and documentation.
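A rough sketch of that thin-wrapper idea; the helper name cube_map_overlap is hypothetical and all keywords are passed straight through to dask.array.map_overlap:

```python
# Hypothetical wrapper; nothing here exists in Iris today.
import dask.array as da


def cube_map_overlap(func, cube, **overlap_kwargs):
    """Apply `func` blockwise with halos to a cube's lazy data, returning a
    new cube with unchanged metadata. Keywords (depth, boundary, ...) go
    directly to dask.array.map_overlap.
    """
    new_data = da.map_overlap(func, cube.core_data(), **overlap_kwargs)
    return cube.copy(data=new_data)


# e.g. a 2-point halo on each horizontal dimension of a (t, y, x) cube:
# result = cube_map_overlap(my_filter, cube, depth={1: 2, 2: 2},
#                           boundary="reflect", dtype=cube.dtype)
```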

So, that approach could be very capable, but totally Dask-specific.
That would make a bit of a break with previous practice in core Iris, where we have tried to provide abstracted "lazy" data handling via our own APIs, rather than simply declaring "cube content may be a Dask array".
(Whether that was a good idea is another debate!)

From offline discussion with @bjlittle, rather than just cave in and say "Iris lazy is really just Dask", we might instead choose to have this facility live somewhere else, i.e. not in "core Iris".
That should be ok, since I don't foresee this needing to use a lot of private Cube API.

(Actually there is also a special-case benefit, since the expected partners here would rather run this with a release version of Iris.)


zklaus commented May 20, 2022

I'll be interested to see what you come up with. From my personal experience, it seems that almost any non-trivial application will require direct dask programming for the core array parts, while the metadata part would benefit from not being split up since the individual parts will usually have the same units etc.


github-actions bot commented Oct 3, 2023

In order to maintain a backlog of relevant issues, we automatically label them as stale after 500 days of inactivity.

If this issue is still important to you, then please comment on this issue and the stale label will be removed.

Otherwise this issue will be automatically closed in 28 days' time.

github-actions bot added the Stale label on Oct 3, 2023

pp-mo commented Oct 3, 2023

Just re-reviewing this in the context of discussion with @hdyson et al...
@trexfeathers can you suggest what kind of priority you think this might get ?

Personally, I think this idea only flies if we can provide a generalised operation that is easier for the user than just "pull out the arrays and use Dask yourself".
It certainly feels to me like we will struggle to support the "interesting" cases (i.e. those with block overlaps) while still shielding the user from the underlying complexities of the Dask implementation (i.e. Dask map_blocks or blockwise).
In my experience of such problems, it's really only possible to assess or optimise performance for a specific usecase, and it requires an understanding of how Dask will "see" that problem, and how it could be adjusted.

So I don't think we can realistically hope to just "solve" that problem...
but we might be able to provide a "potted" solution to help some computations work without blowing memory.
Probably we can specialise the operation to horizontal data dimensions, which is a typical and obvious simplification that Dask itself would not be aware of.
I am imagining a simplified API providing a first stab at problems of that sort, with perhaps keywords to control the size or number of divisions and overlaps in X and Y. The user would need to provide a function of cubes --> cube, along with the relative sizes and alignments of its inputs and outputs.
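To make that concrete, a hypothetical signature only; nothing here exists in Iris, and the keyword names are purely illustrative:

```python
def apply_tiled(func, *cubes, nx=4, ny=4, x_overlap=0, y_overlap=0):
    """Apply `func` (cubes -> cube) to horizontal tiles of the input cubes.

    nx, ny : number of divisions along the X and Y dimensions.
    x_overlap, y_overlap : halo width (in grid points) shared between
        adjacent tiles.

    The caller would also need to declare how the output grid relates to
    the input grids (relative sizes and alignment), e.g. via extra keywords.
    """
    raise NotImplementedError("design sketch only")
```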

github-actions bot removed the Stale label on Oct 4, 2023
trexfeathers commented

Discussed offline with @trexfeathers @pp-mo @hdyson. Something to make clear publicly - our prioritisation is currently driven by:

With the resource we have available, there are inevitably plenty of perfectly valid issues that just can't be addressed within a reasonable time frame, hence the Stale label. Always open to discuss.
