Memory leak with Numpy arrays and the threaded scheduler #3530
Comments
See discussion in https://bugs.python.org/issue33444 |
Hrm, interesting. I set that. Sometimes I get no leak, sometimes I get a much bigger leak at 1.4 GB. |
Also, as suggested by @stuartarchibald
|
You probably don't want 0 for |
@pitrou, long term, what do you think should happen here? Can this issue be globally resolved through changes in CPython? Should Numpy change the way it allocates memory? |
I think you could add a documentation section about memory consumption issues and mention those environment variables there. It's not Numpy's or Python's business to make global decisions about the system memory allocator's heuristics in the process they execute in, IMHO. What Dask/distributed could perhaps do is, on Linux, periodically call the C library's malloc_trim(). cc'ing @njsmith just in case. |
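A minimal sketch of that kind of periodic trim via ctypes (assuming glibc on Linux; malloc_trim is glibc-specific and not available on macOS or Windows):

import ctypes

libc = ctypes.CDLL("libc.so.6")
# int malloc_trim(size_t pad): asks glibc to release free heap memory back to
# the OS; returns 1 if any memory was actually released.
libc.malloc_trim(0)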
From glibc's malloc sources:

/* mallopt options that actually do something */
#define M_TRIM_THRESHOLD -1
#define M_TOP_PAD -2
#define M_MMAP_THRESHOLD -3
#define M_MMAP_MAX -4
#define M_CHECK_ACTION -5
#define M_PERTURB -6
#define M_ARENA_TEST -7
#define M_ARENA_MAX -8

Here is an example call from Python:
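A sketch via ctypes, assuming glibc is loadable as libc.so.6 (the 16 KiB threshold is only an illustrative value):

import ctypes

M_MMAP_THRESHOLD = -3  # from the list above

libc = ctypes.CDLL("libc.so.6")
# int mallopt(int param, int value): returns 1 on success, 0 on failure.
if libc.mallopt(M_MMAP_THRESHOLD, 16 * 1024) == 0:
    print("mallopt(M_MMAP_THRESHOLD) failed")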
|
Also, for comparison with
|
Interesting. According to the
Have you tried something like |
Yes, this seems nicer.
It does reliably seem to increase runtimes, though that would be acceptable. |
One downside of |
By the way, an orthogonal possibility would be for Anaconda to link its Python builds with an alternative malloc library such as jemalloc. Perhaps that would improve the numbers, or not. |
RE #3530 (comment): setting …
The details of the behaviour of the …
It should also be noted that if …
In the explicit case above, this means that if the …
Experimenting with the above, the chunks are …
It is also worth noting: …
Thoughts on any of this are most welcome...! Also, here's a somewhat hacky but functioning piece of code that records and plots mallinfo statistics while the computation runs:

import ctypes
import gc
import threading
import time
import datetime

from pandas import DataFrame as df
import matplotlib.pyplot as plt
from distributed.utils import format_bytes
import dask.array as da
import psutil


class MallInfo(ctypes.Structure):
    _fields_ = [(name, ctypes.c_int)
                for name in ('arena', 'ordblks', 'smblks', 'hblks', 'hblkhd',
                             'usmblks', 'fsmblks', 'uordblks', 'fordblks',
                             'keepcost')]


libc = ctypes.CDLL("libc.so.6")
mallinfo = libc.mallinfo
mallinfo.argtypes = []
mallinfo.restype = MallInfo


def get_mallinfo():
    info = mallinfo()
    fields = [(name, getattr(info, name)) for name, _ in info._fields_]
    return {k: v for k, v in fields}


def show_mallinfo():
    print("Malloc info:")
    for name, value in get_mallinfo().items():
        print(f"- {name}: {value}")
    print("")


show_mallinfo()

x = da.ones((2e4, 2e4), chunks=(2e4, 220))
y = x.rechunk((100, 2e4))
z = y.rechunk((2e4, 100))

proc = psutil.Process()
print(format_bytes(proc.memory_info().rss))
print(proc.memory_full_info())


def malloc_spy(signal, wait, ticks, current_op):
    while not signal.wait(1e-4):
        now = datetime.datetime.now()
        minfo = get_mallinfo()
        # add in current op info
        minfo['current_op'] = current_op['name']
        ticks.append((now, minfo))
        time.sleep(wait)


signal = threading.Event()   # to signal stop
ticks = []                   # mutable, holds ticks
current_op = {'name': -1}    # mutable, current operation/iteration
thr = threading.Thread(
    target=malloc_spy,
    args=(
        signal,
        1e-3,
        ticks,
        current_op))
thr.start()

niterations = 5
for i in range(niterations):
    current_op['name'] = i
    print(str(i).center(80, '-'))
    z.sum().compute(scheduler='threads')
    print(format_bytes(proc.memory_info().rss))
    print(proc.memory_full_info())

# kill the malloc_spy
signal.set()
thr.join()

# process into a plot
info = df.from_dict({x[0]: x[1] for x in ticks}).transpose()
info.plot(subplots=True, style='.-')
plt.show()  # hblkhd looks strange due to signed int overflow |
What Python are you using? See this issue in CPython (https://bugs.python.org/issue11849), which was fixed for Python 3.3 but not for Python 2.7. |
3.6
|
@mattip and I were talking about this option, and also discussing the possibility of doing it within NumPy itself rather than Anaconda. @pitrou do you have any thoughts or concerns about a library like NumPy using jemalloc? |
Also seeing a leak. Prints out 655.65 MB at the end. Running on macOS 10.11 using a 2.3 GHz Intel Core i7.

Environment:

name: test
channels:
- conda-forge
- defaults
dependencies:
- alabaster=0.7.10=py36_1
- appnope=0.1.0=py36_0
- asciitree=0.3.3=py36_1
- asn1crypto=0.24.0=py36_0
- babel=2.5.3=py36_0
- backcall=0.1.0=py_0
- backports=1.0=py36_1
- backports.functools_lru_cache=1.5=py36_0
- backports.shutil_get_terminal_size=1.0.0=py_3
- bkcharts=0.2=py36_0
- blas=1.1=openblas
- bleach=2.1.3=py_0
- bokeh=0.12.15=py36_0
- boost=1.66.0=py36_1
- boost-cpp=1.66.0=1
- boto3=1.7.21=py_0
- botocore=1.10.21=py_0
- bottleneck=1.2.1=py36_1
- bzip2=1.0.6=1
- ca-certificates=2018.4.16=0
- certifi=2018.4.16=py36_0
- cffi=1.11.5=py36_0
- chardet=3.0.4=py36_0
- clangdev=6.0.0=default_0
- click=6.7=py_1
- cloudpickle=0.5.3=py_0
- coverage=4.5.1=py36_0
- cryptography=2.2.1=py36_0
- cycler=0.10.0=py36_0
- cytoolz=0.9.0.1=py36_0
- dask=0.17.4=py_0
- dask-core=0.17.4=py_0
- dask-distance=0.2.0=py_0
- dask-glm=0.1.0=0
- dask-imread=0.1.1=py36_0
- dask-ml=0.4.1=py36_0
- dask-ndfilters=0.1.2=py36_0
- dask-ndfourier=0.1.2=py36_0
- dask-ndmeasure=0.1.1=py36_0
- dask-ndmorph=0.1.1=py36_0
- dask-searchcv=0.2.0=py_0
- dbus=1.11.0=0
- decorator=4.3.0=py_0
- distributed=1.21.8=py36_0
- docutils=0.14=py36_0
- entrypoints=0.2.3=py36_1
- expat=2.2.5=0
- fasteners=0.14.1=py36_2
- fftw=3.3.7=0
- freetype=2.8.1=0
- future=0.16.0=py36_0
- gettext=0.19.8.1=0
- glances=2.11.1=py_0
- glib=2.55.0=0
- graphviz=2.38.0=7
- h5py=2.8.0=py36h470a237_0
- hdf5=1.10.1=2
- heapdict=1.0.0=py36_0
- html5lib=1.0.1=py_0
- icu=58.2=0
- idna=2.6=py36_1
- imageio=2.3.0=py36_0
- imagesize=1.0.0=py36_0
- imgroi=0.0.2=py36_0
- ipykernel=4.8.2=py36_0
- ipyparallel=6.0.2=py36_0
- ipython=6.4.0=py36_0
- ipython_genutils=0.2.0=py36_0
- ipywidgets=7.2.1=py36_1
- jbig=2.1=0
- jedi=0.12.0=py36_0
- jinja2=2.10=py36_0
- jmespath=0.9.3=py36_0
- jpeg=9b=2
- jq=1.5=4
- jsonschema=2.6.0=py36_1
- jupyter_client=5.2.3=py36_0
- jupyter_contrib_core=0.3.3=py36_1
- jupyter_contrib_nbextensions=0.5.0=py36_0
- jupyter_core=4.4.0=py_0
- jupyter_highlight_selected_word=0.2.0=py36_0
- jupyter_latex_envs=1.4.4=py36_0
- jupyter_nbextensions_configurator=0.4.0=py36_0
- kenjutsu=0.5.1=py36_0
- kiwisolver=1.0.1=py36_1
- libcxx=6.0.0=0
- libedit=3.1.20170329=0
- libffi=3.2.1=3
- libgfortran=3.0.0=0
- libiconv=1.15=0
- libpng=1.6.34=0
- libsodium=1.0.16=0
- libtiff=4.0.9=0
- libxml2=2.9.8=0
- libxslt=1.1.32=0
- line_profiler=2.1.2=py36_0
- llvm-meta=6.0.0=0
- llvmdev=6.0.0=default_2
- locket=0.2.0=py36_1
- lxml=4.2.1=py36_0
- mahotas=1.4.4=py36_0
- markupsafe=1.0=py36_0
- matplotlib=2.2.2=py36_1
- metawrap=0.0.2=py36_0
- mistune=0.8.3=py36_1
- mock=2.0.0=py36_0
- monotonic=1.5=py_0
- mplview=0.0.5=py_0
- msgpack-python=0.5.6=py36h2d50403_2
- multipledispatch=0.5.0=py36_0
- nbconvert=5.3.1=py_1
- nbformat=4.4.0=py36_0
- ncurses=5.9=10
- networkx=2.1=py36_0
- nose=1.3.7=py36_2
- notebook=5.5.0=py36_0
- npctypes=0.0.4=py36_0
- numcodecs=0.5.5=py36_0
- numpy=1.14.3=py36_blas_openblas_200
- olefile=0.45.1=py36_0
- oniguruma=6.8.0=0
- openblas=0.2.20=7
- openssl=1.0.2o=0
- packaging=17.1=py_0
- pandas=0.23.0=py36_0
- pandoc=2.2.1=hde52d81_0
- pandocfilters=1.4.2=py36_0
- parso=0.2.0=py_0
- partd=0.3.8=py36_0
- pbr=4.0.2=py_0
- pcre=8.41=1
- pexpect=4.5.0=py36_0
- pickleshare=0.7.4=py36_0
- pillow=5.1.0=py36_0
- pims=0.4.1=py_1
- pip=9.0.3=py36_0
- prompt_toolkit=1.0.15=py36_0
- psutil=5.4.5=py36_0
- ptyprocess=0.5.2=py36_0
- pycparser=2.18=py36_0
- pyfftw=0.10.4=py36_3
- pygments=2.2.0=py36_0
- pyopenssl=17.5.0=py36_1
- pyparsing=2.2.0=py36_0
- pyqt=5.6.0=py36_5
- pysocks=1.6.8=py36_1
- python=3.6.5=1
- python-dateutil=2.7.3=py_0
- python-graphviz=0.8.3=py36_0
- python-spams=2.6=py36_blas_openblas_203
- python.app=1.2=py36_0
- pytz=2018.4=py_0
- pywavelets=0.5.2=py36_1
- pyyaml=3.12=py36_1
- pyzmq=17.0.0=py36_4
- qt=5.6.2=h9e3eb04_4
- rank_filter=0.4.15=py36_0
- readline=7.0=0
- requests=2.18.4=py36_1
- runipy=0.1.5=py36_1
- s3fs=0.1.5=py_0
- s3transfer=0.1.13=py36_0
- scikit-image=0.13.1=py36_0
- scikit-learn=0.19.1=py36_blas_openblas_201
- scipy=1.1.0=py36_blas_openblas_200
- send2trash=1.5.0=py_0
- setuptools=39.1.0=py36_0
- simplegeneric=0.8.1=py36_0
- sip=4.18=py36_1
- six=1.11.0=py36_1
- slicerator=0.9.8=py_1
- snowballstemmer=1.2.1=py36_0
- sortedcontainers=1.5.10=py36_0
- sphinx=1.7.4=py36_0
- sphinxcontrib-websupport=1.0.1=py36_0
- sqlite=3.20.1=2
- tblib=1.3.2=py36_0
- terminado=0.8.1=py36_0
- testpath=0.3.1=py36_0
- tifffile=0.14.0=py36_1
- tk=8.6.7=0
- toolz=0.9.0=py_0
- tornado=5.0.2=py36_0
- traitlets=4.3.2=py36_0
- typing=3.6.4=py36_0
- unidecode=1.0.22=py36_0
- urllib3=1.22=py36_0
- vigra=1.11.1=py36_6
- wcwidth=0.1.7=py36_0
- webcolors=1.8.1=py_0
- webencodings=0.5.1=py36_0
- wheel=0.31.0=py36_0
- widgetsnbextension=3.2.1=py36_0
- xnumpy=0.1.2=py36_0
- xz=5.2.3=0
- yail=0.0.2=py36_0
- yaml=0.1.7=0
- zarr=2.2.0=py_1
- zeromq=4.2.5=1
- zict=0.1.3=py_0
- zlib=1.2.11=0
- libgcc=4.8.5=1 |
Any thoughts on the usage of |
Just to be clear, currently NumPy uses
|
In conversation with @njsmith he raised a concern with the idea that Numpy might use a different malloc implementation than Python. Python might allocate some memory and then pass it to Numpy. Numpy would then need to remember to call the right free depending on whether it allocated the data or someone else did. This seems error-prone. |
I thought about this, but couldn't figure out when that situation would
occur. Do you have an example?
|
This might happen in Dask when we get bytes from a socket and use those to create a Numpy array. |
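A minimal sketch of that pattern (illustrative only, not Dask's actual deserialization code):

import numpy as np

# Pretend these bytes arrived over a socket: the buffer was allocated by
# CPython for the bytes object, not by NumPy's allocator.
buf = bytes(8 * 4)
arr = np.frombuffer(buf, dtype="f8")  # zero-copy view over memory NumPy did not allocate
print(arr.flags.owndata)              # False: NumPy will not (and must not) free this buffer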
IMO then that buffer should become the |
Right, so when does the parent allocator "give up" control of its own memory, forcing NumPy to take on its management? |
After reminding myself of how this is supposed to work: in theory everyone is supposed to use …

@mrocklin A silly but possibly simple workaround you might want to look into: what if you occasionally kill the threads and restart them? (Spawning a new thread is very fast, like ~100 µs on my laptop.) |
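A rough sketch of that workaround, assuming the threaded scheduler's pool= config option accepts a multiprocessing ThreadPool; `graphs` is a placeholder for whatever collections are being computed:

import dask
from multiprocessing.pool import ThreadPool

for graph in graphs:                  # placeholder: your dask collections
    pool = ThreadPool(4)              # fresh threads (and fresh thread-local state)
    with dask.config.set(scheduler="threads", pool=pool):
        graph.compute()
    pool.close()
    pool.join()                       # tear the old threads down before the next batch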
FWIW people do see this issue outside the context of NumPy. Here's a nice article discussing this issue with Python and Celery; they also conclude memory fragmentation is the problem and compare using an alternative allocator. Also, here's a bunch of links to similar-sounding problems. I've skimmed a few, but not all. |
Are there strategies for avoiding this leak? I'm using Dask Gateway, and I find the scheduler's leak seems to make the scheduler ineffective after a few GB of growth (I've raised my container resources to accommodate it). My graph is only ~70k tasks, and I want to process a few hundred such graphs. My strategy is to launch a new cluster for every 4-5 graphs, which seems to be the most I can push to one scheduler. Are there certain configurations that tickle this problem? I've also tried the runtime env var fixes.
It's a simple graph that reads in a volume, transposes/rechunks (to output cloud-volume format) and calculates some statistics.
|
|
@chrisroat IIRC, I don't think this issue could happen on the scheduler, since Dask's scheduler is single-threaded and the issues here are only observed with multiple threads (as can happen on Dask workers or the threaded scheduler). edit: And looking at your workflow a bit, you might be interested in https://github.com/pangeo-data/rechunker once it's ready for use. @kuraga thanks for that link. |
Thanks for the clarification, and the pointer to the rechunker. My chunks are actually written in cloud-volume (which is then downsampled by an igneous cluster); the zarr output was just for the example. I do not understand why I see so much memory growth, which persists even after all futures are released and the client is reset. |
Technically, the code snippet above doesn't prove a memory leak, because usage doesn't increase (nearly) monotonically:

import psutil
import sys
import dask
import dask.array as da
from distributed.utils import format_bytes


def main():
    proc = psutil.Process()
    print(format_bytes(proc.memory_info().rss))
    # 54MB ubuntu, numpy 1.16.4/1.19.4
    # 48MB mac os, numpy 1.19.4
    x = da.ones((2e3, 2e4), chunks=(2e3, 100))
    y = x.rechunk((100, 2e3))
    z = y.rechunk((2e3, 100))
    for i in range(10):
        z.sum().compute()
        print(format_bytes(proc.memory_info().rss), i)
        # 264-319MB ubuntu, numpy 1.16.4/1.19.4 (54MB with single-threaded)
        # ca. +60MB per thread
        # 82-209MB mac os, numpy 1.19.4 (120MB with single-threaded)


if __name__ == "__main__":
    num_workers = None if len(sys.argv) <= 1 else int(sys.argv[1])
    with dask.config.set(scheduler='threads', num_workers=num_workers):
        main()
        main()
    try:
        import mkl
        print("pre", mkl.mem_stat())
        mkl.free_buffers()
        print("post", mkl.mem_stat())
    except ImportError:
        print("If mkl is used for numpy, you might see drop in final memory "
              "use after `conda install mkl-service`")
    proc = psutil.Process()
    print(format_bytes(proc.memory_info().rss), "FIN")

BLAS libs do their own memory pooling, some of it in thread-local storage. |
Just thought I'd mention that I've been observing apparent memory leaks when using dask array on a kubernetes cluster, several GB was accumulating in the main process where the client and scheduler are running despite no large results being computed into memory. Memory usage remained after closing client and cluster. Clearing scheduler logs freed a little memory but did not resolve the main leak. I found that setting MALLOC_MMAP_THRESHOLD_=16384 substantially improves this, i.e., now I get ~200MB where I was getting ~2GB leaking. |
Forgive a naive question here (this is not at all my wheelhouse), but does reshuffling/rechunking/unify_chunks etc. exacerbate these issues? Reshuffling chunks causes a whole heap of new memory allocations and, perhaps importantly, may partially empty some blocks of memory. Is this the cause of the memory interleaving described by @stuartarchibald? Is there anything that can be done at the Python level within dask to alleviate these issues, e.g. a memory-defragment step added to the rechunking operations? Does creating a deep copy of each chunk and deleting the old copy work, or something else similarly crude? Thanks for any patient answers. |
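A minimal sketch of that crude idea (an illustration, not an endorsed fix): copy every chunk into a fresh contiguous allocation right after the rechunk. Note this only makes new allocations; it does not defragment the heap, so whether it helps depends entirely on the allocator.

import numpy as np
import dask.array as da

x = da.ones((2000, 20000), chunks=(2000, 100))
y = x.rechunk((100, 20000))

# Force each output chunk into a brand-new contiguous buffer.
y_compacted = y.map_blocks(np.ascontiguousarray)
print(y_compacted.sum().compute(scheduler="threads"))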
This seems to be a long-standing issue. Is anyone working on a clean fix addressing the root cause, or just looking for workarounds for now? |
@chakpak I believe the tl;dr on this is that it's neither Dask's nor NumPy's problem, and not something we can safely resolve here, but instead an issue with the underlying C-level memory allocator and its heuristics for when to release memory back to the OS. Therefore, I'd suggest either adjusting the settings for the allocator if you're on Linux:

$ MALLOC_TRIM_THRESHOLD_=0 python my_script.py ...

Or trying a different allocator if you're on macOS:

$ brew install jemalloc
$ DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib python my_script.py ...

Read this for more information: https://distributed.dask.org/en/latest/worker.html#memory-not-released-back-to-the-os

But a critical thing to note with this issue is that NumPy/Python/Dask isn't actually using too much memory. There's no leak; it's just a bookkeeping issue. The Python process is hanging onto a bunch of memory it's not actually using at the moment, but if it needs more memory again, it should re-use that memory without asking the OS for more. The alternative (when you set MALLOC_TRIM_THRESHOLD_=0) is to release that unused memory back to the OS eagerly, at some cost in speed.

If multiple important processes on the machine are all sharing memory, then having Dask hog it is not ideal. However, if Dask is the only major thing running (say, on a VM in AWS), then it may not actually matter that this unused memory doesn't get released back to the OS, because nothing else needs it. |
@gjoseph92 this was a very good explanation. Thanks for following up. This makes sense and we will experiment further. |
We have been suffering from the same issue, only in our case we are also using …
We are also using pandas and numpy in our computations, so those might be related. We were able to lessen the effect of the issue by running all the functions in a subprocess, like this:

import multiprocessing

def run_in_subprocess(it, func):
    def return_in_queue(queue, func, it):
        queue.put(func(it))

    queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=return_in_queue, args=(queue, func, it))
    process.start()
    ret = queue.get()
    process.join()
    return ret

and then calling it as follows:

result = dask.bag.from_sequence(it, partition_size=1).map(run_in_subprocess, func)
ret = result.compute()

in which func is the function doing the actual computation. To my understanding (and please do correct me if I'm mistaken), by running the code in a subprocess, each worker only holds the input data, the given function, and the return value from the subprocess in its memory, as everything else is handled in a forked subprocess. On top of that, I've added all the suggested mitigation methods found in different issues related to the "leak", including:
Even then, after the run, and even after deleting the …
I might also add that the input size matters: running the same with 10% of the input data resulted in smaller memory usage after the runs (somewhere around 2 GB extra, IIRC), which seems to indicate that the used memory accumulates. I also ran the … (modified version suitable for …).

Used software versions:
|
This example leaks around 500MB of memory on my machine when using the threaded scheduler, and almost no memory when using the single-threaded scheduler:

Notebook

This doesn't happen when I run it with the single-threaded scheduler. Calling gc.collect() doesn't help. Allocating a new large numpy array afterwards also doesn't take up the leaked memory; the number just climbs. Looking at the objects that Python knows about shows that there isn't much around. The local schedulers don't have any persistent state. My next step is to reproduce with the standard concurrent.futures module, but I thought I'd put this up early in case people have suggestions.
cc @shoyer @pitrou @njsmith
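The notebook itself isn't reproduced here; below is a minimal sketch of the kind of workload under discussion, assembled from snippets elsewhere in this thread (exact sizes and numbers may differ from the notebook):

import psutil
import dask.array as da
from distributed.utils import format_bytes

proc = psutil.Process()

x = da.ones((20000, 20000), chunks=(20000, 100))   # ~3 GB of float64 ones in thin chunks
y = x.rechunk((100, 20000)).rechunk((20000, 100))  # all-to-all rechunks

for i in range(5):
    y.sum().compute(scheduler='threads')
    # RSS keeps growing across iterations with the threaded scheduler
    print(i, format_bytes(proc.memory_info().rss))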