Memory leak with Numpy arrays and the threaded scheduler #3530

Open
mrocklin opened this issue May 24, 2018 · 56 comments

@mrocklin
Member

This example leaks around 500MB of memory on my machine when using the threaded scheduler, and almost no memory when using the single-threaded scheduler:

import dask.array as da
x = da.ones((2e4, 2e4), chunks=(2e4, 100))
y = x.rechunk((100, 2e4))
z = y.rechunk((2e4, 100))

import psutil
proc = psutil.Process()

from distributed.utils import format_bytes
print(format_bytes(proc.memory_info().rss))
# 80MB

from dask.diagnostics import ProgressBar
ProgressBar().register()

# z.sum().compute(scheduler='single-threaded')  # This doesn't cause problems
z.sum().compute(scheduler='threads')  # This leaks around 500MB of memory

print(format_bytes(proc.memory_info().rss))
# 500-600MB

Notebook

This doesn't happen when I run it with the single-threaded scheduler.

Calling gc.collect() doesn't help. Allocating a new large numpy array afterwards doesn't reuse the leaked memory either; the number just climbs. Looking at the objects that Python knows about shows that there isn't much around:

from pympler import muppy
all_objects = muppy.get_objects()

from pympler import summary
sum1 = summary.summarize(all_objects)
summary.print_(sum1)                          
                                      types |   # objects |   total size
=========================================== | =========== | ============
                                <class 'str |       60517 |      8.75 MB
                               <class 'dict |       11991 |      5.30 MB
                               <class 'code |       19697 |      2.72 MB
                               <class 'type |        2228 |      2.24 MB
                              <class 'tuple |       16142 |      1.04 MB
                                <class 'set |        2285 |    858.84 KB
                               <class 'list |        7284 |    738.09 KB
                            <class 'weakref |        4412 |    344.69 KB
                        <class 'abc.ABCMeta |         261 |    263.54 KB
                        function (__init__) |        1378 |    183.02 KB
  <class 'traitlets.traitlets.MetaHasTraits |         180 |    175.45 KB
                 <class 'wrapper_descriptor |        2240 |    175.00 KB
                                <class 'int |        5584 |    168.92 KB
                  <class 'getset_descriptor |        2389 |    167.98 KB
            <class 'collections.OrderedDict |         292 |    141.00 KB

The local schedulers don't have any persistent state. My next step is to reproduce with the standard concurrent.futures module, but I thought I'd put this up early in case people have suggestions.

cc @shoyer @pitrou @njsmith

@pitrou
Member

pitrou commented May 24, 2018

See discussion in https://bugs.python.org/issue33444

@mrocklin
Member Author

Hrm, interesting. I set export MALLOC_ARENA_MAX=1, then start a jupyter notebook and work from there.

Sometimes I get no leak, sometimes I get a much bigger leak at 1.4GB.

@mrocklin
Member Author

cc @stefanv and @mattip

@mrocklin
Member Author

Also, as suggested by @stuartarchibald

mrocklin@carbon:~/workspace/play$ python memory-sharding.py 
52.20 MB
[########################################] | 100% Completed |  7.4s
568.71 MB
mrocklin@carbon:~/workspace/play$ export MALLOC_MMAP_THRESHOLD_=0

mrocklin@carbon:~/workspace/play$ python memory-sharding.py 
142.88 MB
[########################################] | 100% Completed |  7.3s
147.49 MB

@pitrou
Member

pitrou commented May 25, 2018

You probably don't want 0 for MALLOC_MMAP_THRESHOLD_ but a round number of 4kB pages (for example 16384 bytes). Smaller allocations should probably still go through the normal allocator.

@mrocklin
Member Author

@pitrou , long term what do you think should happen here? Can this issue be globally resolved through changes in CPython? Should Numpy change the way it calls malloc? Or can this not safely be resolved in libraries and we need to encourage people to set environment variables?

@pitrou
Member

pitrou commented May 26, 2018

I think you could add a documentation section about memory consumption issues and mention those environment variables there.

It's not Numpy's or Python's business to make global decisions about the system memory allocator's heuristics in the process they execute in, IMHO. What Dask/distributed could perhaps do is, on Linux, call C mallopt with the right arguments (such as mallopt(M_MMAP_THRESHOLD, 16*1024)). This will require something like a ctypes wrapper, but it should be trivial given the mallopt signature.

cc'ing @njsmith just in case.

@pitrou
Member

pitrou commented May 26, 2018

From /usr/include/malloc.h:

/* mallopt options that actually do something */
#define M_TRIM_THRESHOLD    -1
#define M_TOP_PAD           -2
#define M_MMAP_THRESHOLD    -3
#define M_MMAP_MAX          -4
#define M_CHECK_ACTION      -5
#define M_PERTURB           -6
#define M_ARENA_TEST        -7
#define M_ARENA_MAX         -8

Here is an example call from Python:

>>> from ctypes import *
>>> libc = CDLL('libc.so.6')
>>> mallopt = libc.mallopt
>>> mallopt.argtypes = [c_int, c_int]
>>> mallopt.restype = c_int
>>> 
>>> M_MMAP_THRESHOLD = -3
# The following would return 0 on error
>>> mallopt(M_MMAP_THRESHOLD, 16*1024)
1

@mrocklin
Member Author

Also, for comparison with MALLOC_ARENA_MAX=1

mrocklin@carbon:~/workspace/play$ python memory-sharding.py 
52.40 MB
[########################################] | 100% Completed |  6.9s
[########################################] | 100% Completed |  0.1s
760.98 MB
mrocklin@carbon:~/workspace/play$ export MALLOC_ARENA_MAX=1
mrocklin@carbon:~/workspace/play$ python memory-sharding.py 
52.08 MB
[########################################] | 100% Completed |  5.8s
[########################################] | 100% Completed |  0.1s
966.14 MB
mrocklin@carbon:~/workspace/play$ python memory-sharding.py 
52.21 MB
[########################################] | 100% Completed |  5.9s
[########################################] | 100% Completed |  0.1s
57.24 MB

@pitrou
Member

pitrou commented May 26, 2018

Interesting. According to the mallopt man page:

Arenas are thread safe and therefore may have multiple concurrent memory requests. The trade-off is between the number of threads and the number of arenas. The more arenas you have, the lower the per-thread contention, but the higher the memory usage.

Have you tried something like MALLOC_MMAP_THRESHOLD_=16384? With MALLOC_MMAP_THRESHOLD_=0, your memory-sharding.py example starts at 142 MB instead of 52 MB, which doesn't seem desirable.

@mrocklin
Member Author

Yes, this seems nicer.

mrocklin@carbon:~/workspace/play$ python memory-sharding.py 
52.22 MB
[########################################] | 100% Completed |  6.5s
[########################################] | 100% Completed |  0.1s
697.00 MB
mrocklin@carbon:~/workspace/play$ export MALLOC_MMAP_THRESHOLD_=16384
mrocklin@carbon:~/workspace/play$ python memory-sharding.py 
52.42 MB
[########################################] | 100% Completed |  7.3s
[########################################] | 100% Completed |  0.1s
56.50 MB

It does reliably seem to increase runtimes, though that would be acceptable.

@pitrou
Member

pitrou commented May 26, 2018

One downside of mmap is that it can cost more CPU (for example because it always returns zero-initialized memory). That's why the libc is cautious about using it.

@pitrou
Member

pitrou commented May 26, 2018

By the way, an orthogonal possibility would be for Anaconda to link its Python builds with an alternative malloc library such as jemalloc. Perhaps that would improve the numbers, or not.

@mrocklin
Member Author

cc @mingwandroid

@stuartarchibald
Contributor

RE #3530 (comment): setting MALLOC_MMAP_THRESHOLD_=0 was done with a view to illustrating the issue; in practice this isn't a good idea, as mmap is expensive and most of the time isn't really needed, as noted by @pitrou.

The details of the behaviour of the mmap thresholds are in the comments of malloc.c.
There is a dynamic threshold (MALLOC_MMAP_THRESHOLD_) that is used to adjust the balance between mmap and brk. Essentially, memory requests over the MALLOC_MMAP_THRESHOLD_ size go to mmap; once this memory is unmapped, the size of the unmapped chunk is fed back into the threshold so that brk is used next time for a chunk of that size. The intended effect is that large long-term allocations are mmap'd, while large but transient allocations are initially mmap'd but subsequently brk'd. The whole point of this seems to be to balance the cost of using mmap against irretrievably fragmented heaps, and to help "size" the heap to the size of common transient allocation requests.

It should also be noted that if MALLOC_MMAP_THRESHOLD_ is set explicitly, the dynamic adjustment of the threshold is disabled. I imagine that if a program's memory allocation requirement is distinctly bimodal, with a "small" and a "very large" peak, then setting this somewhere between the two peaks may have a good effect on heap memory usage, as a fragmented heap is less likely.

In the explicit case above, this means that if the .compute() is called in, e.g., a loop, the dynamic threshold adjustment will yield a potentially high but, over the long term, relatively stable memory usage pattern: the first run through the loop sizes the heap/thresholds, so subsequent allocations should be quick and should not necessarily fragment the heap further. For a single run of this sort of operation the threshold adjustment may not have a noticeable benefit, as demonstrated by the originally reported issue.

Experimenting with the above, the chunks are (100 * 2e4 * 8) bytes (just under 16 MB), therefore setting MALLOC_MMAP_THRESHOLD_=14680064 (14 MB) should force the chunks to mmap from the outset and yield a small RSS. Locally this gives an RSS of 107 MB before and 122 MB after the call to .compute().
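
For anyone who wants to try this from inside the process rather than via the environment, here is a minimal sketch building on the mallopt binding @pitrou posted above. It assumes Linux/glibc and the ~16 MB float64 chunks from the example; it is an experiment to run, not a recommended default.

import ctypes

import dask.array as da

# Sketch only (Linux/glibc): push allocations the size of our chunks through mmap
# so they are returned to the OS when freed, using the mallopt binding shown above.
libc = ctypes.CDLL('libc.so.6')
libc.mallopt.argtypes = [ctypes.c_int, ctypes.c_int]
libc.mallopt.restype = ctypes.c_int
M_MMAP_THRESHOLD = -3  # from /usr/include/malloc.h

chunk_nbytes = int(100 * 2e4 * 8)  # ~16 MB per float64 chunk in the example above
assert libc.mallopt(M_MMAP_THRESHOLD, chunk_nbytes - (1 << 20)) == 1  # 0 means error

x = da.ones((2e4, 2e4), chunks=(2e4, 100))
z = x.rechunk((100, 2e4)).rechunk((2e4, 100))
z.sum().compute(scheduler='threads')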

It is also worth noting:

  • mallinfo only sees the main arena, so in a multithreaded context may not be ideal.
  • Some members of the mallinfo struct are typed as int but implemented using long, so they are subject to overflow/wraparound and may well be "inaccurate".

Thoughts on any of this are most welcome...!

Also, here's a somewhat hacky but functioning piece of code that records and plots mallinfo across a few iterations of the .compute(). It needs pandas and matplotlib along with the usual. Thanks to @pitrou for the MallInfo binding :)

import ctypes
import gc
import threading
import time
import datetime
from pandas import DataFrame as df
import matplotlib.pyplot as plt
from distributed.utils import format_bytes
import dask.array as da
import psutil


class MallInfo(ctypes.Structure):
    _fields_ = [(name, ctypes.c_int)
                for name in ('arena', 'ordblks', 'smblks', 'hblks', 'hblkhd',
                             'usmblks', 'fsmblks', 'uordblks', 'fordblks',
                             'keepcost')]


libc = ctypes.CDLL("libc.so.6")
mallinfo = libc.mallinfo
mallinfo.argtypes = []
mallinfo.restype = MallInfo


def get_mallinfo():
    info = mallinfo()
    fields = [(name, getattr(info, name)) for name, _ in info._fields_]
    return {k: v for k, v in fields}


def show_mallinfo():
    print("Malloc info:")
    for name, value in get_mallinfo().items():
        print(f"- {name}: {value}")
    print("")


show_mallinfo()
x = da.ones((2e4, 2e4), chunks=(2e4, 220))
y = x.rechunk((100, 2e4))
z = y.rechunk((2e4, 100))

proc = psutil.Process()

print(format_bytes(proc.memory_info().rss))
print(proc.memory_full_info())


def malloc_spy(signal, wait, ticks, current_op):
    while not signal.wait(1e-4):
        now = datetime.datetime.now()
        minfo = get_mallinfo()
        # add in current op info
        minfo['current_op'] = current_op['name']
        ticks.append((now, minfo))
        time.sleep(wait)


signal = threading.Event()  # to signal stop
ticks = []  # mutable, holds ticks
current_op = {'name': -1}  # mutable, current operation/iteration

thr = threading.Thread(
    target=malloc_spy,
    args=(
        signal,
        1e-3,
        ticks,
        current_op))
thr.start()

niterations = 5

for i in range(niterations):
    current_op['name'] = i
    print(str(i).center(80, '-'))
    z.sum().compute(scheduler='threads')
    print(format_bytes(proc.memory_info().rss))
    print(proc.memory_full_info())

# kill the malloc_spy
signal.set()
thr.join()

# process into a plot
info = df.from_dict({x[0]: x[1] for x in ticks}).transpose()
info.plot(subplots=True, style='.-')
plt.show() # hblkhd looks strange due to signed int overflow

@mattip

mattip commented May 30, 2018

What python are you using? See this issue in cpython, which was fixed for python 3.3 but not for python 2.7
https://bugs.python.org/issue11849

@mrocklin
Member Author

mrocklin commented May 30, 2018 via email

@mrocklin
Member Author

By the way, an orthogonal possibility would be for Anaconda to link its Python builds with an alternative malloc library such as jemalloc. Perhaps that would improve the numbers, or not.

@mattip and I were talking about this option, and also discussing the possibility of doing it within NumPy itself rather than Anaconda.

@pitrou do you have any thoughts or concerns about a library like NumPy using jemalloc?

@jakirkham
Member

Also seeing a leak. Prints out 655.65 MB at the end. Running on macOS 10.11 with a 2.3 GHz Intel Core i7.

Environment:
name: test
channels:
  - conda-forge
  - defaults
dependencies:
  - alabaster=0.7.10=py36_1
  - appnope=0.1.0=py36_0
  - asciitree=0.3.3=py36_1
  - asn1crypto=0.24.0=py36_0
  - babel=2.5.3=py36_0
  - backcall=0.1.0=py_0
  - backports=1.0=py36_1
  - backports.functools_lru_cache=1.5=py36_0
  - backports.shutil_get_terminal_size=1.0.0=py_3
  - bkcharts=0.2=py36_0
  - blas=1.1=openblas
  - bleach=2.1.3=py_0
  - bokeh=0.12.15=py36_0
  - boost=1.66.0=py36_1
  - boost-cpp=1.66.0=1
  - boto3=1.7.21=py_0
  - botocore=1.10.21=py_0
  - bottleneck=1.2.1=py36_1
  - bzip2=1.0.6=1
  - ca-certificates=2018.4.16=0
  - certifi=2018.4.16=py36_0
  - cffi=1.11.5=py36_0
  - chardet=3.0.4=py36_0
  - clangdev=6.0.0=default_0
  - click=6.7=py_1
  - cloudpickle=0.5.3=py_0
  - coverage=4.5.1=py36_0
  - cryptography=2.2.1=py36_0
  - cycler=0.10.0=py36_0
  - cytoolz=0.9.0.1=py36_0
  - dask=0.17.4=py_0
  - dask-core=0.17.4=py_0
  - dask-distance=0.2.0=py_0
  - dask-glm=0.1.0=0
  - dask-imread=0.1.1=py36_0
  - dask-ml=0.4.1=py36_0
  - dask-ndfilters=0.1.2=py36_0
  - dask-ndfourier=0.1.2=py36_0
  - dask-ndmeasure=0.1.1=py36_0
  - dask-ndmorph=0.1.1=py36_0
  - dask-searchcv=0.2.0=py_0
  - dbus=1.11.0=0
  - decorator=4.3.0=py_0
  - distributed=1.21.8=py36_0
  - docutils=0.14=py36_0
  - entrypoints=0.2.3=py36_1
  - expat=2.2.5=0
  - fasteners=0.14.1=py36_2
  - fftw=3.3.7=0
  - freetype=2.8.1=0
  - future=0.16.0=py36_0
  - gettext=0.19.8.1=0
  - glances=2.11.1=py_0
  - glib=2.55.0=0
  - graphviz=2.38.0=7
  - h5py=2.8.0=py36h470a237_0
  - hdf5=1.10.1=2
  - heapdict=1.0.0=py36_0
  - html5lib=1.0.1=py_0
  - icu=58.2=0
  - idna=2.6=py36_1
  - imageio=2.3.0=py36_0
  - imagesize=1.0.0=py36_0
  - imgroi=0.0.2=py36_0
  - ipykernel=4.8.2=py36_0
  - ipyparallel=6.0.2=py36_0
  - ipython=6.4.0=py36_0
  - ipython_genutils=0.2.0=py36_0
  - ipywidgets=7.2.1=py36_1
  - jbig=2.1=0
  - jedi=0.12.0=py36_0
  - jinja2=2.10=py36_0
  - jmespath=0.9.3=py36_0
  - jpeg=9b=2
  - jq=1.5=4
  - jsonschema=2.6.0=py36_1
  - jupyter_client=5.2.3=py36_0
  - jupyter_contrib_core=0.3.3=py36_1
  - jupyter_contrib_nbextensions=0.5.0=py36_0
  - jupyter_core=4.4.0=py_0
  - jupyter_highlight_selected_word=0.2.0=py36_0
  - jupyter_latex_envs=1.4.4=py36_0
  - jupyter_nbextensions_configurator=0.4.0=py36_0
  - kenjutsu=0.5.1=py36_0
  - kiwisolver=1.0.1=py36_1
  - libcxx=6.0.0=0
  - libedit=3.1.20170329=0
  - libffi=3.2.1=3
  - libgfortran=3.0.0=0
  - libiconv=1.15=0
  - libpng=1.6.34=0
  - libsodium=1.0.16=0
  - libtiff=4.0.9=0
  - libxml2=2.9.8=0
  - libxslt=1.1.32=0
  - line_profiler=2.1.2=py36_0
  - llvm-meta=6.0.0=0
  - llvmdev=6.0.0=default_2
  - locket=0.2.0=py36_1
  - lxml=4.2.1=py36_0
  - mahotas=1.4.4=py36_0
  - markupsafe=1.0=py36_0
  - matplotlib=2.2.2=py36_1
  - metawrap=0.0.2=py36_0
  - mistune=0.8.3=py36_1
  - mock=2.0.0=py36_0
  - monotonic=1.5=py_0
  - mplview=0.0.5=py_0
  - msgpack-python=0.5.6=py36h2d50403_2
  - multipledispatch=0.5.0=py36_0
  - nbconvert=5.3.1=py_1
  - nbformat=4.4.0=py36_0
  - ncurses=5.9=10
  - networkx=2.1=py36_0
  - nose=1.3.7=py36_2
  - notebook=5.5.0=py36_0
  - npctypes=0.0.4=py36_0
  - numcodecs=0.5.5=py36_0
  - numpy=1.14.3=py36_blas_openblas_200
  - olefile=0.45.1=py36_0
  - oniguruma=6.8.0=0
  - openblas=0.2.20=7
  - openssl=1.0.2o=0
  - packaging=17.1=py_0
  - pandas=0.23.0=py36_0
  - pandoc=2.2.1=hde52d81_0
  - pandocfilters=1.4.2=py36_0
  - parso=0.2.0=py_0
  - partd=0.3.8=py36_0
  - pbr=4.0.2=py_0
  - pcre=8.41=1
  - pexpect=4.5.0=py36_0
  - pickleshare=0.7.4=py36_0
  - pillow=5.1.0=py36_0
  - pims=0.4.1=py_1
  - pip=9.0.3=py36_0
  - prompt_toolkit=1.0.15=py36_0
  - psutil=5.4.5=py36_0
  - ptyprocess=0.5.2=py36_0
  - pycparser=2.18=py36_0
  - pyfftw=0.10.4=py36_3
  - pygments=2.2.0=py36_0
  - pyopenssl=17.5.0=py36_1
  - pyparsing=2.2.0=py36_0
  - pyqt=5.6.0=py36_5
  - pysocks=1.6.8=py36_1
  - python=3.6.5=1
  - python-dateutil=2.7.3=py_0
  - python-graphviz=0.8.3=py36_0
  - python-spams=2.6=py36_blas_openblas_203
  - python.app=1.2=py36_0
  - pytz=2018.4=py_0
  - pywavelets=0.5.2=py36_1
  - pyyaml=3.12=py36_1
  - pyzmq=17.0.0=py36_4
  - qt=5.6.2=h9e3eb04_4
  - rank_filter=0.4.15=py36_0
  - readline=7.0=0
  - requests=2.18.4=py36_1
  - runipy=0.1.5=py36_1
  - s3fs=0.1.5=py_0
  - s3transfer=0.1.13=py36_0
  - scikit-image=0.13.1=py36_0
  - scikit-learn=0.19.1=py36_blas_openblas_201
  - scipy=1.1.0=py36_blas_openblas_200
  - send2trash=1.5.0=py_0
  - setuptools=39.1.0=py36_0
  - simplegeneric=0.8.1=py36_0
  - sip=4.18=py36_1
  - six=1.11.0=py36_1
  - slicerator=0.9.8=py_1
  - snowballstemmer=1.2.1=py36_0
  - sortedcontainers=1.5.10=py36_0
  - sphinx=1.7.4=py36_0
  - sphinxcontrib-websupport=1.0.1=py36_0
  - sqlite=3.20.1=2
  - tblib=1.3.2=py36_0
  - terminado=0.8.1=py36_0
  - testpath=0.3.1=py36_0
  - tifffile=0.14.0=py36_1
  - tk=8.6.7=0
  - toolz=0.9.0=py_0
  - tornado=5.0.2=py36_0
  - traitlets=4.3.2=py36_0
  - typing=3.6.4=py36_0
  - unidecode=1.0.22=py36_0
  - urllib3=1.22=py36_0
  - vigra=1.11.1=py36_6
  - wcwidth=0.1.7=py36_0
  - webcolors=1.8.1=py_0
  - webencodings=0.5.1=py36_0
  - wheel=0.31.0=py36_0
  - widgetsnbextension=3.2.1=py36_0
  - xnumpy=0.1.2=py36_0
  - xz=5.2.3=0
  - yail=0.0.2=py36_0
  - yaml=0.1.7=0
  - zarr=2.2.0=py_1
  - zeromq=4.2.5=1
  - zict=0.1.3=py_0
  - zlib=1.2.11=0
  - libgcc=4.8.5=1

@jakirkham
Member

Any thoughts on the usage of jemalloc and possibly building things to use it, @mingwandroid?

@mattip

mattip commented May 30, 2018

Just to be clear, currently NumPy uses PyObject_Malloc to allocate its basic PyArrayObject (in array_alloc), but calls malloc via a very small caching mechanism to allocate dims, strides, and data fields (in npy_alloc_cache). PyObject_Malloc uses the host python's pool management. So there are at least these things in play:

  • python's pool mechanism from PyObject_Malloc
  • numpy's caching mechanism for dims, strides, and small blocks of data
  • large data blocks allocated via malloc
  • malloc's own arena mechanism
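
One rough way to see which of those layers the growth is coming from is to watch CPython's own block count next to the process RSS; a sketch, assuming psutil and distributed are installed as in the examples above:

import sys

import psutil
import dask.array as da
from distributed.utils import format_bytes

proc = psutil.Process()

def report(label):
    # sys.getallocatedblocks() only counts blocks held by CPython's allocator
    # (the first two layers above); RSS includes everything malloc/mmap kept.
    print(label, format_bytes(proc.memory_info().rss), sys.getallocatedblocks())

x = da.ones((2e4, 2e4), chunks=(2e4, 100))
z = x.rechunk((100, 2e4)).rechunk((2e4, 100))

report('before')
z.sum().compute(scheduler='threads')
report('after')  # if fragmentation is to blame, RSS jumps while the block count barely moves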

@mrocklin
Member Author

In conversation with @njsmith, he raised a concern about the idea that NumPy might use a different malloc implementation than Python: Python might allocate some memory and then pass it to NumPy, and NumPy would need to remember to call the right free depending on whether it allocated the data or someone else did. This seems error-prone.

@stefanv
Contributor

stefanv commented May 31, 2018 via email

@mrocklin
Member Author

This might happen in Dask when we get bytes from a socket and use those to create a Numpy array.

@mattip

mattip commented May 31, 2018

IMO that buffer should then become the base property of the ndarray, allowing the buffer to manage itself, unless someone is messing directly with the PyArrayObject_fields data pointer at the C level.
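
As a small illustration of "the buffer becomes the base" from the Python side (a sketch, not Dask's actual receive path): an array built with np.frombuffer doesn't own its data, so the buffer's original allocator stays responsible for freeing it.

import numpy as np

buf = bytearray(8 * 1024)                  # stand-in for bytes received from a socket
arr = np.frombuffer(buf, dtype='float64')  # zero-copy view onto that buffer

print(arr.flags['OWNDATA'])  # False: the ndarray never allocated (and never frees) this memory
print(type(arr.base))        # the object that keeps the underlying buffer alive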

@stefanv
Contributor

stefanv commented May 31, 2018

Right, so when does the parent allocator "give up" control of its own memory, forcing NumPy to take on its management?

@njsmith

njsmith commented May 31, 2018

After reminding myself of how this is supposed to work: in theory everyone is supposed to use PyDataMem_NEW, so it might be fine. But there are enough weird C API functions (including e.g. ones that take a data pointer and construct an array around it), and people who like to play games with raw C struct access, that it's something you'd want to double-check...

@mrocklin A silly but possibly simple workaround you might want to look into: what if you occasionally kill the threads and restart them? (Spawning a new thread is very fast, like ~100 µs on my laptop.)
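
A rough sketch of that workaround, assuming the threaded scheduler's pool= keyword (check against your Dask version); whether glibc actually gives the per-thread arenas back afterwards is exactly what would need measuring:

from multiprocessing.pool import ThreadPool

import dask.array as da

x = da.ones((2e4, 2e4), chunks=(2e4, 100))
z = x.rechunk((100, 2e4)).rechunk((2e4, 100))

for _ in range(3):
    # Hand the threaded scheduler a short-lived pool instead of its long-lived
    # default, then tear the threads down after each compute.
    pool = ThreadPool(4)
    try:
        z.sum().compute(scheduler='threads', pool=pool)
    finally:
        pool.close()
        pool.join()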

@jakirkham
Member

jakirkham commented May 31, 2018

FWIW people do see this issue outside the context of NumPy.

Here's a nice article discussing this issue with Python and Celery. They also conclude memory fragmentation is the problem and compare using jemalloc to system malloc. Unfortunately they didn't share test code, but did show how they invoked it.

Also here's a bunch of links to similar sounding problems. Have skimmed a few, but not all.

@chrisroat
Contributor

Are there strategies for avoiding this leak? I'm using Dask Gateway, and the scheduler's leak seems to make the scheduler ineffective after a few GB of growth (I've raised my container resources to accommodate it). My graph is only ~70k tasks, and I want to process a few hundred such graphs. My strategy is to launch a new cluster every 4-5 graphs, which seems to be the max I can push to the scheduler.

Are there certain configurations that tickle this problem? I've also tried the runtime env var fixes:

          - name: "MALLOC_MMAP_THRESHOLD_"
            value: "16384"
          - name: "LD_PRELOAD"
            value: "libjemalloc.so.1"

It's a simple graph that reads in a volume, transposes/rechunks (to output cloud-volume format) and calculates some statistics.

import numpy as np
import xarray as xr

import zarr
import dask.array as da
from dask.bytes.core import get_mapper

def make_tasks(path_out):
    data = xr.DataArray(da.zeros((5, 201, 7900, 7900), dtype=np.uint16, chunks=(1, 128, 512, 512)), dims=['channel', 'z', 'y', 'x'])
    bins = list(range(65536 + 1))  # +1 for upper edge of bin
    hist = da.stack([da.histogram(arr_chn, bins=bins)[0] for arr_chn in data])
    task_hist = hist.to_zarr(path_out + '/hist.zarr', compute=False)
    task_bins = da.asarray(bins).to_zarr(path_out + '/bins.zarr', compute=False)

    data = data.chunk({'channel': -1, 'z': 16, 'y': 256, 'x': 256})
    arr = data.data
    zstore = zarr.create(shape=arr.shape, chunks=arr.chunksize, dtype=arr.dtype, store=get_mapper(path_out), overwrite=True)
    task_store = arr.store(zstore, lock=False, compute=False, return_stored=False)

    return [task_hist, task_bins, task_store]

@kuraga

kuraga commented Jun 11, 2020

The glibc.malloc.mxfast tunable has been introduced in glibc (https://www.gnu.org/software/libc/manual/html_node/Memory-Allocation-Tunables.html).

@TomAugspurger
Member

TomAugspurger commented Jun 11, 2020

I'm using dask gateway, and find the scheduler's leak seems to make the scheduler ineffective after a few GB of leak

@chrisroat IIRC, I don't think this issue could happen on the scheduler, since Dask's scheduler is single-threaded and the issues here are only observed with multiple threads (as can happen on Dask workers or the threaded scheduler).

edit: And looking at your workflow a bit, you might be interested in https://github.com/pangeo-data/rechunker once it's ready for use.

@kuraga thanks for that link.

@chrisroat
Contributor

Thanks for the clarification, and the pointer to the rechunker. My chunks are actually written in cloud-volume (which is then downsampled by an igneous cluster). The zarr output was just for the example.

I do not understand why I see so much memory growth, which is present even after all futures are released and the client is reset.

@ahirner

ahirner commented Dec 27, 2020

Technically, the code snippet above doesn't prove a memory leak, because usage doesn't increase (nearly) monotonically.

import psutil
import sys

import dask
import dask.array as da
from distributed.utils import format_bytes

def main():
    proc = psutil.Process()
    print(format_bytes(proc.memory_info().rss))
    # 54MB ubuntu, numpy 1.16.4/1.19.4
    # 48MB mac os, numpy 1.19.4

    x = da.ones((2e3, 2e4), chunks=(2e3, 100))
    y = x.rechunk((100, 2e3))
    z = y.rechunk((2e3, 100))

    for i in range(10):
        z.sum().compute()
        print(format_bytes(proc.memory_info().rss), i)
        # 264-319MB ubuntu, numpy 1.16.4/1.19.4 (54MB with single-threaded)
        #                   ca. +60MB per thread
        #  82-209MB mac os, numpy 1.19.4 (120MB with single-threaded)

if __name__=="__main__":
    num_workers = None if len(sys.argv) <= 1 else int(sys.argv[1])
    with dask.config.set(scheduler='threads', num_workers=num_workers):
        main()
        main()

    try:
        import mkl
        print("pre", mkl.mem_stat())
        mkl.free_buffers()
        print("post", mkl.mem_stat())
    except ImportError:
        print("If mkl is used for numpy, you might see drop in final memory "
              "use after `conda install mkl-service`")

    proc = psutil.Process()
    print(format_bytes(proc.memory_info().rss), "FIN")

BLAS libs do their own memory pooling, some of it in thread-local storage.
I'm wondering if inspection beyond gc can shine light on those allocations. My env with mkl-service and numpy linking mkl unexpectedly yielded {'allocated_bytes': 0, 'allocated_buffers': 0} in both print statements.

@alimanfoo
Contributor

Just thought I'd mention that I've been observing apparent memory leaks when using dask array on a kubernetes cluster: several GB were accumulating in the main process where the client and scheduler are running, despite no large results being computed into memory. Memory usage remained after closing the client and cluster. Clearing scheduler logs freed a little memory but did not resolve the main leak.

I found that setting MALLOC_MMAP_THRESHOLD_=16384 substantially improves this, i.e., now I get ~200MB where I was getting ~2GB leaking.

@MatthewLennie

Forgive a naive question here, this is not at all my wheelhouse, but does reshuffling/rechunking/unify_chunks etc. exacerbate these issues?

Reshuffling chunks causes a whole heap of new memory allocations and, perhaps importantly, partially empties some blocks of memory. Is this the cause of the memory interleaving described by @stuartarchibald?

Is there anything that can be done at the Python level within Dask to alleviate these issues, e.g. a memory-defragmentation step that could be added to the rechunking operations? Would creating a deep copy of each chunk and deleting the old copy work, or something else similarly crude? Thanks for any patient answers.
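
For what it's worth, the crude "deep copy each chunk" idea from the question above can be expressed in one line; a sketch only, and whether it helps depends on the allocator rather than on Dask:

import numpy as np
import dask.array as da

x = da.ones((2e4, 2e4), chunks=(2e4, 100))
z = x.rechunk((100, 2e4)).rechunk((2e4, 100))

# Copy every chunk into a freshly allocated contiguous block; the old blocks are
# freed, but whether malloc can then return or reuse that memory is up to malloc.
z_compacted = z.map_blocks(np.ascontiguousarray)
z_compacted.sum().compute(scheduler='threads')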

@chakpak

chakpak commented Nov 15, 2021

This seems to be a long-standing issue. Is anyone working on a clean fix addressing the root cause, or just looking for workarounds for now?

@gjoseph92
Collaborator

@chakpak I believe the tl;dr on this is that it's neither Dask's nor NumPy's problem, and not something we can safely resolve here, but instead an issue with the underlying C-level memory allocator and its heuristics for when to release memory back to the OS.

Therefore, I'd suggest either adjusting the settings for the allocator if you're on linux:

$ MALLOC_TRIM_THRESHOLD_=0 python my_script.py ...

Or trying a different allocator if you're on macOS:

$ brew install jemalloc
$ DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib python my_script.py ...

Read this for more information: https://distributed.dask.org/en/latest/worker.html#memory-not-released-back-to-the-os

But a critical thing to note with this issue is that NumPy/Python/Dask isn't actually using too much memory. There's no leak.

It's just a bookkeeping issue. The Python process is hanging onto a bunch of memory it's not actually using at the moment. But if it needs more memory again, it should re-use that memory without asking the OS for more. The alternative (when you set MALLOC_TRIM_THRESHOLD_=0) is that unused memory gets returned to the OS immediately, but then the process has to ask the OS for memory again if it needs more in the future:
[memory usage graph]

If multiple important processes on the machine are all sharing memory, then having dask hog it is not ideal. However, if dask is the only major thing running (say, on a VM in AWS), then it may not actually matter that this unused memory doesn't get released back to the OS, because nothing else needs it.
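
For completeness, the manual trim mentioned later in this thread can be triggered from a running process on Linux/glibc with a couple of lines of ctypes; a sketch, and it only helps for memory that malloc already considers free:

import ctypes

libc = ctypes.CDLL('libc.so.6')
libc.malloc_trim.argtypes = [ctypes.c_size_t]
libc.malloc_trim.restype = ctypes.c_int

released = libc.malloc_trim(0)  # returns 1 if some memory was handed back to the OS, 0 otherwise
print('released memory:', bool(released))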

@chakpak

chakpak commented Nov 16, 2021

@gjoseph92 this was a very good explanation. Thanks for following up. This makes sense and we will experiment further.

@joni-herttuainen

We have been suffering from the same issue, only in our case we are also using dask-mpi and mpi4py for multi-node parallelization of complex computations.

We are also using pandas and numpy in our computations, so those might be related. We were able to lessen the effect of the issue by running all the functions in a subprocess, like this:

import multiprocessing

def run_in_subprocess(it, func):
    def return_in_queue(queue, func, it):
        queue.put(func(it))

    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=return_in_queue, args=(queue, func, it))
    process.start()
    ret = queue.get()
    process.join()

    return ret

and then calling it as follows

import dask.bag

result = dask.bag.from_sequence(it, partition_size=1).map(run_in_subprocess, func)
ret = result.compute()

in which func is the actual function we want to parallelize, it is the iterable input (a 2D numpy array in this case), and the return value is a pandas DataFrame.

To my understanding (and please do correct me if I'm mistaken), by running the code in a subprocess, each worker only holds the input data, the given function, and the return value from the subprocess in its memory, as everything else is handled in a forked subprocess.

On top of that I've added all suggested mitigation methods found in different issues related to the "leak" including:

  • clearing the logs
  • MALLOC_TRIM_THRESHOLD_=0
  • the manual libc.malloc_trim(0)

Even then, after the run, and even after deleting the bag and the return values from compute, the total memory usage reported by the dask dashboard is 3.3 GB higher than it was before the computation, even after idling (sleeping) for 10 to 15 minutes. This amount of memory usage is also confirmed by htop. These numbers were obtained with 48 workers (50-2) on a single node.

I might also add that the input size matters. Running the same with 10% of the input data resulted in smaller memory usage after the runs (somewhere around 2 GB extra, IIRC), which seems to indicate that the used memory accumulates.

I also ran the example provided in this comment of the dask.distributed issues (modified to suit dask_mpi) and can concur that the memory is not released back to the OS even with the mitigation methods mentioned above.

Used software versions:

dask==2021.11.2
dask-mpi==2021.11.0
distributed==2021.11.2
mpi4py==3.1.3
