Scheduler memory leak / large worker footprint on simple workload #3898

Open
chrisroat opened this issue Jun 15, 2020 · 41 comments

@chrisroat
Contributor

What happened:

Running an embarrassingly parallel map_overlap workload may be causing a memory leak in the scheduler. Upon completion, releasing the tasks and restarting the client does not reclaim the memory. The example below, with 200k tasks, shows a jump in scheduler memory from 100MB to 1.3GB while running the graph. After client.restart, it remains at 1.1GB.

In addition, worker memory climbs into the "yellow" zone, where I believe spilling to disk begins. Given the parallel nature of this workload, workers ought to be able to discard pieces when they are done with them.

From a performance perspective, the scheduler becomes unresponsive during client.compute (it takes 20-ish seconds to start), presumably because it is loading a large graph. I have seen this cause already-running computations to start erroring; I've seen lost keys and KilledWorker errors.

And finally, anecdotally, it sometimes happens that one worker runs hot, getting 10x the tasks of the other workers. Eventually, forward progress halts. I now watch for this and kill that worker, which redistributes the work and finishes the job. (I'm using dask-gateway on K8s.)

What you expected to happen:

  • The scheduler should not use up additional memory once a computation is done.
  • Workers should shard a parallel job so that each shard can be discarded when done, keeping a low worker memory profile
  • Loading a graph should not disrupt ongoing computation

Minimal Complete Verifiable Example:

import dask.array as da
import distributed
client = distributed.Client(n_workers=4, threads_per_worker=1, memory_limit='10GB')
arr = da.zeros((50, 2, 8192, 8192), chunks=(1, -1, 512, 512))
result = arr.map_overlap(lambda x: x, depth=(0,0,200,200))
store = result.to_zarr('/media/ssd/test/memory_scheduler.zarr', compute=False, overwrite=True)
future = client.compute(store)

Environment:

  • Dask version: 2.18.1
  • Distributed version: 2.18.0
  • Python version: 3.7
  • Operating System: Ubuntu 18.04
  • Install method (conda, pip, source): conda
@chrisroat
Contributor Author

I also see a lot of this in the logs

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 7.03 GB -- Worker memory limit: 10.00 GB

@mrocklin
Member

For memory issues on the worker I recommend that you look through dask/dask#3530

20-ish seconds for 200k tasks is a bit long, but not terribly so. Task overhead is in the range of 200us per task, so this is in line with expectations. There are efforts to accelerate this, but they're probably a few months out at best.

I suspect that your minimal example is intentionally simplified, but your chunk sizes are quite small. A good way to avoid task overhead is to have fewer larger tasks.
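As a rough illustration (a sketch only, not a recommendation for your real data), the same array shape with larger chunks produces far fewer chunks, and therefore far fewer tasks:

import dask.array as da
from math import prod

small = da.zeros((50, 2, 8192, 8192), chunks=(1, -1, 512, 512))
large = da.zeros((50, 2, 8192, 8192), chunks=(10, -1, 2048, 2048))

# 12800 chunks vs. 80 chunks; the overlap and store layers multiply on top of this
print(prod(small.numblocks), prod(large.numblocks))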

I don't know why your scheduler memory would stay at 1GB after a restart, except perhaps that your OS has decided to hold onto the memory for a while (this is common).

@chrisroat
Contributor Author

Thanks for following up. I did start at #3530, but was advised that since I was using a single-threaded scheduler, the problem there was not mine.

The large-graph problem is partially the chunk size, but also because overlap computations are part of the mix.

I can use larger chunks for some things, but this is one use case where smaller chunks are required. I'm aligning images, and the alignment varies across the larger image. Basically, I'm micro-aligning 512x512 chunks. I suppose I could go the inception path: a map_overlap within a map_overlap using scheduler=synchronous on the inside.

As an aside, my real chunks are more like ~5x200x512x512 uint16, which is ~500MB. But I wanted an example with the same number of tasks that runs quickly as a self-contained example.

I'm still curious as to why a purely parallel computation (other than the overlap) is not proceeding in a way that minimizes memory consumption; it seems like it's trying to read the whole thing into all the workers' memory at once.

How long could the OS hold onto things? I run both locally and on GCP on Ubuntu images, and I've never seen the memory go away. Each time I add a graph, it eats up more space, so it doesn't seem to be re-using a heap or anything like that. I run my dask-gateway scheduler with 15GB of memory, but generally restart before I hit that because things get touchy. (I had to turn off autoscaling, because including that on top of this seemed to bog down the scheduler.)

@mrocklin
Member

Thanks for following up. I did start at #3530, but was advised that since I was using a single-threaded scheduler, the problem there was not mine.

To be clear, in the example above you're using the distributed scheduler. When we talk about the single-threaded scheduler we mean the case where you don't create a client at all, and you call .compute(scheduler="single-threaded").
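For example (a minimal sketch):

import dask.array as da

# No Client here: the whole graph runs in a single thread of the local process
arr = da.zeros((100, 100), chunks=10)
arr.sum().compute(scheduler="single-threaded")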

But I wanted an example that would be the same number of tasks, but quickly do-able as a self contained example

I figured, and I appreciate it.

I'm still curious as to why a purely parallel computation (other than the overlap) is not proceeding in a way that minimizes memory consumption.... it seems like it's trying to read the whole thing into all the workers memory at once.

It should. If you're interested you can try x.visualize(color="order") and get a pretty picture out of it. This only works well on graphs that have fewer than, say, 500 tasks or so.
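A minimal sketch of that (assuming graphviz and the Python graphviz bindings are installed; the filename is arbitrary):

import dask.array as da

# Keep the graph small so the rendering stays readable
arr = da.zeros((8, 8), chunks=2)
result = arr.map_overlap(lambda x: x, depth=1)

# Colors each task node by its static ordering and writes order.png
result.visualize(color="order", optimize_graph=True, collapse_outputs=True, filename="order.png")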

How long could the OS hold onto things?

Some OSes hold onto memory until someone else needs it. If your machine is actually running out of RAM, though, then that's not this.

@mrocklin
Member

In short, I personally don't know what's going on. I would probably have to dive in more deeply to figure out what the problem is, but it's unlikely that I'll have time to do that any time in the near future.

@quasiben
Member

Instead of client.restart, can you call client.cancel(future) at the end of the computation? This is usually where I start when I want to make sure everything is gone.
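e.g., with the future from the example above:

client.cancel(future)  # drop the remote result and release its tasks
del future             # also drop the local reference so nothing re-requests the keys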

@chrisroat
Contributor Author

chrisroat commented Jun 17, 2020

I do both - I cancel all futures, then restart the client. The memory still persists.

The color="order" is super neat. With optimize_graph=True, collapse_outputs=True, it is pretty fun to use.

One thing I've noticed is that the keys for overlap tasks contain fractions. Is there any comparison of floating-point values that might happen somewhere?

From playing around with a few small examples, the orderings look fine to me. Is order fully respected globally in some way (or even just on a worker)? I could imagine that if some task is waiting due to an upstream bottleneck, later-ordered but ready-to-go tasks could be processed. For example, data can always be read in and consume memory.

And with overlap tasks, is it safe to assume that just the necessary neighbor data is sliced before being transferred (and a full transfer of the neighbor chunks is not done)?

@mrocklin
Member

mrocklin commented Jun 17, 2020 via email

@TomAugspurger
Member

TomAugspurger commented Jun 23, 2020 via email

@chrisroat
Contributor Author

chrisroat commented Jun 24, 2020

I was curious whether any of the leak was due to the fractional keys, and found there is some code in the overlap module that is being reported as leaking. Here are the top 20 lines reported, which total ~400MB (of the 1GB growth I see), with dask/distributed 2.19.0. (Tagging @jakirkham from the other thread.)

Perhaps those knowledgeable about dask internals can verify whether tracemalloc has found something real, and suggest what possible mitigations would be.

Report:

Top 20 lines
#1: protocol/core.py:199: 32271.9 KiB
    return msgpack.loads(payload, use_list=False, **msgpack_opts)
#2: array/overlap.py:35: 26246.2 KiB
    rounded = (task[0],) + tuple(int(round(i)) for i in task[1:])
#3: array/overlap.py:112: 16850.3 KiB
    seq = list(product(*args))
#4: array/overlap.py:57: 16536.8 KiB
    if all(ind == slice(None, None, None) for ind in index):
#5: distributed/scheduler.py:2057: 15708.7 KiB
    prefix_key = key_split(key)
#6: array/overlap.py:48: 12670.9 KiB
    index.append(slice(None, None, None))
#7: <frozen importlib._bootstrap_external>:525: 11340.3 KiB
#8: distributed/scheduler.py:4723: 11140.5 KiB
    self.transition_log.append((key, start, finish2, recommendations, time()))
#9: distributed/scheduler.py:4744: 10847.7 KiB
    plugin.transition(key, start, finish2, *args, **kwargs)
#10: array/optimization.py:121: 10240.1 KiB
    dsk = dsk.copy()
#11: array/overlap.py:145: 9641.2 KiB
    frac_slice = fractional_slice((x.name,) + k, axes)
#12: distributed/scheduler.py:4713: 9206.8 KiB
    a = a.copy()
#13: array/overlap.py:52: 8693.4 KiB
    index.append(slice(-left_depth, None))
#14: array/optimization.py:191: 8232.2 KiB
    dsk[k] = (get, a, a_index)
#15: array/overlap.py:50: 7290.1 KiB
    index.append(slice(0, right_depth))
#16: distributed/scheduler.py:2055: 6600.3 KiB
    ts = TaskState(key, spec)
#17: distributed/scheduler.py:3949: 5815.5 KiB
    ts.waiters = {dts for dts in ts.dependents if dts.state == "waiting"}
#18: distributed/scheduler.py:655: 5800.7 KiB
    self.who_has = set()
#19: distributed/scheduler.py:653: 5800.3 KiB
    self.waiting_on = set()
#20: distributed/scheduler.py:650: 5800.3 KiB
    self.who_wants = set()
15712 other: 161295.1 KiB
Total allocated size: 398029.4 KiB

Code:

import gc
import linecache
import os
import tracemalloc
import dask.array as da
import distributed


tracemalloc.start()

client = distributed.Client(n_workers=4, threads_per_worker=1, memory_limit='10GB')
print(client)

arr = da.zeros((50, 2, 8192, 8192), chunks=(1, -1, 512, 512))
result = arr.map_overlap(lambda x: x, depth=(0,0,200,200))
result.to_zarr('/media/ssd/test/memory_scheduler.zarr', overwrite=True)

client.restart()  # Don't wait.  Stop immediately, reset, and see if everything is at baseline.
gc.collect()  # Really!  We want everything to go back to baseline.
# Look at System display of client -- it shows >1GB of memory usage.  Baseline is ~100MB


# Pretty print function from tracemalloc webpage
def display_top(snapshot, key_type='lineno', limit=20):
    snapshot = snapshot.filter_traces((
        tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
        tracemalloc.Filter(False, "<unknown>"),
    ))
    top_stats = snapshot.statistics(key_type)

    print("Top %s lines" % limit)
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        # replace "/path/to/module/file.py" with "module/file.py"
        filename = os.sep.join(frame.filename.split(os.sep)[-2:])
        print("#%s: %s:%s: %.1f KiB"
              % (index, filename, frame.lineno, stat.size / 1024))
        line = linecache.getline(frame.filename, frame.lineno).strip()
        if line:
            print('    %s' % line)

    other = top_stats[limit:]
    if other:
        size = sum(stat.size for stat in other)
        print("%s other: %.1f KiB" % (len(other), size / 1024))
    total = sum(stat.size for stat in top_stats)
    print("Total allocated size: %.1f KiB" % (total / 1024))


snapshot = tracemalloc.take_snapshot()   
display_top(snapshot)

@chrisroat
Contributor Author

I re-tested this in 2.25.0 and found the memory increase on the scheduler (after reset) is now even higher - 2.2GB.

The code above just shows possible leaks in the main process, which is roughly the same. I'm not sure yet how to figure out where the scheduler memory is going.

My workaround is to bring up a new cluster for each calculation.

@mrocklin
Member

mrocklin commented Sep 2, 2020

Thanks for keeping up with this, @chrisroat. I'm not sure I fully understand the printout that you gave a few months ago. What do the sizes mean there? Data that is no longer being tracked by Python? If so, I'm curious how this is being discovered. Some of those lines, like set(), seem fairly innocuous.

It's my first time seeing data like this. Can you help interpret it?

@chrisroat
Contributor Author

Those are coming from tracemalloc, which is supposed to detect memory leaks.

I'm realizing now, however, that the 2GB scheduler memory increase is likely not traced here, as it's likely a child process. I think memory-profiler can profile child processes, but I haven't tried that yet.
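Something like this untested sketch (assuming the memory_profiler package; memory_usage can sample a function call and include child processes):

from memory_profiler import memory_usage

def run_workload():
    import dask.array as da
    arr = da.zeros((50, 2, 8192, 8192), chunks=(1, -1, 512, 512))
    arr.map_overlap(lambda x: x, depth=(0, 0, 200, 200)).sum().compute()

# Sample RSS of this process and its children every second while the workload runs
samples = memory_usage((run_workload,), interval=1.0, include_children=True)
print("peak including children: %.0f MiB" % max(samples))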

@mrocklin
Member

mrocklin commented Sep 2, 2020 via email

@fjetter
Member

fjetter commented Jan 19, 2021

Not sure if related, but I have observed in the past a significant memory buildup, especially on the scheduler, from the various logs we keep (transition_log, events log, etc.). While these logs are implemented as deques and are limited by design, the limit is sometimes too large for small pod sizes. I've seen this take up several GB of memory on occasion.
That can be controlled with the option distributed.scheduler.transition-log-length and might be worth a shot. For a running cluster you could try something like

def clear_logs(dask_scheduler):
    dask_scheduler.log.clear()
    dask_scheduler.transition_log.clear()
    dask_scheduler.events.clear()
    
client.run_on_scheduler(clear_logs)

and see if memory drops.
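For the log-length option itself, a minimal sketch (it must be set before the scheduler is created, since the deque sizes are read at startup):

import dask
import distributed

with dask.config.set({"distributed.scheduler.transition-log-length": 10_000}):
    client = distributed.Client(n_workers=4, threads_per_worker=1)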

These collections are not cleared when triggering restart (in fact, an event is logged :) )

We keep similar structures on the worker, but I haven't investigated the impact over there since they are used more rarely and there are fewer of them.

@chrisroat
Contributor Author

Thanks for the tip on the scheduler logs. Unfortunately, that doesn't seem to be the problem here. I ran clear_logs in the middle (48:00 in the image below) of the workload at the start of this issue -- and again at the end. I didn't see any change in memory usage.

The scheduler jumps up early to 1GB of memory and pegs at 60%+ CPU.

[screenshot: scheduler memory and CPU over the workload, 2021-01-22]

This is with 2020.1.0 dask/distributed.

@chrisroat
Contributor Author

chrisroat commented Mar 18, 2021

With the most recent HEAD, this seems to have gotten larger, at least per the system graph on the scheduler system monitor. The leak has grown from 1GB in 2021.3.0 to 3GB at dask 2021.03.0+30.g76a3f866 and distributed 2021.03.0+17.g206c6f84.

I ran tracemalloc directly on the scheduler as suggested in #4571 (comment), and was able to get what seems to be a much more believable set of memory leaks than those listed above. But the total still doesn't quite add up to the total seen in the system viewer.

Tagging @madsbk, who seems to have recently been working on this code and might know if this is real and what might cause it.

import dask.array as da
import distributed
client = distributed.Client(n_workers=4, threads_per_worker=1, memory_limit='10GB')

def clear_logs(dask_scheduler): # As suggested in #3898 
    dask_scheduler.log.clear()
    dask_scheduler.transition_log.clear()
    dask_scheduler.events.clear()

def test_scheduler():
    import tracemalloc, time, gc
    client.run_on_scheduler(tracemalloc.start)
    print(client.run_on_scheduler(gc.collect)) # GC!
    snapshot1 = client.run_on_scheduler(tracemalloc.take_snapshot)
    
    arr = da.zeros((50, 2, 8192, 8192), chunks=(1, -1, 512, 512))
    result = arr.map_overlap(lambda x: x, depth=(0,0,200,200))
    store = result.to_zarr('/media/ssd/test/memory_scheduler.zarr', compute=False, overwrite=True)
    future = client.compute(store)    
    _ = future.result()
    
    client.run_on_scheduler(clear_logs) # Clear logs!
    print(client.run_on_scheduler(gc.collect)) # GC!
    snapshot2 = client.run_on_scheduler(tracemalloc.take_snapshot)
    client.run_on_scheduler(tracemalloc.stop) # Remember to stop tracing for a production scheduler

    top_stats = snapshot2.compare_to(snapshot1, 'lineno')

    print("[ Top 30 differences ]")
    for stat in top_stats[:30]:
        print(stat)

test_scheduler()

Output (path prefixes removed):

/distributed/protocol/core.py:160: size=145 MiB (+145 MiB), count=1095412 (+1095412), average=138 B
/distributed/protocol/core.py:104: size=71.6 MiB (+71.6 MiB), count=1378388 (+1378382), average=54 B
/distributed/scheduler.py:1791: size=55.3 MiB (+55.3 MiB), count=213203 (+213203), average=272 B
/distributed/scheduler.py:2039: size=43.9 MiB (+43.9 MiB), count=213203 (+213203), average=216 B
/distributed/scheduler.py:1205: size=43.9 MiB (+43.9 MiB), count=213204 (+213204), average=216 B
/distributed/scheduler.py:1204: size=43.9 MiB (+43.9 MiB), count=213204 (+213204), average=216 B
/distributed/scheduler.py:1209: size=43.9 MiB (+43.9 MiB), count=213203 (+213203), average=216 B
/distributed/scheduler.py:1207: size=43.9 MiB (+43.9 MiB), count=213203 (+213203), average=216 B
/distributed/scheduler.py:1206: size=43.9 MiB (+43.9 MiB), count=213202 (+213202), average=216 B
/distributed/scheduler.py:1935: size=34.3 MiB (+34.3 MiB), count=200003 (+200003), average=180 B
/distributed/protocol/core.py:57: size=23.2 MiB (+23.2 MiB), count=82202 (+82202), average=296 B
/distributed/utils.py:648: size=18.5 MiB (+18.5 MiB), count=213200 (+213200), average=91 B
/dask/array/overlap.py:49: size=16.0 MiB (+16.0 MiB), count=299200 (+299200), average=56 B
/distributed/scheduler.py:1795: size=15.3 MiB (+15.3 MiB), count=99999 (+99999), average=161 B
/dask/array/overlap.py:36: size=13.7 MiB (+13.7 MiB), count=290603 (+290603), average=49 B
/distributed/scheduler.py:1220: size=13.0 MiB (+13.0 MiB), count=213205 (+213205), average=64 B
/distributed/scheduler.py:4082: size=13.0 MiB (+13.0 MiB), count=213204 (+213204), average=64 B
/distributed/scheduler.py:1221: size=13.0 MiB (+13.0 MiB), count=213203 (+213203), average=64 B
/dask/array/overlap.py:113: size=10.3 MiB (+10.3 MiB), count=135200 (+135200), average=80 B
/dask/array/core.py:1019: size=10.0 MiB (+10.0 MiB), count=5 (+5), average=2048 KiB
/distributed/scheduler.py:1810: size=10.0 MiB (+10.0 MiB), count=1 (+1), average=10.0 MiB
/dask/array/overlap.py:148: size=9297 KiB (+9297 KiB), count=119000 (+119000), average=80 B
/dask/array/overlap.py:56: size=8367 KiB (+8367 KiB), count=119000 (+119000), average=72 B
/distributed/protocol/serialize.py:500: size=8355 KiB (+8355 KiB), count=164445 (+164445), average=52 B
/distributed/scheduler.py:6709: size=8192 KiB (+8192 KiB), count=4 (+4), average=2048 KiB
/distributed/scheduler.py:1970: size=7700 KiB (+7700 KiB), count=46936 (+46936), average=168 B
/dask/array/overlap.py:61: size=7437 KiB (+7437 KiB), count=118999 (+118999), average=64 B
/distributed/scheduler.py:1197: size=7392 KiB (+7392 KiB), count=213202 (+213202), average=36 B
/dask/array/overlap.py:53: size=7252 KiB (+7252 KiB), count=176800 (+176800), average=42 B
/distributed/scheduler.py:2033: size=6912 KiB (+6912 KiB), count=12801 (+12801), average=553 B

@chunmingchen

Lowering the distributed.scheduler.transition-log-length value to 1K worked for me. Have you tried that on the HEAD version?

@chrisroat
Contributor Author

I have used HEAD, and am in fact digging into the scheduler code this week in an effort to understand this leak (and other performance issues I am having).

My current versions are dask 2021.03.1+3.gcb34b298 and distributed 2021.03.1+5.g77a1fd19. Using a lower log length does remove some of the memory usage -- the same amount I can remove by clearing the logs at the end of the processing, which makes sense to me.

When you say "worked for me", did you mean you don't see the memory jump in this use case? Or in your own use case? Without all the tracemalloc stuff I put above (which was there to help anyone attempting to jump in), I've simplified my case to a simple map_overlap:

import dask
import dask.array as da
import distributed

with dask.config.set({"distributed.scheduler.transition-log-length": 3}):
    client = distributed.Client()
    _ = da.zeros((40, 40, 40), chunks=2).map_overlap(lambda x: x, depth=1).compute()

The scheduler initially uses ~100MB of memory. Running the above once causes it to jump to ~1.3GB. Subsequent runs (without creating a new client) cause the memory to jump ~200MB each time and trigger a few gc warnings.

I think the big initial jump might be some memory that gets allocated and re-used in subsequent runs, and the true leak is just ~200MB per execution.

I have no concrete evidence, but I worry map_overlap uses floating point keys and perhaps some comparison is messed up somewhere because of that. I'm only at the very beginning of digging into the scheduler, which seems pretty large and complex.

@chrisroat
Contributor Author

chrisroat commented Apr 6, 2021

I spent some more time investigating this. I've also tested with the recent dask/dask#7525 and #4677 (@jakirkham @madsbk).

The memory leaks can be reproduced without the need for complicated graphs. Simply:

with dask.config.set({"distributed.scheduler.transition-log-length": 100}):
    client = distributed.Client(threads_per_worker=1)
_ = da.zeros(100_000, chunks=1).map_blocks(lambda x: x).compute()
  1. The first run generates a 1GB bump in memory.* (If the array doubled to 200_000, the memory jump is 3.5GB)
  2. Running the compute line additional times shows ~200MB growth per execution, and triggers GC warnings from perf_utils.
  3. Tracemalloc seems to indicate that the part of the leak proportional to graph size comes from msgpack.loads. I used MSGPACK_PUREPYTHON (and see the same leak sizes) and the leak seems to come from strings in the HLG for the 'keys' key, but I'm not sure if that holds when MSGPACK_PUREPYTHON is off.
  4. Tracemalloc also indicates key_split, but this is likely just the lru cache.
  • I cannot find where this comes from, but running the following will remove ~300MB (but it's temporary, as it comes back on subsequent runs above):
def clear_some_mem(dask_scheduler=None):
    import ctypes
    # malloc_trim asks glibc to return freed heap memory to the OS
    libc = ctypes.CDLL("libc.so.6")
    libc.malloc_trim(0)
client.run_on_scheduler(clear_some_mem)

@max-sixty
Contributor

max-sixty commented Jun 20, 2021

I'm not sure how helpful it is to add anecdotes rather than detail, but I've recently hit this specifically with to_zarr, similar to the initial example, though seemingly much more severe.

With 100 workers each producing a ~250MB array and attempting xarray's to_zarr on a chunked array, each worker was adding (very approximately) 1GB/min to its unmanaged memory. Of 8000 tasks on 100 workers, a bit more than half failed because of a killed worker. Trimming memory helped but did not solve the problem.

Here's the repro, from pydata/xarray#5499:

import numpy as np
import xarray as xr
from pathlib import Path
from dask.distributed import Client

client = Client("tcp://localhost:8786")

size = 5000

def func(v):
    return xr.Dataset(
        dict(
            a=xr.DataArray(np.ones((size, size)), dims=["x", "y"]) * v,
            b=xr.DataArray(np.ones((size, size)), dims=["x", "y"]) * v * -1,
        ),
        coords=dict(v=[v]),
    )
    
DATA_PATH = Path("~/workspace/dask-test/").expanduser()
DATA_PATH.mkdir(exist_ok=True)

def write_zarr(v):
    ds = func(v)

    ds.chunk(dict(x=-1)).to_zarr(
        f"{DATA_PATH}/chunks/{v}.zarr", mode="w", consolidated=True
    )

futures = client.map(write_zarr, list(range(20)))

Running this creates 8.5GB of unmanaged memory.
[screenshot: dashboard showing ~8.5GB of unmanaged memory]

...though this leaves only 3.8GB of unmanaged memory after a few minutes:
[screenshot: dashboard a few minutes later, ~3.8GB of unmanaged memory]

In that instance it did work. Often it fails with "Event loop was unresponsive in Scheduler for 4.59s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability."

@fjetter
Member

fjetter commented Jun 21, 2021

In case this is of help to folks here: we've been adding to our documentation about unmanaged memory, which some might find useful. In particular, when running on UNIX systems there are various possibilities for reducing unmanaged memory.

https://distributed.dask.org/en/latest/worker.html#memory-not-released-back-to-the-os

@j3pic

j3pic commented Oct 18, 2021

@fjetter Could someone explain why we have to mess around with M_TRIM_THRESHOLD while working with Dask? With the Glibc default threshold of 128KB, we should never see anything that looks like a memory leak. Does Dask set the threshold to an absurdly high value (or -1, which disables trimming entirely)?

@gjoseph92
Collaborator

@j3pic see dask/dask#3530 (comment) for some explanation. I'm not actually sure why the low glibc default of 128KiB doesn't work; I think the setting doesn't quite behave as you'd expect. According to mallopt(3) it only applies to memory at the top of the heap, though https://www.joyfulbikeshedding.com/blog/2019-03-14-what-causes-ruby-memory-bloat.html#a-magic-trick-trimming indicates that the documentation may be incorrect. However, if MALLOC_TRIM_THRESHOLD_=0 vs MALLOC_TRIM_THRESHOLD_=128*1024 had different behavior, and 0 trimmed the whole heap while 128K only trimmed from the top, that would certainly explain it. I haven't read the source in a while, though, so I have no clue whether this is true.

@fjetter
Member

fjetter commented Jan 10, 2022

@fjetter Could someone explain why we have to mess around with M_TRIM_THRESHOLD while working with Dask? With the Glibc default threshold of 128KB, we should never see anything that looks like a memory leak. Does Dask set the threshold to an absurdly high value (or -1, which disables trimming entirely)?

@j3pic FWIW, we recently introduced some default parameters for this here:

environ:
  MALLOC_TRIM_THRESHOLD_: 65536
  OMP_NUM_THREADS: 1
  MKL_NUM_THREADS: 1

but other than this we are not doing anything to modify the default behaviour of glibc. We ourselves are confused by this behaviour and cannot actually explain it, as mentioned above. If you have some insights beyond what we put down in the docs or this issue, we're all very keen to learn.

@crusaderky has been looking into this the most and may be able to provide more info

@crusaderky
Collaborator

Indeed, explicitly setting MALLOC_TRIM_THRESHOLD_ to its supposed default will alter the glibc behaviour. Unsure why; it sounds like a bug in either the documentation or glibc itself.
dask sets it to 64k instead of 128k, but either will give you more or less the same behaviour.
Setting it to 0 will work just the same, unless you have a C/C++ module that doesn't use PyMalloc, in which case you may observe performance degradation.

@zklaus

zklaus commented Apr 8, 2022

Just found the following snippet in the mallopt man page under the M_MMAP_THRESHOLD entry:

Note: Nowadays, glibc uses a dynamic mmap threshold by default.
The initial value of the threshold is 128*1024, but when blocks
larger than the current threshold and less than or equal to
DEFAULT_MMAP_THRESHOLD_MAX are freed, the threshold is adjusted
upwards to the size of the freed block. When dynamic mmap
thresholding is in effect, the threshold for trimming the heap
is also dynamically adjusted to be twice the dynamic mmap
threshold. Dynamic adjustment of the mmap threshold is disabled
if any of the M_TRIM_THRESHOLD, M_TOP_PAD, M_MMAP_THRESHOLD, or
M_MMAP_MAX parameters is set.

Perhaps it helps in understanding this?

@crusaderky
Collaborator

@zklaus it does.

This issue should be resolved by setting MALLOC_TRIM_THRESHOLD_.
However, please be aware of an ongoing issue in the dask config (with a straightforward workaround):

I'm closing this - please reopen if, after properly setting MALLOC_TRIM_THRESHOLD_, you still experience issues.

@zklaus

zklaus commented Apr 8, 2022

I tried setting the variable in the shell before I came across the nanny version. In my case, it didn't help, but I may be suffering from a completely unrelated leak, so I am not gonna reopen this issue.

I do wonder if there is something else to be done here: the dynamic mmap thresholding means that as soon as large blocks are freed, the heap will see much more use. DEFAULT_MMAP_THRESHOLD_MAX is 16MB or 32MB on a 64-bit system (depending on whether you are operating under LP64 or not), so perhaps setting MALLOC_MMAP_THRESHOLD_ should also be considered.

@gjoseph92
Collaborator

I may be suffering from a completely unrelated leak

FYI, if you're running a recent version, there's a known worker memory leak especially if your graph contains large-ish objects: #5960

@chrisroat
Contributor Author

I don't think this should be closed. At the very least, we should re-run the original post's code and verify the problem no longer exists, or demonstrate a workaround that avoids it. The suspected leak is in the scheduler, so the current worker leak may not be related.

I'm recopying the code here:

import dask.array as da
import distributed
client = distributed.Client(n_workers=4, threads_per_worker=1, memory_limit='10GB')
arr = da.zeros((50, 2, 8192, 8192), chunks=(1, -1, 512, 512))
result = arr.map_overlap(lambda x: x, depth=(0,0,200,200))
store = result.to_zarr('/media/ssd/test/memory_scheduler.zarr', compute=False, overwrite=True)
future = client.compute(store)

I did try the fixes proposed throughout this thread. I generally see hundreds of extra MB each time the graph is submitted.

My solution, which may or may not be a good one, was to have our libraries check the scheduler memory usage prior to submitting a graph. If the scheduler memory is above a few GB, the scheduler is killed and restarted. It means we can't share schedulers among processes -- each scheduler is "owned" by one process. (And in any case, submitting large graphs from many processes slows down the scheduler, in addition to the memory issues.)
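A rough sketch of that kind of guard (the 4 GB threshold and helper name are illustrative only; psutil must be importable on the scheduler):

import psutil

def scheduler_rss(dask_scheduler=None):
    # RSS of the scheduler process, in bytes
    return psutil.Process().memory_info().rss

if client.run_on_scheduler(scheduler_rss) > 4e9:
    # tear the cluster down and create a fresh one before submitting the next graph
    ...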

crusaderky reopened this on Apr 8, 2022
@jafet-gonzalez

We worked around this by using KubeCluster.

@gjoseph92
Collaborator

Thanks @chrisroat. The #5960 thread is confusing, because there are actually two issues being discussed there (a leak on workers and a leak on the scheduler). I consider that thread to be only about the worker leak (and we have found the underlying problem there, though it's not fixed yet). I just saw "nanny" in @zklaus's message, so I assumed that was about workers too.

I think @djhoese may be talking about similar scheduler memory growth as you.

#5971 is not relevant to that, because the scheduler doesn't run within a Nanny. Knowing whether setting MALLOC_TRIM_THRESHOLD_ yourself before starting the scheduler helps would be a useful data point, but it's probably not the actual solution.
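One way to gather that data point (a sketch; glibc reads MALLOC_TRIM_THRESHOLD_ only at process startup, so it has to be in the scheduler's environment before it launches):

import os
import subprocess

# Launch the scheduler with the trim threshold set in its environment
env = dict(os.environ, MALLOC_TRIM_THRESHOLD_="65536")
subprocess.Popen(["dask-scheduler"], env=env)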

@jakirkham
Member

If the thread is confusing because of too many commingled discussions, I wonder if we should raise new issues about these specific pieces and close this one out.

@crusaderky
Collaborator

Knowing if setting MALLOC_TRIM_THRESHOLD_ yourself before starting the scheduler helps is a useful data point. But it's probably not the actual solution.

If it did, wouldn't it be a red flag that large numpy arrays* are sitting on the scheduler inside the dask graph instead of being scattered directly to the workers?

*or any other objects featuring contiguous buffers - e.g. not basic python types

@chrisroat
Contributor Author

Hi. I was investigating this again for another project, and the memory bloat still seems to be present. (This means we have to kill and restart our pool after a couple of large overlapping calculations, as the scheduler just grinds to a halt.) I did a fair amount of debugging in the past to try to narrow things down. Not sure if anything earlier in this thread is helpful.

@fjetter
Member

fjetter commented Sep 18, 2023

There is some internal logging that has been causing a memory increase over time and we're about to reduce the size of those internal logs in #8173

In #8164 there is a short analysis of how the memory increase behaves over time. From this analysis, after the internal logs are cut, the remaining memory increase is related to the actual computation graph and is on the order of ~16KiB per task in the graph, distributed across all workers (i.e. a graph of 1M tasks would have an overhead of ~16GiB; if you have 100 workers, that'd be roughly 160MiB on each worker). This remaining increase could theoretically also be cut if it poses problems, but that is a little more work.

If somebody is observing an increase that is larger than what I am describing here, please let us know. If you manage to produce a reproducer, that would be ideal, of course.

@chrisroat
Contributor Author

You are in luck -- the reproducer from the first post of this thread still shows the issue. Throughout the thread you can see the various things I tried -- e.g., memory-allocation env vars and aggressively clearing logs. I also dug through a lot of the communication code; it's not my area of expertise, but it smelled like something was leaking at that level. I used to use a Linux system, but yesterday I checked with my Mac -- the issue is still present.

Note that the memory leak here is not related to the workers. As per the title, it is the scheduler that jumps ~1GB on the first computation. Additional similar computations keep increasing the memory by ~200MB. Eventually, the scheduler becomes unresponsive. See the earlier comment with a graph of the scheduler memory's initial jump.

Through trial and error I found the memory level at which the scheduler became unusable. In my old work, I just checked the scheduler memory before submitting a graph. If the memory was too large, I killed the scheduler, started a new one, and then submitted the graph on a fresh cluster.

@fjetter
Member

fjetter commented Sep 19, 2023

The reproducer from the first post looks like this on my machine running on current main

I ran the reproducer in a loop (this screenshot after three runs).

Three observations

  • The graph submission takes memory. This is known and expected behavior. We're working on making this peak smaller but for now this is there to stay
  • There is a very weird blip (reproducible even) shortly after the run starts. I suspect this is related to GC but IDK
  • Squinting a little, I can see a memory increase of maybe 100MiB per run. I set the log lengths to a smaller value, but there is also some lingering (though bounded) state for the dashboard. This may be something, but it may also just be memory fragmentation or internal state.

[screenshot: scheduler memory over three consecutive runs]

This was produced on an M1 MacBook. Memory behaves a little funny on macOS, and I suspect that the weird peak and the intermediate low memory are just a relic of macOS memory compression. I'll reproduce this on Linux shortly.

Recently I did a scheduler memory profile for a large graph (see #7998 (comment)) that shows how the scheduler memory usage breaks down. Indeed, most of it is internal state that should be released, or at the very least be reusable if it cannot be released due to fragmentation.

@fjetter
Member

fjetter commented Sep 19, 2023

I ran this reproducer on Coiled a couple of times:
[screenshot: Coiled dashboard, scheduler memory over several runs]
same view in grafana
[screenshot: the same view in Grafana]

This seems to suggest that the memory is indeed stable after one or two runs, at just above 2GB.

I modified the reproducer mildly with

def arr_to_devnull(arr: da.Array):
    "Simulate storing an array to zarr, without writing anything (just drops every block once it's computed)"

    # NOTE: this class must be defined inside the function so it's cloudpickled as code,
    # otherwise `tests/utils_test` would have to be installed on the cluster.
    class _DevNull:
        def __setitem__(self, k, v):
            pass

    # TODO `da.store` should use blockwise to be much more efficient https://github.com/dask/dask/issues/9381
    return da.store(arr, _DevNull(), lock=False, compute=False)

instead of to_zarr, so I was not actually storing anything to S3 or the like. AFAIK, fsspec/s3fs caches things... maybe something is kept in memory there, but that should not affect scheduler memory.

@chrisroat
Contributor Author

Thanks for looking into this. I ran the graph 5 times, but the system memory plot didn't keep the full history. It does look like things become stable in the screengrab I could make, which shows the end of the 4th iteration and the full 5th iteration. It must be 1.1GB of fragmented/unreturned memory, with about 200MB of releasable memory used during graph computation. So this is much better than 1-2 years ago, when I last performed this sort of computation. FWIW, I am writing to local disk, not S3.

[screenshot: scheduler memory, end of 4th and all of 5th iteration with to_zarr]

Using your devnull writer (still with 5 iterations of the loop), I see different behavior from both the graph above and your graphs. First, it runs much faster. :) The memory usage stays much lower, but does jump on each run. It does seem to be stabilizing at 500MB or so. I don't see the peaks you highlight in your graphs.

[screenshot: scheduler memory over 5 iterations with the devnull writer]

Note that I am using an older machine -- a 2017 MacBook Pro with 16GB RAM.
