Ease memory pressure by deprioritizing root tasks? #6360

Open
TomNicholas opened this issue May 17, 2022 · 24 comments
Labels: discussion (Discussing a topic with no specific actions yet), memory, performance, stability (Issue or feature related to cluster stability, e.g. deadlock)


@TomNicholas

TomNicholas commented May 17, 2022

The need for memory backpressure has been discussed at length, and a full solution is arguably necessary to solve many of the use cases in the pangeo community. Whilst efforts in that direction do seem to be underway, I would like to suggest a simpler stop-gap solution for the shorter term. The basic issue is that workers can overeagerly open new data at a rate faster than other tasks free up memory; it is best understood via the figure under "root task overproduction" here.

All my tasks that consume memory are root tasks, so is there any way to deprioritize root tasks? This should be a lot simpler than solving the general problem, because it doesn't require (a) any records kept of how much memory each task uses or (b) workers to change their priorities over the course of a computation based on new information. All I need is for workers to be discouraged / prevented from opening new data until the task chains that are already begun are as complete as possible. This will obviously waste time as workers wait, but I would be happy to accept a considerable slowdown if it means that I definitely won't run out of memory.

This problem applies to any computation where the data is largest when opened and then steadily reduced (which covers most pangeo workflows, with the important exception of the groupby stuff IIUC), but where the opening task and the memory-relieving tasks can't be directly fused into one.

Is there some way of implementing this? I'm happy to provide an example use case if that would help, or to try hacking away at the distributed code, but I wanted to know if this is even possible first.

cc @gjoseph92 , and @rabernat @dcherian @TomAugspurger , because this is related to the discussion we had in the pangeo community meeting the other week.

@gjoseph92
Collaborator

The figure @TomNicholas was referencing (thanks for the reminder, I forgot about this!):
image

I mention that because this problem is inherent to the way the current task scheduling algorithm works in distributed; see #5223 and #5555. So approaches like #4891 are not, in my mind, the "full solution"—they'd be layering on more complexity to an already-complex algorithm with two components (normal eager scheduling, and task stealing aka occasional rebalancing). "Speculative task assignment" (#3974) is more what I'd consider the "full solution", though I think there are other approaches that could work as well.

Just for context :)

@gjoseph92
Collaborator

so is there any way to deprioritize root tasks?

As a way to hack around this, right now, I'd play with worker resources. For example, something along the lines of (untested):

import dask
import dask.distributed
import coiled
import xarray as xr

NTHREADS = 4
cluster = coiled.Cluster(..., worker_cpu=NTHREADS, worker_options={"resources": {"ROOT": NTHREADS}})
client = dask.distributed.Client(cluster)

with dask.annotate(resources={"ROOT": 1}):
    ds = xr.open_dataset(...)

result = ds.a - ds.a.mean()
with dask.config.set({"optimization.fuse.active": False}):
    result.compute()

A few caveats:

@dcherian

One way to "solve" this is to batch the computation (e.g. compute the result 100 timesteps at a time) and write to zarr (some ugly code below).

import tqdm
import xarray as xr


def batch_load(obj, factor=2):
    """
    Load xarray object values by calling compute on block subsets
    (that are an integral multiple of chunks along each chunked dimension).

    Parameters
    ----------
    obj : xarray object
    factor : int
        Multiple of chunksize to load at a single time.
        Passed on to split_blocks.
    """
    # split_blocks is a small helper (not shown here) that yields
    # (label, block-subset) pairs of the dataset.
    if isinstance(obj, xr.DataArray):
        dataset = obj._to_temp_dataset()
    else:
        dataset = obj

    computed = []
    for label, chunk in split_blocks(dataset, factor=factor):
        print(f"computing {label}")
        computed.append(chunk.compute())
    result = xr.combine_by_coords(computed)

    if isinstance(obj, xr.DataArray):
        result = obj._from_temp_dataset(result)

    return result


def batch_to_zarr(ds, file, dim, batch_size, restart=False, **kwargs):
    """
    Batched writing of dask arrays to zarr files.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset to write.
    file : str
        Filename.
    dim : str
        Dimension along which to split the dataset and append.
        Passed to `to_zarr` as `append_dim`.
    batch_size : int
        Size of a single batch.
    restart : bool
        If True, resume from the last index already present in the store.
    **kwargs
        Passed on to `to_zarr`.

    Returns
    -------
    None
    """
    if not restart:
        ds.isel({dim: [0]}).to_zarr(file, consolidated=True, mode="w", **kwargs)
    else:
        print("Restarting...")
        opened = xr.open_zarr(file, consolidated=True)
        ds = ds.sel({dim: slice(opened[dim][-1], None)})
        print(
            f"Last index = {opened[dim][-1].values}. Starting from {ds[dim][1].values}"
        )
        opened.close()

    for t in tqdm.tqdm(range(1, ds.sizes[dim], batch_size)):
        # encoding is only valid when creating the store, not when appending
        if "encoding" in kwargs:
            del kwargs["encoding"]
        ds.isel({dim: slice(t, t + batch_size)}).to_zarr(
            file, consolidated=True, mode="a", append_dim=dim, **kwargs
        )

Effectively this prevents the scheduler from ever seeing "too many" root tasks.

This now reminds me of the rechunker approach
image

where the dummy task in the middle prevents the scheduler from getting too far ahead, and consequently limits memory usage.

So perhaps there could be a new function in graph_manipulation.py that takes as input an integer N and a task `token`, and makes tasks N+1 through 2N execute only after the Nth task has completed.
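
Something in that spirit can already be approximated with dask.graph_manipulation.bind, which injects fake dependencies so that one batch of work only starts once another has finished. A rough sketch (the batch count, array shapes, and chunk sizes are made up):

import dask
import dask.array as da
from dask.graph_manipulation import bind

# Hypothetical example: three "batches" of root-heavy work. bind() makes each
# batch depend on the previous one, so only one batch's worth of root tasks is
# runnable at any time.
batches = [da.random.random((1000, 1000), chunks=(100, 100)).sum() for _ in range(3)]

gated = [batches[0]]
for nxt in batches[1:]:
    # nxt only becomes runnable once the previous (already-gated) batch is done
    gated.append(bind(nxt, gated[-1]))

results = dask.compute(*gated)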

@TomNicholas
Author

TomNicholas commented May 26, 2022

Thanks both for your replies.

@dcherian batching like you suggested is one of my backup plans 😅, but thank you for the code example!

@gjoseph92 I tried playing with worker resources as you suggested, but if it made any difference to the behaviour I couldn't tell.

Without setting a limit on resources my cluster starts failing like this:

Screenshot from 2022-05-26 14-37-46

and after I set a limit via

options = g.cluster_options()
NTHREADS = 1
options.worker_cores = NTHREADS
options.worker_memory = 50

with dask.config.set({"resources": {"ROOT": NTHREADS}}):
    gc = g.new_cluster(cluster_options=options)

with dask.annotate(resources={"ROOT": 1}):
    ds = xr.open_dataset(...)

omega = vort(ds)  # my calculation, an xarray.apply_ufunc

with dask.config.set({"optimization.fuse.active": False}):
    with gc.get_client():
        # write result to zarr
        st = omega.to_dataset(name="vort").to_zarr(mapper, mode='w', compute=True)

it looks like this

Screenshot from 2022-05-26 16-33-31

You can see that in both cases it has managed to save out ~300 chunks (store_map), but the data spilled to disk still keeps steadily growing.


At @jbusecke's suggestion I also tried setting the task priority via

with dask.annotate(priority=-999999):
    ds = xr.open_dataset(...)

with dask.annotate(priority=999999):
    with gc.get_client():
        # write result to zarr
        st = omega.to_dataset(name="vort").to_zarr(mapper, mode='w', compute=True)

which appeared to help the computation proceed somewhat, but after a while my memory usage still grew without bound.


This is a subset of my optimised (partially fused) task graph, for two timesteps. It's basically two open_dataset calls per timestep, each of which is padded with xarray, then fed into a simple apply_ufunc which returns one chunk, then written to a single zarr chunk.

image

At full scale I'm trying to compute 9000 timesteps. The difference is that the outer-most 4 task chains (2 on each side) are duplicated (there are 18000 opens for the full problem), but the middle 3 chains don't change (those are grid-related variables that don't depend on time), so it's basically 2N+3 open_dataset calls for N time chunks.

The static ordering looks reasonable - dark orange should be written out before yellow and light orange are loaded

image

But this static ordering seems to be largely irrelevant to what actually happens when you have multiple workers.


Is this a pathological failure case? Whatever I've tried, my 20 workers still race to open ~400 chunks (loading hundreds of GBs into memory and spilling to disk) before they manage to write a single chunk out, even though each output chunk only requires two input chunks.

There might be some way for me to inline/fuse my way to an embarrassingly parallel graph, so that the open and write steps are in one task, but at that point I may as well have just used map_blocks (or a different parallel execution framework).
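
For reference, here's a rough sketch of what that fully-fused, embarrassingly parallel fallback could look like with dask.delayed (vort, input_paths, and the one-store-per-timestep outputs are placeholders, not my actual setup):

import dask
import xarray as xr

@dask.delayed
def open_compute_write(path, out_path):
    # everything for one timestep happens inside a single task,
    # so nothing large ever has to wait on the scheduler
    ds = xr.open_dataset(path)          # placeholder: one timestep of input
    omega = vort(ds)                    # placeholder: the apply_ufunc calculation above
    omega.to_dataset(name="vort").to_zarr(out_path, mode="w")
    return out_path

tasks = [
    open_compute_write(path, f"out/vort_{i}.zarr")   # one zarr store per timestep, for simplicity
    for i, path in enumerate(input_paths)            # placeholder list of input files
]
dask.compute(*tasks)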

@gjoseph92
Collaborator

But this static ordering seems to be largely irrelevant to what actually happens when you have multiple workers

Yes, the priority ordering doesn't have that much effect (it's just a tie-breaker, not an actual ordering that the scheduler follows).

if it made any difference to the behaviour I couldn't tell

I'm a bit surprised by this (but I agree, it didn't look like anything changed).

I looked into it a little and it turns out the scheduling of tasks that use resources doesn't work the way I expected.

#6468 makes it work the way you'd expect. @TomNicholas, I'd be really curious to see how things work if you keep using resources, but switch to that PR. There's been lots of discussion of root-task overproduction, but we've never actually gotten to see how things would go if it didn't happen, so having this comparison would be very valuable.

@TomNicholas
Author

Thanks so much for that @gjoseph92 - I'm excited to try it out, but it'll take me a few days as I need to work out how to run on a custom version of distributed first.

@gjoseph92
Collaborator

You could use Coiled if that would be easier; you can install git+https://github.com/gjoseph92/distributed.git@hold-back-tasks-without-available-resources in your software environment.

@TomNicholas
Author

TomNicholas commented Jun 1, 2022

@gjoseph92 @jbusecke and I are trying to test this but our coiled cluster won't actually start :/

This is the environment file we're using, which we tried both locally and in the pangeo cloud

name: hero_calculation
channels:
  - conda-forge
dependencies:
  - ipykernel
  - coiled
  - xgcm
  - numpy
  - zarr
  - fsspec
  - gcsfs
  - pip
  - pip:
    - git+https://github.com/pydata/xarray.git
    - git+https://github.com/gjoseph92/distributed.git@hold-back-tasks-without-available-resources

With the default environment on pangeo cloud we are able to start a cluster, but that doesn't use your updated version of distributed.

This succeeds:

import coiled
coiled.create_software_environment(
    conda="environment.yml",
)

But then
cluster = coiled.Cluster()
never finished.

When we try to use our altered environment (with your distributed PR and xarray's main branch) the notebook hangs on creating the coiled cluster.

Is there some dask dependency we've forgotten here? Is there a better channel to get help with this coiled cluster issue?

@gjoseph92
Collaborator

@TomNicholas why don't you join the Coiled slack; it'll be easier to go back and forth there. I think this link works, but if not it's at the bottom-left of https://cloud.coiled.io: https://join.slack.com/t/coiled-users/shared_invite/zt-hx1fnr7k-In~Q8ui3XkQfvQon0yN5WQ.

I also haven't tested my PR at all. It's entirely possible I messed up something silly. The cluster logs might show more. I also wouldn't be surprised if you need to add git+https://github.com/dask/dask.git@main, though I thought distributed might pull this in automatically.

@jbusecke

jbusecke commented Jun 2, 2022

I also wouldn't be surprised if you need to add git+https://github.com/dask/dask.git@main, though I thought distributed might pull this in automatically.

We actually tried this, but it failed due to a dependency pinned to the latest dask release. Just FYI.

@gjoseph92 self-assigned this Jun 9, 2022
@gjoseph92
Collaborator

@TomNicholas and I reproduced a variant of his failing workload on Coiled, running on my PR #6467. We used worker resources to annotate root tasks as described in #6360 (comment). With my PR, this made it so the scheduler would stream root tasks into workers, only submitting a new root task when it heard the previous had completed, instead of submitting all root tasks up front, which the current scheduling algorithm does.

This allowed us to simulate how a cluster would perform if root task overproduction were fixed. The results are compelling:
plot

Withholding root tasks from workers enormously reduced memory use (15x lower peak memory usage) while cutting runtime nearly in half.

These trials used the same workload on the same cluster, both on #6467. The only difference was "root tasks withheld" wrapped xr.open_dataset in dask.annotate(resources={"ROOT": 1}), and "standard" did not. All workers were r5.2xlarge, with 1 ROOT resource each.

You can see how different memory usage looked on the dashboard:

dashboards

It's important to note that because we used resource restrictions, scheduling didn't use co-assignment logic (#4967). I believe that's the reason you see transfers on the second dashboard (two tasks that contributed to the same output were scheduled on different workers). Implemented properly, taking advantage of co-assignment, I expect fixing root task overproduction would give even better runtime (maybe 20-40% faster?) and lower memory usage than what we see here.


Important takeaways:

  • Root task overproduction is a critical issue: it inflates memory usage, which in turn hurts both runtime and stability.
  • It could be addressed purely on the scheduler side, without the complexity of "Speculatively assign tasks to workers" (#3974), and still give far better performance than current scheduling.

I estimate that a scheduler-only fix for root task overproduction, which just withheld root tasks from workers while maintaining our co-assignment logic, could be implemented in a week.

cc @fjetter @hayesgb @mrocklin

@mrocklin
Member

mrocklin commented Jun 9, 2022

Those are compelling plots.

There's a lot here. Is there a thing that I or someone like @fjetter should read to understand the proposed design?

In principle if we could reliably solve this problem with a week of dev time and not cause other problems then that sounds like time well spent.

@gjoseph92
Collaborator

gjoseph92 commented Jun 9, 2022

Part of the week includes coming up with the design. But the overall idea is something like:

  • Identify root-ish tasks using something like the logic we already use
  • Don't have more than SchedulerState.total_nthreads of these root-ish tasks in processing at once. Hold back the extras on the scheduler somewhere, in either state no-worker or a new state like withheld (or ready, with the same meaning as on the worker).

The implementation of this is what needs consideration, particularly how we still get co-assignment (which right now relies on running through all the root tasks at once). Maybe something like:

  • transition_waiting_processing recommends the task to withheld instead of processing if ts.rootish and len(selected_worker.processing) >= selected_worker.nthreads.
  • It stores the planned worker assignment on ts.planned_worker (this is hacky and I don't like it, but it's an easy way to maintain co-assignment)
  • It tracks these withheld tasks in a data structure like priority heap (SchedulerState.unrunnable is basically this already, just not ordered)
  • When any rootish task completes, we pop the next task off the heap and recommend scheduling it as part of _remove_from_processing

The advantage is that it all happens on the scheduler. Nothing has to change with the worker; from the worker's perspective, it just doesn't get lots of tasks queued on it. That makes me more confident in it not causing deadlocks, since the scheduler is generally easier to reason about than the worker state machine.

This obviously introduces a bit more idleness on the worker: when it completes a root task, it doesn't have another root task queued, so it has to wait for a roundtrip to the scheduler for the next instruction. Speculative task assignment would eventually be a nice optimization to avoid this. This experiment shows that accepting that latency (and a horrendously slow scheduling algorithm; it couldn't even handle 200k tasks) while taking a better path through the graph is still faster overall than quickly taking a bad path through the graph. Slow is smooth, smooth is fast.
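
To make the shape of this concrete, here's a toy, standalone sketch of the withholding idea; it is not distributed's actual code, and names like withheld and planned_worker are just illustrative:

import heapq

class RootTaskThrottle:
    """Toy model of scheduler-side withholding of root-ish tasks.

    At most `total_nthreads` root-ish tasks are in processing at once;
    the rest wait in a priority heap until a slot frees up.
    """

    def __init__(self, total_nthreads):
        self.total_nthreads = total_nthreads
        self.in_processing = 0
        self.withheld = []  # heap of (priority, task_key, planned_worker)

    def submit_rootish(self, priority, key, planned_worker):
        """Called when a root-ish task would normally transition to processing."""
        if self.in_processing < self.total_nthreads:
            self.in_processing += 1
            return (key, planned_worker)  # send to the planned worker now
        heapq.heappush(self.withheld, (priority, key, planned_worker))
        return None  # stays withheld on the scheduler

    def rootish_completed(self):
        """Called when any root-ish task finishes; releases one slot."""
        self.in_processing -= 1
        if self.withheld:
            priority, key, planned_worker = heapq.heappop(self.withheld)
            self.in_processing += 1
            return (key, planned_worker)  # recommend scheduling this one next
        return None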

@mrocklin
Member

mrocklin commented Jun 9, 2022

Historically I would have said "I'd rather not complicate the scheduler further. It makes sense to have the worker handle this given how relatively simple the worker is." With recent changes though, I'm not sure that's still the case. It seems like the worker might be a place of more complexity than the scheduler these days. I neither love nor hate this plan.

@gjoseph92
Collaborator

The scheduler-only aspect of it is the main appeal to me, for that reason. The worker also can't do this on its own without extra information from the scheduler (it doesn't know enough about overall graph structure). So we either have to touch both scheduler and worker, or only scheduler.

@mrocklin
Member

mrocklin commented Jun 9, 2022

It could guess. We could pass rootishness around. It could also see that "hey, I often have tasks that arrive just after these rootish tasks are done. I guess I should maybe wait a bit". I'm not pushing for this though. The worker seems to be a larger source of complexity today than the scheduler.

@gjoseph92
Collaborator

I really don't like the idea of guessing. Passing rootishness around was one of my thoughts, but it introduces consistency issues (if downstream tasks are cancelled, you have to inform workers to de-rootify their tasks) without providing any complexity or performance benefit (without STA, the worker would still have to wait for a scheduler roundtrip before it could run a new task).

@mrocklin
Member

mrocklin commented Jun 9, 2022

I'm not pushing for my thought above. There's no need to argue against it.

I do think that you should develop some comfort with guessing though. We don't need to be perfect, just good in aggregate.

@fjetter
Member

fjetter commented Jun 10, 2022

I agree with @gjoseph92 here and I wouldn't want to implement any logic on the worker that would introduce artificial waiting times. Its current state is not well suited to this kind of logic. I do not see any problems with STA on the worker side. We likely need to adjust a few transition rules but that should be doable after the latest changes.


Don't have more than SchedulerState.total_nthreads of these root-ish tasks in processing at once

The request of "I would like to limit the total number of tasks of type X concurrently executing on the cluster" has come up a couple of times, see

I'm wondering if this can/should first be solved generically before we write a specialized version for root tasks. That would obviously only be part of the solution, but it would be a re-usable part.
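
For illustration, one generic primitive that already exists in this direction is distributed.Semaphore, which caps how many tasks of a given kind run at once cluster-wide. A rough sketch (the limit, expensive_open, and paths are placeholders):

from distributed import Client, Semaphore

client = Client()  # or connect to an existing cluster

# at most 8 "open"-type tasks may hold a lease at any time, cluster-wide
sem = Semaphore(max_leases=8, name="open-slots")

def guarded_open(path):
    with sem:                         # blocks until a lease is available
        return expensive_open(path)   # placeholder for the real open/read

futures = client.map(guarded_open, paths)  # `paths` is a placeholder list

Note that this blocks the worker thread while waiting for a lease, so it limits concurrency without freeing that slot for other work; it's a workaround rather than a scheduling fix.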

The implementation of this is what needs consideration, particularly how we still get co-assignment

Can we first deliver something that does not care about co-assignment? IIUC, your prototype does not consider co-assignment and it still is a major win.

@mrocklin
Member

I agree with @gjoseph92 here and I wouldn't want to implement any logic on the worker

Just to be perfectly clear, so do I. I brought up the other approach to say "there are other ways we could think about doing this" not to say "we should do it this way instead".

@gjoseph92
Collaborator

Update: we're planning on working on this, starting late next week or the week after: #6560. The goal is a draft PR (not necessarily merged) within 2 weeks. When that's up, we'd very much appreciate input from people involved in this thread who could test it out and let us know how it affects performance on real-world cases.

@fjetter added the discussion, memory, performance, and stability labels Jun 13, 2022
@TomNicholas
Author

TomNicholas commented Jun 13, 2022

Here's a notebook containing a simple example demonstrating the root task overproduction problem.

This example only uses dask, not importing xarray or xGCM anywhere. It's set up to run on a coiled cluster, using Gabe's PR. The conda environment file is just

name: root_overproduction_test_random
channels:
  - conda-forge
dependencies:
  - ipykernel
  - coiled
  - numpy
  - python-graphviz
  - graphviz
  - pip
  - pip:
    - git+https://github.com/gjoseph92/distributed.git@f05c94e4c986d7e23e27215ce646a12d6008c849

The offending graph looks like this

image

I think @gjoseph92 was able to come up with some even simpler examples, but I'm sharing this one because it's the simplest one I could create that still looks somewhat like the original problem I was trying to compute (with xGCM).

@gjoseph92
Collaborator

gjoseph92 commented Jun 13, 2022

Two even simpler examples (pure dask). These are a + b with just enough complexity to make them not fuse into a single task:

import dask.array as da

a = da.random.random(100, chunks=10)
b = da.random.random(100, chunks=10)
s = da.stack([a, b])
r = s.sum(axis=0)
r.visualize(optimize_graph=True)

mydask

import dask.array as da

a = da.random.random(100, chunks=10)
b = da.random.random(100, chunks=10)
r = a[1:] + b[1:]
r.visualize(optimize_graph=True)

mydask

sum and slicing aren't blockwise operations. So graph optimization won't fuse a, b, and the operation into a single task per chunk—there will be three tasks per chunk.

The input tasks (random array creation) will be overproduced relative to output tasks (sum/add), leading to excessive memory use.

(@TomNicholas is demonstrating the same thing in his notebook. Instead of slicing or concatenation, he's adding a rechunk step before the a + b, which has the same effect of preventing fusion.)
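
For contrast, a plain blockwise a + b (no stacking, slicing, or rechunking) should fuse into a single task per chunk after graph optimization, so it doesn't exhibit the problem:

import dask.array as da

a = da.random.random(100, chunks=10)
b = da.random.random(100, chunks=10)
r = a + b  # purely blockwise: random + random + add fuse into one task per chunk
r.visualize(optimize_graph=True)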

@gjoseph92
Collaborator

gjoseph92 commented Jun 13, 2022

Also, the problem @TomNicholas has been demonstrating here is a simplified form of what he actually needs to solve.

Here, he's computing a + b (which runs out of memory due to root task overproduction).

What he actually needs to compute is broadcast1 * a + broadcast2 * b, where a and b are N-dimensional, the broadcast arrays are (N-1)-dimensional, and each broadcasts against many chunks of a and b.

This not only suffers from root task overproduction, but also hits the "widely-shared dependencies dogpile": #6570
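
A rough dask.array analogue of that shape (all sizes and chunk sizes are made up):

import dask.array as da

# a and b: N-dimensional, chunked along the first (e.g. time) axis
a = da.random.random((100, 500, 500), chunks=(1, 500, 500))
b = da.random.random((100, 500, 500), chunks=(1, 500, 500))

# broadcast1/broadcast2: (N-1)-dimensional, a single chunk each, so every
# output chunk depends on them: the "widely-shared dependencies" pattern
broadcast1 = da.random.random((500, 500), chunks=(500, 500))
broadcast2 = da.random.random((500, 500), chunks=(500, 500))

r = broadcast1 * a + broadcast2 * b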
