Broadcast-like operations are poorly scheduled (widely-shared dependencies) #6570

Open
gjoseph92 opened this issue Jun 13, 2022 · 2 comments
Labels
memory performance scheduling stability Issue or feature related to cluster stability (e.g. deadlock)

Comments

@gjoseph92
Collaborator

Graphs like this are not currently scheduled well:

. . . . . . . .   . . . . . . . .
|\|\|\|\|/|/|/|   |\|\|\|\|/|/|/|
| | | | a | | |   | | | | b | | |
* * * * * * * *   * * * * * * * *

The . tasks should definitely take the location of the * data into account when scheduling. But with 5 workers, every worker will hold some * data, while only 2 workers will hold a or b. In scheduling the first few .s, there's a tug-of-war between the a and the *: which do we want to schedule near? We want a way to disregard the a.
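To make the asymmetry concrete, here is a minimal sketch that writes one half of the diagram as a plain dictionary and counts how many tasks depend on each input. The key names (`("root", i)` for `*`, `("dot", i)` for `.`, and `"a"`) are made up for illustration and are not real dask keys.

```python
from collections import Counter

# One half of the diagram: 8 root tasks (`*`), one shared task `a`,
# and 8 downstream tasks (`.`), each depending on its own root plus `a`.
N = 8
dependencies = {("dot", i): {("root", i), "a"} for i in range(N)}

# Count how many downstream tasks need each input.
dependents = Counter(dep for deps in dependencies.values() for dep in deps)

print(dependents["a"])          # needed by every `.` task
print(dependents[("root", 0)])  # needed by exactly one `.` task
```

Every `.` needs `a`, so `a` will end up copied to most workers wherever the `.` tasks run; each `*` is needed exactly once, so moving it is avoidable.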

Say (*, 0) completes first, and a is already complete, on a different worker. Each * is the same size as (or smaller than) a. We now schedule (., 0). If we choose to go to a, we might have a short-term gain, but we've taken a spot that could have gone to better use in the near future. Say the worker holding a is already running (*, 6). Now, (., 6) may get scheduled on yet another worker, because (., 0) is already running where it should have gone, and the scheduler prioritizes "where can I start this task soonest" over "how can I minimize data transfer".

This can cascade through all the .s, until we've transferred most root tasks to different workers (on top of a, which we have to transfer everywhere no matter what).

What could have been a nearly-zero-transfer operation is instead likely to transfer every piece of input data to a different worker, greatly increasing memory usage.
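As a rough illustration of the difference, the sketch below counts how many copies of data have to be made for two hand-written placements of the `.` tasks. It is a toy model, not scheduler code: the worker names, key names, round-robin layout, and the `count_transfers` helper are all assumptions made for the example, and it assumes a key is copied to a given worker at most once.

```python
def count_transfers(assignment, who_has, dependencies):
    """Count distinct (key, destination worker) copies needed for `assignment`."""
    copies = set()
    for task, worker in assignment.items():
        for dep in dependencies[task]:
            if worker not in who_has[dep]:
                copies.add((dep, worker))
    return len(copies)


N_TASKS, N_WORKERS = 8, 5
workers = [f"w{i}" for i in range(N_WORKERS)]
dependencies = {("dot", i): {("root", i), "a"} for i in range(N_TASKS)}

# Roots are spread round-robin across workers; `a` lives only on w0.
who_has = {("root", i): {workers[i % N_WORKERS]} for i in range(N_TASKS)}
who_has["a"] = {"w0"}

# Each `.` runs next to its own root: only `a` has to move.
colocated = {("dot", i): workers[i % N_WORKERS] for i in range(N_TASKS)}
# Each `.` is displaced by one worker: every root has to move as well.
displaced = {("dot", i): workers[(i + 1) % N_WORKERS] for i in range(N_TASKS)}

print(count_transfers(colocated, who_has, dependencies))  # 4
print(count_transfers(displaced, who_has, dependencies))  # 12
```

In both placements `a` is copied to the four workers that don't hold it. The displaced placement additionally copies all eight roots, which is the extra memory and network cost described above.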

This pattern will occur anytime you broadcast one thing against another in a binary operation (which can occur in arrays, dataframes, bags, etc.).

import dask.array as da
a = da.random.random(100, chunks=10)
x = da.random.random(1)
r = (a[1:] * x)  # `[1:]` slicing prevents blockwise fusion
r.visualize(optimize_graph=True, collapse_outputs=True)

[Image: mydask, the task graph rendered by r.visualize above]

In the above case, the mul tasks will tend to "dogpile" onto the one worker that holds the middle random_sample task (x).

@crusaderky has also observed cases where this "dogpile" effect can cause what should be an embarrassingly parallel operation to be scheduled entirely on one worker, overwhelming it.

#5325 was a heuristic attempt to fix this, but there are probably better ways to approach it.

@fjetter
Member

fjetter commented Jun 14, 2022

Do we see this graph structure when using any high level collection like dataframes, arrays or bags?

Does this also happen with work-stealing disabled?

@gjoseph92
Collaborator Author

> Do we see this graph structure when using any high level collection like dataframes, arrays or bags?

See my above example using dask array. I'm sure I could make a similar one with dataframes. As I said, this is basically going to affect any sort of broadcast operation.

I don't think work-stealing has an effect here; if anything it might help. I need to double-check though.

The basic problem is that the decide_worker objective function only considers the sizes of the inputs, not the bigger picture of whether some tasks are more worth moving (for parallelism) even if they're larger. See b4ebbee:

> It's more meant to discourage transferring keys that could have just stayed in one place. The goal is that if A and B are on different workers, and we're the only task that will ever need A, but plenty of other tasks will need B, we should schedule alongside A even if B is a bit larger to move.
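The A/B scenario can be sketched as a toy objective function. This is not the actual `decide_worker` code or the heuristic from #5325. The `transfer_cost` and `pick_worker` helpers, the `shared_threshold` parameter, and the sizes below are all hypothetical, chosen only to show how ignoring widely-shared inputs changes the choice.

```python
def transfer_cost(worker, deps, who_has, nbytes, n_dependents, shared_threshold=None):
    """Bytes that must move to `worker`, optionally skipping widely-shared deps."""
    cost = 0
    for dep in deps:
        if shared_threshold is not None and n_dependents[dep] >= shared_threshold:
            continue  # widely shared: assume it gets copied around regardless
        if worker not in who_has[dep]:
            cost += nbytes[dep]
    return cost


def pick_worker(workers, deps, who_has, nbytes, n_dependents, shared_threshold=None):
    """Pick the worker with the lowest transfer cost (hypothetical objective)."""
    return min(
        workers,
        key=lambda w: transfer_cost(w, deps, who_has, nbytes, n_dependents, shared_threshold),
    )


workers = ["w0", "w1"]
deps = ["A", "B"]
who_has = {"A": {"w0"}, "B": {"w1"}}
nbytes = {"A": 100, "B": 120}      # B is a bit larger to move
n_dependents = {"A": 1, "B": 8}    # only we need A; many tasks need B

by_size = pick_worker(workers, deps, who_has, nbytes, n_dependents)
ignoring_shared = pick_worker(workers, deps, who_has, nbytes, n_dependents, shared_threshold=5)
print(by_size, ignoring_shared)  # w1 w0
```

Considering sizes alone, the task goes next to the larger input B. Once B is treated as "going to be everywhere anyway", the task stays alongside A. A real fix would also need to account for worker occupancy, which this sketch leaves out.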
