Add return_generator functionality #588

Closed
fcharras wants to merge 150 commits into master from return_generator

Conversation

fcharras
Contributor

fcharras commented Jan 18, 2018

This PR is derived from #587; it keeps only the part that enables __call__ to return a generator. Some additional notes:

  • the keyword is now named return_generator to match master...lesteve:parallel-return-generator better
  • because of how __call__ is now defined in Parallel (converting the output generator into a list when self.return_generator is False), there is no need for many new unit tests: the existing unit tests already exercise the behavior of the generator (see the sketch after this list).
  • for now the backend is terminated only when the generator is exhausted or when the generator is deleted. It's probably possible to have the backend terminate once all the tasks are finished even if the generator is not exhausted yet; is that desirable?
  • there seems to be a major difference with the previous implementation by @lesteve: with the current PR this unit test (only the second assert) cannot pass. Indeed, that unit test assumes that if the user does not iterate over the results, the flow of computations freezes. This PR takes a different approach: if the user does not iterate over the results, all the tasks keep computing and their results accumulate in memory until they are iterated over. It means that a gain in memory usage is only possible if the user iterates over the results fast enough.
  • I think the approach in this PR is better, because if the user wants to freeze computations between batches of tasks that are not yet iterated over, they can simply call the same Parallel instance sequentially, without the need for a new API.
  • we can imagine a worst-case scenario where the first dispatched job takes 1 hour and the following jobs complete in seconds. In this setup, outputting a generator or a list is equivalent performance-wise.
  • This can be avoided by giving the user the possibility to read results asynchronously, when the user does not care about the order in which results are read (with respect to the order in which the tasks were submitted). Then, in the previous worst case, the user could immediately access the tasks that finished in seconds without waiting for the first one to complete. This was the second part of "2 new parameters: return a generator / return results asynchronously / add helper function for callbacks" (#587).
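
For illustration, here is a minimal usage sketch of the behavior described above. It assumes the return_generator keyword as named in this PR branch (not necessarily the final API):

import time

from joblib import Parallel, delayed


def slow_square(i):
    time.sleep(0.1)
    return i ** 2


# return_generator=False (default): same as current master, returns a list.
squares = Parallel(n_jobs=2)(delayed(slow_square)(i) for i in range(10))

# return_generator=True: __call__ returns a generator; tasks keep being
# dispatched and computed in the background, and their results accumulate
# until they are iterated over, in submission order.
squares_gen = Parallel(n_jobs=2, return_generator=True)(
    delayed(slow_square)(i) for i in range(10)
)
for value in squares_gen:
    print(value)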

@lesteve
Member

lesteve commented Jan 18, 2018

Thanks a lot for splitting up the PR and for all the details, which all seem very sensible at a quick look. I need to think a little more about which approach is best. Maybe they are not that different in practice, since most people probably consume the result iterator completely right after Parallel.__call__. IIRC I was coming at it from a memory-control point of view.

Just curious, is there a way you can adapt the memory usage tests so they pass, e.g. by making the task longer?

I'll try to look at it soonish but it may not happen before next week.

@fcharras
Contributor Author

fcharras commented Jan 18, 2018

All the tests passed on my local install, but the CI still shows some timeouts on other backends.

Let's see if the memory test passes with 0.05 additional seconds in the function. Also, it seems that the memory profiler depends on the behavior of the garbage collector:

Also, because of how the garbage collector works in Python the result might be different between platforms and even between runs.

We could force garbage collection when iterating over the output, but then reading would be slow and the results might pile up in the queue...
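
For the record, a small sketch of what forcing collection during iteration could look like (return_generator as named in this PR; whether this actually stabilizes the memory measurement is precisely what is uncertain here):

import gc

from joblib import Parallel, delayed


def make_chunk(i):
    return list(range(100000))


results = Parallel(n_jobs=2, return_generator=True)(
    delayed(make_chunk)(i) for i in range(20)
)
for chunk in results:
    del chunk     # drop the reference to the consumed result
    gc.collect()  # force a collection between reads (slows down iteration)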

@codecov

codecov bot commented Jan 18, 2018

Codecov Report

Base: 93.94% // Head: 93.82% // Decreases project coverage by -0.12% ⚠️

Coverage data is based on head (9bd8f13) compared to base (061d4ad).
Patch coverage: 94.25% of modified lines in pull request are covered.

❗ Current head 9bd8f13 differs from pull request most recent head 308e4db. Consider uploading reports for the commit 308e4db to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #588      +/-   ##
==========================================
- Coverage   93.94%   93.82%   -0.12%     
==========================================
  Files          52       52              
  Lines        7328     7628     +300     
==========================================
+ Hits         6884     7157     +273     
- Misses        444      471      +27     
Impacted Files Coverage Δ
joblib/test/test_memmapping.py 99.24% <ø> (ø)
joblib/test/test_testing.py 100.00% <ø> (ø)
joblib/test/common.py 74.60% <40.00%> (-2.99%) ⬇️
joblib/testing.py 91.83% <80.00%> (-3.41%) ⬇️
joblib/_memmapping_reducer.py 94.90% <85.00%> (-0.98%) ⬇️
joblib/parallel.py 95.09% <94.66%> (-0.95%) ⬇️
joblib/_parallel_backends.py 92.01% <96.15%> (-1.01%) ⬇️
joblib/test/test_parallel.py 96.96% <97.87%> (-0.09%) ⬇️
joblib/_dask.py 92.68% <100.00%> (ø)
joblib/pool.py 89.68% <100.00%> (+1.87%) ⬆️
... and 8 more



@fcharras
Contributor Author

fcharras commented Jan 18, 2018

Trying the memory test with 0.1 seconds; if it doesn't pass I'll just remove it. I think the role of garbage collection is really hard to account for. Did you manage to make this test pass on Travis in your previous work?

Also, I rebased to current master so that codecov becomes happier.

@fcharras
Contributor Author

fcharras commented Jan 19, 2018

The last failure is different; it happens during the first next(result).
https://travis-ci.org/joblib/joblib/jobs/330553351#L2306
Why does this traceback show that joblib's pool.py is used (and deadlocked) when backend = 'loky'?

@fcharras
Contributor Author

fcharras commented Jan 19, 2018

Apparently it was Travis killing our processes because result = np.zeros(int(1e7)) was too much. It passes after setting it to result = np.ones(int(5*1e5), dtype=bool), which is still enough to trigger the deadlock bug that we've seen before on my machine.

edit: it's all green now, except for some PEP8, and I've removed the memory profiler tests for good.

@GaelVaroquaux
Member

Thanks a lot for this work. This feature is a very much desired one, and it's really helpful that you are working on it.

I left a bunch of inline comments, and you have a failing PEP8 test. Aside from this, one thing that is still needed is a paragraph on this feature in the documentation.

Also, with regard to the blocking vs non-blocking mode, I think that both have their value. Typically, when pre_dispatch is not "all", we probably want blocking, because this is a setting where joblib is trying to be clever in its memory usage. If we've done the job right, the pre_dispatch mechanism should make it certain that the workers never starve. Was there a particular reason that you removed this part of the code?
In the long term, I think that what could be possible is that we make it so that Parallel always returns a generator (it's trivial to call "list" on it). In this case, it's important that it doesn't lead to too much memory overhead... Hum, thinking out loud, this may make it hard for a backend like dask...
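
A trivial sketch of that equivalence, assuming the return_generator keyword from this PR: calling list() on the generator recovers today's list output.

from joblib import Parallel, delayed

as_list = Parallel(n_jobs=2)(delayed(abs)(i) for i in range(-5, 0))
as_gen = Parallel(n_jobs=2, return_generator=True)(
    delayed(abs)(i) for i in range(-5, 0)
)
assert list(as_gen) == as_list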

Finally, we should probably use it "in anger", in other words in challenging real-life scenarios, to make sure that it is robust.

@fcharras
Contributor Author

fcharras commented Jan 19, 2018

Thank you for the comments. Indeed there is some cleaning needed.

Also, with regard to the blocking vs non-blocking mode, I think that both have their value. Typically, when pre_dispatch is not "all", we probably want blocking, because this is a setting where joblib is trying to be clever in its memory usage. If we've done the job right, the pre_dispatch mechanism should make it certain that the workers never starve. Was there a particular reason that you removed this part of the code?

I started from scratch because I didn't have enough knowledge of joblib to understand the previous work when I began, and I ended up doing it differently because the non-blocking mode is the most straightforward. It's less clear to me how the blocking mode should be defined and how it interacts with pre-dispatch, and I'm not sure we are referring to the same ideas. To clarify, here's what I understand:

  • blocking mode: if the user does not iterate, stop dispatching tasks as long as there are at least X results available in the output queue. If the user iterates slowly, the processes will starve. It requires keeping track of the number of available results in the output queue and making the callback thread wait when this count is above X. The case where pre_dispatch > X is a little special.
  • non-blocking mode (current): tasks are dispatched and returned just as they would be in current master. Possibly all results will pile up in the output queue if the user does not iterate over it before the computation is over. It is basically changing self._output.extend(result) into for res in result: yield res (see the sketch after this list).
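
A self-contained sketch of these non-blocking semantics, using concurrent.futures as a stand-in for the joblib backend (the names and structure below are illustrative, not joblib internals):

import time
from concurrent.futures import ThreadPoolExecutor


def task(i):
    time.sleep(0.1)
    return i * i


def non_blocking_results(func, inputs, max_workers=2):
    # Dispatch everything up front and yield results in submission order.
    # Nothing throttles the workers: if the consumer is slow, completed
    # results simply accumulate until they are iterated over.
    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = [executor.submit(func, i) for i in inputs]
    try:
        for fut in futures:
            yield fut.result()  # blocks only until this result is ready
    finally:
        executor.shutdown(wait=False)


for res in non_blocking_results(task, range(5)):
    print(res)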

In the long term, I think that what could be possible is that we make it so that Parallel always returns a generator (it's trivial to call "list" on it). In this case, it's important that it doesn't lead to too much memory overhead... Hum, thinking out loud, this may make it hard for a backend like dask...

I also think it could always return a generator. I don't see why there would be a higher memory overhead.

@GaelVaroquaux
Member

GaelVaroquaux commented Jan 19, 2018 via email

@GaelVaroquaux
Member

GaelVaroquaux commented Jan 19, 2018 via email

@lesteve
Member

lesteve commented Jan 19, 2018

Finally, we should probably use it "in anger", in other words in challenging real-life scenarios, to make sure that it is robust.

@GaelVaroquaux @ogrisel any suggestions on real-life scenarios that would stress test this PR would be great.

@fcharras
Contributor Author

Some thoughts about blocking / non-blocking: here is a naïve draft API:

...
    def __init__(self, n_jobs=1, backend=None, verbose=0, timeout=None,
                 pre_dispatch='2 * n_jobs', batch_size='auto',
                 temp_folder=None, max_nbytes='1M', mmap_mode='r',
                 return_generator=False, lazy_dispatch=False,
                 output_queue_size='pre_dispatch'):

    '''
    lazy_dispatch: bool
        if True, will not dispatch new jobs if the generator
        has more than output_queue_size results ready to output.
    output_queue_size: int or str
        Max size of the output queue. If more than output_queue_size
        results are available and lazy_dispatch = True, computation will
        freeze waiting for the user to iterate on results.
    '''
...

It might be interesting if the user really wants to control the memory usage of the output while optimizing the CPU usage.

However it feels complicated. In particular, pre_dispatch and batch_size would be overridden during computation to make sure that the output queue never contains more than output_queue_size results at a time. I think it's preferable to be able to manage this by hand in a more transparent/flexible fashion.

Maybe it would be good instead to enable queuing new tasks on the fly, either by calling __call__ again on another iterable, or with a method such as queue_new_tasks. Then the user could control the flow of tasks as they wish and the inner mechanism would be simpler (just process all the available input tasks). It would extend the current possibilities offered by the context manager (see the sketch below). I'd find it more convenient if Parallel behaved more like a flexible queue, and it sounds doable.
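
For reference, a minimal sketch of what the context manager already allows today, reusing one Parallel instance for successive batches; the queue_new_tasks idea above would extend this so that new tasks could be added while earlier results are still being consumed:

from joblib import Parallel, delayed


def work(i):
    return i * i


with Parallel(n_jobs=2) as parallel:
    # The same worker pool is reused across successive calls, so the flow
    # of tasks can already be controlled batch by batch.
    first_batch = parallel(delayed(work)(i) for i in range(4))
    second_batch = parallel(delayed(work)(i) for i in range(4, 8))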

@fcharras
Contributor Author

fcharras commented Jan 22, 2018

I've disabled, on Windows, the patch for the multiprocessing backend that solved the deadlock, and now it's green (but it doesn't look very stable on Travis).

However, it would be safer to try the deadlock test with 1e7 and more, both on Windows and Linux, on machines that can handle pickling big objects (whereas Travis kills the processes if we push too hard).

fcharras force-pushed the return_generator branch 2 times, most recently from 90332ae to 94516a9 on January 22, 2018 at 19:07
@GaelVaroquaux
Member

@fcharras: I talked to @lesteve and he convinced me that I was wrong in my logic: pre_dispatch is about consuming the input iterator, but the results list still gets filled (I should know, I wrote it :D). Hence, I believe that your PR is not a decrease in performance or functionality in any way. I'll think about it, but it seems like an overall benefit.

With regard to an example to stress-test this, computing a Welch power spectral density is a typical case where it should be beneficial. Something that would be awesome is if you could prepare such a script. Indeed, we are going to have an example gallery very soon (#594), and such an example would be a good real-world use case.

I think that your PR is mergeable. @ogrisel: do you want to have a look?

@GaelVaroquaux
Member

Actually, forget this whole idea of Welch. I was thinking about it in terms of the opportunity to reduce memory usage when the output of Parallel is actually fed to a reducer. But it should be possible to do something much simpler when thinking in terms of time. Something like:

import time

from joblib import Parallel, delayed


def func(i):
    time.sleep(.1)
    return i


result = 0
for j in Parallel(n_jobs=2)(delayed(func)(i) for i in range(10)):
    result += j
    time.sleep(.1)  # Here some expensive calculation

@fcharras
Contributor Author

fcharras commented Jan 25, 2018

For the example, one could think of a media player: the processes download some files to disk in parallel, and the main thread loads and reads them.

The processes could also directly return the loaded files (to exercise the pickler deadlock).

My personal use case is about Bayesian parameter search, but it also needs the possibility to get results asynchronously and to add new tasks on the fly (work in progress in #587).

@ogrisel
Contributor

ogrisel commented Feb 7, 2018

A realistic use case would be parallel CPU decoding of JPEG files + random data transformations (small random rotation, roughly centered random crop, horizontal flip, random erasing, ...), then packing the result with other transformed images into a numpy array as a minibatch to feed a convolutional neural network training loop, for instance.
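
A simplified, self-contained sketch of that pipeline; the JPEG decoding and augmentation are faked with random numpy arrays so the snippet runs without image files, and return_generator is the keyword as named in this PR:

import numpy as np
from joblib import Parallel, delayed


def load_and_augment(path):
    # Stand-in for: decode a JPEG file, then apply random crop,
    # rotation, horizontal flip, random erasing, ...
    rng = np.random.RandomState(abs(hash(path)) % (2 ** 32))
    img = rng.rand(64, 64, 3)
    if rng.rand() < 0.5:
        img = img[:, ::-1, :]  # horizontal flip
    return img


def minibatches(paths, batch_size=16, n_jobs=4):
    # Yield minibatches while later images are still being processed,
    # instead of waiting for the whole list of results.
    images = Parallel(n_jobs=n_jobs, return_generator=True)(
        delayed(load_and_augment)(p) for p in paths
    )
    batch = []
    for img in images:
        batch.append(img)
        if len(batch) == batch_size:
            yield np.stack(batch)
            batch = []
    if batch:
        yield np.stack(batch)


for batch in minibatches(["img_%d.jpg" % i for i in range(100)]):
    pass  # feed `batch` (shape: batch_size x 64 x 64 x 3) to the training loop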

@fcharras
Contributor Author

fcharras commented Feb 8, 2018

I don't know why the two tests fail on Travis; they do not fail consistently and I can't reproduce them locally. Also, 7 out of 8 pipelines passed. Could it be a misbehavior on Travis?

@ogrisel
Contributor

ogrisel commented Feb 8, 2018

You might want to try to reproduce the deadlock locally using taskset -c 0 pytest ... to force all the workers to run on CPU 0, which I think can happen on Travis workers.

@fcharras
Contributor Author

fcharras commented Feb 16, 2023

So we had stabilized the non-PyPy pipelines after dropping return_generator support when the backend is set to the (non-default) multiprocessing backend. This is not planned to be implemented later at all, as it seems that Python's native multiprocessing library has a termination logic that we can't make compatible with return_generator.

Despite this change the PyPy pipeline kept showing unstable tests, but we want return_generator to be compatible with PyPy. The last batch of commits should address the stability issues on PyPy. The main issue we noticed is that the code triggered during garbage collection of the generator sometimes runs in a random existing thread rather than in the main thread. We don't know whether this is a bug or a feature in PyPy. Still, the latest changes address those issues, basically by adding a lock and ensuring that the thread performing the garbage collection will not hang.

Provided the Azure pipelines are now stable, this would be the remaining TODO list to finish and merge this PR:

  • re-implement print_progress when n_jobs = 1
  • cleanup implementation for shutting down threads in pypy
  • cleanup implementation of LokyLock
  • review, format and document

edit: the PyPy issues with the thread-bomb mitigation unit tests also exist on main and are not related to this PR

# If you install python3.9, you get the pypy3.9 implementation.
# Pinning to the precise 3.9.15 version prevent this.
# Life of a CI is so brittle....
PYTHON_VERSION: "3.9.15"
Contributor Author

If this has not been fixed upstream yet, merge on master before #588

@@ -0,0 +1,102 @@
"""Benchmark n_jobs=1 on high number of fast tasks
Contributor Author

Let's add this to master independently of #588

@@ -1,9 +1,11 @@
import os
Contributor Author

Let's consider merging this on master independently of #588

@@ -30,6 +30,8 @@ if [[ "$PYTHON_VERSION" == "pypy3" ]]; then
create_new_pypy3_env
else
create_new_conda_env
# Make sure we did not installed pypy by mistake
test -z "$(python --version | grep -i pypy)"
Contributor Author

Consider merging this independently of #588

@@ -20,7 +20,7 @@ if [[ "$SKIP_TESTS" != "true" ]]; then
export PYTEST_ADDOPTS="--cov=joblib --cov-append"
fi

pytest joblib -vl --timeout=120 --junitxml="${JUNITXML}"
pytest joblib -vsl --timeout=240 --junitxml="${JUNITXML}"
Contributor Author

Consider merging this on master independently of #588

warninfo = [w.message for w in warninfo]
assert len(warninfo) == 1
assert len(warninfo) == 1, debug_msg
Contributor Author

Consider independent merge

for x, y in zip((0, 1), (1, 0))])
with raises(WorkerInterrupt):
[delayed(division)(x, y) for x, y in zip((0, 1), (1, 0))]
)
Contributor Author

Overall, try to minimize the amount of diff related to formatting in this PR

'time.sleep(1.1)',
# TODO: changes in check_subprocess_call impacted this test
# with no particular reason, fix that.
'time.sleep(5)',
Contributor Author

Consider independent merge

@@ -46,14 +46,20 @@ def check_subprocess_call(cmd, timeout=5, stdout_regex=None,
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

Contributor Author

Consider independent merge

@@ -60,13 +66,35 @@

DEFAULT_THREAD_BACKEND = 'threading'

# Backend hints and constraints to help choose the backend
Contributor Author

Except for the TASK_* variable definitions, the diff here is somewhat unrelated to the PR. Maybe address it independently of #588?

@@ -104,8 +130,7 @@ def get_active_backend(prefer=None, require=None, verbose=0):
# Try to use the backend set by the user with the context manager.
backend, n_jobs = backend_and_jobs
nesting_level = backend.nesting_level
supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
if require == 'sharedmem' and not supports_sharedmem:
if require == 'sharedmem' and not backend.supports_sharedmem:
Contributor Author

This change also seems unrelated to the PR? 🤔

if require == 'sharedmem' and not backend.supports_sharedmem:
raise ValueError("Backend %s does not support shared memory"
% backend)

Contributor Author

This refactoring also seems unrelated?

@fcharras
Contributor Author

fcharras commented Feb 17, 2023

Pipelines are looking good (the occasional failure is not related: it also exists on main, and seems benign anyway).

I've opened #1393, which strips #588 of all the diff that is not directly related. I will work on cleaning up #1393 without adding any more unrelated changes or refactoring.

@fcharras
Contributor Author

Merged via #1393

fcharras closed this Apr 18, 2023
fcharras deleted the return_generator branch April 18, 2023 21:10
@tomMoral
Contributor

So much discussion on this one :)
We have come a long way! Congrats @fcharras for all the hard work! :)
