Add return_generator functionality #588
Conversation
Thanks a lot for splitting up the PR, and for all the details, which all seem very sensible at a quick look. I need to think a little more about which approach is best. Maybe they are not that different in practice, since most people probably consume the result iterator completely right after the call. Just curious, is there a way you can adapt the memory usage tests so they pass, e.g. by making the task longer? I'll try to look at it soonish but it may not happen before next week.
All the tests passed on my local install, but the CI still shows some timeouts on other backends. Let's see if the memory test passes with 0.05 additional seconds in the function. Also it seems that the memory… We could force garbage collection when iterating on the output, but then the read would be slow and the results might stack up in the queue...
Codecov Report: Base: 93.94% // Head: 93.82% // Decreases project coverage by -0.12%.
Additional details and impacted files:
@@ Coverage Diff @@
## master #588 +/- ##
==========================================
- Coverage 93.94% 93.82% -0.12%
==========================================
Files 52 52
Lines 7328 7628 +300
==========================================
+ Hits 6884 7157 +273
- Misses 444 471 +27
Force-pushed from 28c68da to afd2c16
Trying the memory test with 0.1 seconds; if it doesn't pass I'll just remove it. I think the role of garbage collection is really hard to account for. Did you manage to make this test pass on Travis in your previous work? Also, I rebased onto current master so that Codecov becomes happier.
Force-pushed from a2d5069 to 6837475
The last failure is different, it happens during the first …
Force-pushed from a1763ea to 6837475
Apparently it was Travis killing our processes because of the …
Edit: it's all green now, except some PEP8 issues, and I've definitively removed the memory profiler tests.
Thanks a lot for this work. This feature is a very much desired one, and it's really helpful that you are working on it. I made a bunch of inline comments, and you have a failing PEP8 test. Aside from this, one thing that is needed is a paragraph in the documentation on this feature. Also, with regard to blocking versus non-blocking mode, I think that both have their value. Typically, when pre_dispatch is not "all", we probably want blocking, because this is a setting where joblib is trying to be clever in its memory usage. If we've done the job right, the pre_dispatch mechanism should make certain that the workers never starve. Was there a particular reason that you removed this part of the code? Finally, we should probably use it "in anger", in other words in challenging real-life scenarios, to make sure that it is robust.
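For context, `pre_dispatch` is an existing `Parallel` parameter; here is a minimal usage sketch (the trivial task and the threading backend are chosen just to keep the sketch lightweight):

```python
from joblib import Parallel, delayed

def work(i):
    return i * i

# pre_dispatch bounds how many tasks are dispatched ahead of the
# workers, which caps memory usage when the input iterable is large.
results = Parallel(n_jobs=2, backend="threading", pre_dispatch="2*n_jobs")(
    delayed(work)(i) for i in range(10)
)
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```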
Thank you for the comments. Indeed some cleaning is needed.
I started from scratch because I didn't have enough knowledge of joblib to understand the previous work when I began, and I ended up doing things differently because the non-blocking mode is the most straightforward. It's less clear to me how the blocking mode should be defined and how it interacts with pre_dispatch, and I'm not sure we refer to the same ideas. To clarify, here's what I understand:
I also think it could always return a generator. I don't see why there would be a higher memory overhead?
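To illustrate the non-blocking semantics under discussion, here is a stdlib sketch (not joblib's implementation, and `parallel_generator` is a hypothetical name): tasks are all submitted eagerly, and results are yielded lazily in submission order, so the caller can start consuming before all tasks finish.

```python
from concurrent.futures import ThreadPoolExecutor

def parallel_generator(func, iterable, max_workers=2):
    """Yield results lazily, in submission order (sketch only)."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(func, item) for item in iterable]
        for future in futures:
            yield future.result()

# The caller can process each result as soon as it is ready.
squares = parallel_generator(lambda x: x * x, range(5))
print(list(squares))  # [0, 1, 4, 9, 16]
```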
Let's keep only retrieve and merge the two.
That's what I was thinking, but I wasn't sure that I hadn't missed something.
This example by @lesteve used to lead to a deadlock with the multiprocessing backend. Numpy is not mandatory, any object that is big enough that pickling takes some time should do.
OK, don't bother changing it. Just add a note that we use a large numpy object to delay the pickler.
@GaelVaroquaux @ogrisel any suggestions on real-life scenarios that would stress-test this PR would be great.
Some thoughts about blocking / non-blocking; this is a naïve draft API:
It might be interesting if the user really wants to control the memory usage of the output while optimizing CPU usage. However, it feels complicated. Maybe it would be good instead to enable queuing new tasks on the fly, either by calling …
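The "queue new tasks on the fly" idea can be sketched with stdlib primitives; names like `run_with_feedback` and `spawn_more` are illustrative, not joblib API. The pattern: consume results as they complete, and submit follow-up work based on each result.

```python
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait

def run_with_feedback(initial, spawn_more, max_workers=2):
    """Consume results as they complete and optionally submit new tasks."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {executor.submit(task) for task in initial}
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                value = future.result()
                results.append(value)
                # New tasks can be enqueued based on a completed result.
                for task in spawn_more(value):
                    pending.add(executor.submit(task))
    return results

# Each initial task returns its index; small results spawn one follow-up.
out = run_with_feedback(
    initial=[lambda i=i: i for i in range(3)],
    spawn_more=lambda v: [lambda: v + 10] if v < 10 else [],
)
print(sorted(out))  # [0, 1, 2, 10, 11, 12]
```

This is roughly the shape of API a Bayesian-optimization loop (as mentioned below for #587) would need: results come back asynchronously and drive the next round of submissions.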
I've disabled on Windows the patch for the multiprocessing backend that solved the deadlock, and now it's green (but it doesn't look very stable on Travis). However it would be safer to try the deadlock test with …
Force-pushed from 90332ae to 94516a9
@fcharras: I talked to @lesteve and he convinced me that I am wrong in my logic: pre_dispatch is about consuming the input iterator, but the results list still gets filled (I should know, I wrote it :D). Hence, I believe that your PR is not a decrease in performance or functionality in any way. I'll think about it, but it seems like an overall benefit. With regard to an example to stress-test this, computing a Welch power spectral density is a typical case where it should be beneficial. Something that would be awesome is if you could prepare such a script. Indeed, we are going to have an example gallery very soon (#594), and such an example would be a good real-case usage. I think that your PR is mergeable. @ogrisel: do you want to have a look?
Actually, forget this whole idea of Welch. I was thinking about it in terms of the opportunity to reduce memory usage when the return of Parallel is actually a reducer. But it should be possible to do something much simpler when thinking of time. Something like:

```python
def func(i):
    time.sleep(.1)
    return i

result = 0
for j in Parallel(n_jobs=2)(delayed(func)(i) for i in range(10)):
    result += j
    sleep(.1)  # Here some expensive calculation
```
Force-pushed from 94516a9 to e1e7467
For the example, one could think of a media player: the processes download some files to disk in parallel, and the main thread loads and reads them. The processes could also directly return the loaded files (to challenge the deadlock with the pickler). My personal use case is Bayesian parameter search, which also needs the possibility to get results asynchronously and to add new tasks on the fly (work in progress in #587).
A realistic use case would be parallel CPU decoding of jpeg files + random data transformations (random small rotation, random approximately centered crop, horizontal flip, random erasure, ...), then packing the result with other transformed images into a numpy array as a minibatch to feed a convolutional neural network training loop, for instance.
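A toy sketch of such a pipeline (the decoding/augmentation step is a stub, and the names `decode_and_augment` and `minibatches` are illustrative): results are consumed lazily and packed into fixed-size minibatches, so the whole dataset is never materialized at once.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def decode_and_augment(path):
    # Stub standing in for jpeg decoding + random transforms.
    return f"tensor({path})"

def minibatches(paths, batch_size=4, max_workers=2):
    """Yield lists of transformed samples, batch by batch."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map returns a lazy, ordered iterator over results.
        stream = executor.map(decode_and_augment, paths)
        while True:
            batch = list(islice(stream, batch_size))
            if not batch:
                return
            yield batch

paths = [f"img_{i}.jpg" for i in range(10)]
batches = list(minibatches(paths))
print([len(b) for b in batches])  # [4, 4, 2]
```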
I don't know why the two tests fail on Travis; the failure is not consistent and I can't reproduce it locally. Also, 7 out of 8 pipelines passed. Could it be a misbehavior on Travis?
You might want to try to reproduce the deadlock locally using …
So we had stabilized the non-pypy pipelines after dropping … Despite this change, the pypy pipeline kept showing unstable tests, but we want to keep compatibility of … Provided the Azure pipelines are now stable, this would be the remaining TODO list to finish this PR and merge it:
Edit: the issues in pypy with the thread bomb mitigation unit tests are also present on main, and are not related to this PR.
# If you install python3.9, you get the pypy3.9 implementation.
# Pinning to the precise 3.9.15 version prevents this.
# Life of a CI is so brittle....
PYTHON_VERSION: "3.9.15"
If this has not been fixed upstream yet, merge on master before #588
@@ -0,0 +1,102 @@
"""Benchmark n_jobs=1 on high number of fast tasks |
Let's add this to master independently of #588
@@ -1,9 +1,11 @@
import os |
Let's consider merging this on master independently of #588.
@@ -30,6 +30,8 @@ if [[ "$PYTHON_VERSION" == "pypy3" ]]; then
create_new_pypy3_env
else
create_new_conda_env
# Make sure we did not install pypy by mistake
test -z "$(python --version | grep -i pypy)"
Consider merging this independently of #588
@@ -20,7 +20,7 @@ if [[ "$SKIP_TESTS" != "true" ]]; then
export PYTEST_ADDOPTS="--cov=joblib --cov-append"
fi

- pytest joblib -vl --timeout=120 --junitxml="${JUNITXML}"
+ pytest joblib -vsl --timeout=240 --junitxml="${JUNITXML}"
Consider merging this on master independently of #588
warninfo = [w.message for w in warninfo]
- assert len(warninfo) == 1
+ assert len(warninfo) == 1, debug_msg
Consider independent merge
for x, y in zip((0, 1), (1, 0))])
with raises(WorkerInterrupt):
    [delayed(division)(x, y) for x, y in zip((0, 1), (1, 0))]
)
Overall, try to minimize the amount of diff related to formatting in this PR.
'time.sleep(1.1)',
# TODO: changes in check_subprocess_call impacted this test
# with no particular reason, fix that.
'time.sleep(5)',
Consider an independent merge.
@@ -46,14 +46,20 @@ def check_subprocess_call(cmd, timeout=5, stdout_regex=None,
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
Consider an independent merge.
@@ -60,13 +66,35 @@

DEFAULT_THREAD_BACKEND = 'threading'

# Backend hints and constraints to help choose the backend
Except for the TASK_* variables definition, the diff here is somewhat unrelated to the PR. Maybe address it independently of #588?
@@ -104,8 +130,7 @@ def get_active_backend(prefer=None, require=None, verbose=0):
# Try to use the backend set by the user with the context manager.
backend, n_jobs = backend_and_jobs
nesting_level = backend.nesting_level
- supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
- if require == 'sharedmem' and not supports_sharedmem:
+ if require == 'sharedmem' and not backend.supports_sharedmem:
This change also seems unrelated to the PR? 🤔
if require == 'sharedmem' and not backend.supports_sharedmem:
    raise ValueError("Backend %s does not support shared memory"
                     % backend)
This refactoring also seems unrelated to the PR?
Merged via #1393
So much discussion on this one :)
This PR is derived from #587; it keeps only the part that enables `__call__` to return a generator. Some additional notes:

- The parameter is named `return_generator` to better match master...lesteve:parallel-return-generator.
- Since `__call__` is now defined in `Parallel` (transforming the output generator into a list if `self.return_generator = False`), there is no need for many new unit tests; the previous unit tests already test the behavior of the generator.
- … `Parallel` instance, without the need for a new API.