multiprocessing.Pool shouldn't hang forever if a worker process dies unexpectedly #66587

Open
danoreilly mannequin opened this issue Sep 11, 2014 · 18 comments
Labels
3.8 (only security fixes), stdlib (Python modules in the Lib dir), topic-multiprocessing, type-feature (A feature request or enhancement)

Comments

@danoreilly
Mannequin

danoreilly mannequin commented Sep 11, 2014

BPO 22393
Nosy @pitrou, @applio, @oesteban, @rkm, @kormang, @shnizzedy
PRs
  • bpo-22393: Fix multiprocessing.Pool hangs if a worker process dies unexpectedly #10441
  • bpo-22393: Fix deadlock from pool worker death without communication #16103
Files
  • multiproc_broken_pool.diff: Abort running task and close down a pool if a worker is unexpectedly terminated.
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.

    GitHub fields:

    assignee = None
    closed_at = None
    created_at = <Date 2014-09-11.22:33:06.739>
    labels = ['3.8', 'type-feature', 'library']
    title = "multiprocessing.Pool shouldn't hang forever if a worker process dies unexpectedly"
    updated_at = <Date 2021-10-18.03:11:08.813>
    user = 'https://bugs.python.org/danoreilly'

    bugs.python.org fields:

    activity = <Date 2021-10-18.03:11:08.813>
    actor = 'myles.steinhauser'
    assignee = 'none'
    closed = False
    closed_date = None
    closer = None
    components = ['Library (Lib)']
    creation = <Date 2014-09-11.22:33:06.739>
    creator = 'dan.oreilly'
    dependencies = []
    files = ['36603']
    hgrepos = []
    issue_num = 22393
    keywords = ['patch']
    message_count = 11.0
    messages = ['226805', '294968', '315684', '315687', '329381', '329383', '329770', '333895', '351754', '390775', '390780']
    nosy_count = 14.0
    nosy_names = ['pitrou', 'jnoller', 'cvrebert', 'sbt', 'dan.oreilly', 'davin', 'brianboonstra', 'cjmarkie', 'Francis Bolduc', 'oesteban', 'rkm', 'kormang', 'shnizzedy', 'myles.steinhauser']
    pr_nums = ['10441', '16103']
    priority = 'normal'
    resolution = None
    stage = 'patch review'
    status = 'open'
    superseder = None
    type = 'enhancement'
    url = 'https://bugs.python.org/issue22393'
    versions = ['Python 3.8']


    This is essentially a dupe of bpo-9205, but it was suggested I open a new issue, since that one ended up being used to fix this same problem in concurrent.futures, and was subsequently closed.

    Right now, should a worker process in a Pool unexpectedly get terminated while a blocking Pool method is running (e.g. apply, map), the method will hang forever. This isn't a normal occurrence, but it does occasionally happen (either because someone sends a SIGTERM, or because of a bug in the interpreter or a C-extension). It would be preferable for multiprocessing to follow the lead of concurrent.futures.ProcessPoolExecutor when this happens, and abort all running tasks and close down the Pool.

    Attached is a patch that implements this behavior. Should a process in a Pool unexpectedly exit (meaning, *not* because of hitting the maxtasksperchild limit), the Pool will be closed/terminated and all cached/running tasks will raise a BrokenProcessPool exception. These changes also prevent the Pool from going into a bad state if the "initializer" function raises an exception (previously, the pool would end up infinitely starting new processes, which would immediately die because of the exception).

    One concern with the patch: The way timings are altered with these changes, the Pool seems to be particularly susceptible to bpo-6721 in certain cases. If processes in the Pool are being restarted due to maxtasksperchild just as the worker is being closed or joined, there is a chance the worker will be forked while some of the debug logging inside of Pool is running (and holding locks on either sys.stdout or sys.stderr). When this happens, the worker deadlocks on startup, which will hang the whole program. I believe the current implementation is susceptible to this as well, but I could reproduce it much more consistently with this patch. I think it's rare enough in practice that it shouldn't prevent the patch from being accepted, but thought I should point it out.

    (I do think bpo-6721 should be addressed, or at the very least internal I/O locks should always reset after forking.)

    @danoreilly danoreilly mannequin added the stdlib (Python modules in the Lib dir) and type-feature (A feature request or enhancement) labels Sep 11, 2014
    @FrancisBolduc
    Mannequin

    FrancisBolduc mannequin commented Jun 1, 2017

    This problem also happens simply by calling sys.exit from one of the child processes.

    The following script exhibits the problem:

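    import multiprocessing
    import sys

    def test(value):
        # A truthy value makes the worker process exit in the middle of its task.
        if value:
            sys.exit(123)

    if __name__ == '__main__':
        pool = multiprocessing.Pool(4)
        cases = [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0]
        # The call below never returns once the worker handling the 1 has exited.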
        pool.map(test, cases)
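
One hedged way to observe this hang without blocking forever is to bound the wait. The sketch below is not part of the original report. It assumes a POSIX system (it requests the "fork" start method), uses os._exit as a stand-in for a worker that dies mid-task, and the helper name map_with_timeout is hypothetical.

```python
import multiprocessing
import os


def map_with_timeout(timeout=5.0):
    """Run a task that kills its worker and wait at most `timeout` seconds."""
    # Assumption: POSIX only, since the "fork" start method is requested explicitly.
    ctx = multiprocessing.get_context("fork")
    pool = ctx.Pool(2)
    try:
        # os._exit ends the worker immediately, so no result is ever sent back.
        async_result = pool.map_async(os._exit, [1])
        try:
            async_result.get(timeout=timeout)
        except multiprocessing.TimeoutError:
            return "timed out"
        return "completed"
    finally:
        # Tear the pool down explicitly, since the lost task will never complete.
        pool.terminate()
        pool.join()


if __name__ == "__main__":
    print(map_with_timeout())
```

A timeout only bounds the wait. It does not tell the caller which task was lost or why, which is the gap this issue asks to close.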

    @oesteban
    Mannequin

    oesteban mannequin commented Apr 24, 2018

    We use multiprocessing to parallelize many memory-hungry tasks that either run Python code or call subprocess.run.

    At times the OOM Killer kicks in. When one of the workers is killed, the queue never returns an error for the task being run by the worker.

    Are there any plans to merge the patch proposed in this issue?

    @pitrou
    Member

    pitrou commented Apr 24, 2018

    Oscar, the patch posted here needs updating for the latest git master.

    If you want to avoid this issue, you can also use concurrent.futures where the issue is fixed.
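
For comparison, here is a minimal sketch of how concurrent.futures reports a dead worker. It is an illustration rather than code from this thread: the helper name submit_crashing_task is hypothetical, os._exit stands in for an unexpected worker death, and the "fork" start method is requested explicitly, so it assumes a POSIX system.

```python
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def submit_crashing_task():
    """Submit a task that kills its worker and report what the parent sees."""
    # Assumption: POSIX only, since the "fork" start method is requested explicitly.
    ctx = multiprocessing.get_context("fork")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
        # os._exit ends the worker without cleanup, standing in for a segfault or OOM kill.
        future = executor.submit(os._exit, 1)
        try:
            future.result(timeout=30)
        except BrokenProcessPool:
            # The executor noticed the dead worker and failed the pending future.
            return "broken"
    return "completed"


if __name__ == "__main__":
    print(submit_crashing_task())
```

Once the executor is marked broken, it cannot be reused, so a caller that wants to continue has to create a new executor and resubmit the remaining work.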

    @pitrou pitrou added the 3.8 (only security fixes) label Apr 24, 2018
    @oesteban
    Mannequin

    oesteban mannequin commented Nov 6, 2018

    Hi Antoine,

    I may take a stab at it. Before I start, should I branch from master or from 3.7.1 (as 3.7 is still accepting bugfixes)?

    Best,
    Oscar

    @pitrou
    Member

    pitrou commented Nov 6, 2018

    You should start from master. Bugfixes can be backported afterwards if appropriate. Thanks!

    @oesteban
    Mannequin

    oesteban mannequin commented Nov 12, 2018

    I tried to reuse as much as I could from the patch, but it didn't solve the issue at first.

    I have changed the responsibility of identifying and prescribing a solution when a worker got killed. In the proposed patch, the thread handling results (i.e. tasks queued by one worker as done) was responsible. In the PR, the responsibility is reassigned to the thread handling workers (since, basically, one or more workers suddenly die).

    The patch defined a new BROKEN state that was assigned to the results handler thread. I transferred this behavior to the worker handler thread. However, I'm guessing that the BROKEN state should be assigned to the Pool object instead, to be semantically correct, although that would require passing a reference to the object around and would unnecessarily complicate the implementation. Happy to reconsider though.

    I added three tests, one that was present with the patch, a variation of it adding some wait before killing the worker, and the one that Francis Bolduc posted here (https://bugs.python.org/issue22393#msg294968).

    Please let me know whether any conversation about this bug should take place in GitHub, with the PR instead of here.

    Thanks a lot for the guidance, Antoine.

    @cjmarkie
    Mannequin

    cjmarkie mannequin commented Jan 17, 2019

    Just a bump to note that the PR (10441) is ready for another round of review.

    @vstinner
    Member

    I just marked bpo-38084 as duplicate of this issue. I manually merged the nosy lists.

    @kormang
    Mannequin

    kormang mannequin commented Apr 11, 2021

    I've created bpo-43805. I think it would be better to have a universal solution, rather than specific ones like in bpo-9205.

    Haven't checked the PRs, though.

    @kormang
    Mannequin

    kormang mannequin commented Apr 11, 2021

    Somewhat related: bpo-43806, with asyncio.StreamReader.

    @ezio-melotti ezio-melotti transferred this issue from another repository Apr 10, 2022
    @bergkvist
    Contributor

    bergkvist commented Jul 6, 2022

    I'm running into this as well - and I can't use concurrent.futures since it requires that inputs can be pickled. With multiprocessing.get_context('fork').Process(...) the process input memory is inherited by the child process, and thus doesn't need to be pickled.

    My use case:

    I'm writing Python C extensions to speed up processing of data that takes a long time to load into memory. To avoid reloading the data every time I change my processing code, I use a Jupyter notebook - where I can keep this data available in memory and work interactively with it.

    Now - another roadblock: Python C extensions can't be unloaded once they have been loaded by CPython (meaning I can't use autoreload cell magic). To work around this issue, and not have to restart the kernel every time I recompile the C extension - I can perform the import inside of a forked subprocess instead.

    Importantly, using a forked subprocess means that it will have access to the data without having to reload it, and that the C extension import won't propagate up to the parent - essentially allowing me to hot reload the C extension by paying the small price of the fork syscall and rerunning the import on every invocation of the C extension.

    So all of this works great - except when the C code segfaults or otherwise exits unexpectedly, which inevitably happens every now and then when working iteratively on C-code - execution will just hang, waiting for a result until I trigger a KeyboardInterrupt. This makes rerunning tests/benchmarks inconvenient - and harder to debug.

    data = load_data_for_5_minutes()
    import multiprocessing as mp
    from functools import wraps
    import traceback
    import sys
    
    def forked(fn):
        @wraps(fn)
        def call(*args, **kwargs):
            ctx = mp.get_context('fork')
            q = ctx.Queue(1)
            def target():
                try:
                    q.put((fn(*args, **kwargs), None))
                except BaseException:
                    ex_type, ex_value, tb = sys.exc_info()
                    error = ex_type, ex_value, ''.join(traceback.format_tb(tb.tb_next))
                    q.put((None, error))
            ctx.Process(target=target).start()
            result, error = q.get()
            if error:
                ex_type, ex_value, trace = error
                raise ex_type(f'{ex_value}\n {trace}')
            return result
        return call
    
    @forked
    def aggregate_data(data):
        import mycextension
        return mycextension.aggregate(data)
    
    processed_data = aggregate_data(data)

    @bergkvist
    Contributor

    Actually, I managed to solve it for my particular use case a bit more quickly than I expected:

    def forked(fn):
        @wraps(fn)
        def call(*args, **kwargs):
            ctx = mp.get_context('fork')
            q = ctx.Queue(1)
            def target():
                try:
                    q.put((fn(*args, **kwargs), None))
                except BaseException:
                    ex_type, ex_value, tb = sys.exc_info()
                    error = ex_type, ex_value, ''.join(traceback.format_tb(tb.tb_next))
                    q.put((None, error))
            p = ctx.Process(target=target)    #
            p.start()                         #
            p.join()                          # The join succeeds regardless of whether the process exited normally
            if q.empty():                     #
                raise SystemExit(p.exitcode)  # If the queue is empty, it means the process did not exit normally
            result, error = q.get()
            if error:
                ex_type, ex_value, trace = error
                raise ex_type(f'{ex_value}\n {trace}')
            return result
        return call

    This solution sadly doesn't help with multiprocessing.Pool though, since this uses workers that live longer than a single task - and so join can't be used to wait for a single result.

    I'll leave my solution here in case someone finds it useful - although it doesn't actually solve this issue.
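
A possible variant of the same idea, sketched under assumptions and not taken from the thread: returning the result over a one-way Pipe lets the parent detect a dead child through EOFError on recv, so it never has to join before reading (joining first can block if the child is still flushing a large result). The names run_forked and ChildDied are hypothetical, and the "fork" start method makes this POSIX only.

```python
import multiprocessing as mp
import os


class ChildDied(RuntimeError):
    """Hypothetical error type: the child exited without sending a result."""

    def __init__(self, exitcode):
        super().__init__(f"child exited with code {exitcode} before sending a result")
        self.exitcode = exitcode


def run_forked(fn, *args, **kwargs):
    """Run fn in a forked child and return its result, or raise if the child dies."""
    ctx = mp.get_context("fork")  # assumption: POSIX only
    reader, writer = ctx.Pipe(duplex=False)

    def target():
        reader.close()
        try:
            writer.send((fn(*args, **kwargs), None))
        except Exception as exc:
            # Report ordinary Python exceptions as text so they always pickle.
            writer.send((None, f"{type(exc).__name__}: {exc}"))

    p = ctx.Process(target=target)
    p.start()
    writer.close()  # the parent must drop its write end so that EOF can be observed
    try:
        result, error = reader.recv()
    except EOFError:
        # Every write end is closed and nothing was sent: the child died early.
        p.join()
        raise ChildDied(p.exitcode) from None
    finally:
        reader.close()
    p.join()
    if error is not None:
        raise RuntimeError(error)
    return result


if __name__ == "__main__":
    print(run_forked(sum, [1, 2, 3]))
    try:
        run_forked(os._exit, 3)  # simulate a crash inside the child
    except ChildDied as exc:
        print(exc)
```

Like the version above, this handles one process per call and does not address multiprocessing.Pool, whose workers outlive individual tasks.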

    @urumican

    What is the current status of this issue? Did we solve the parent process hanging issue?

    @leycec

    leycec commented Jun 24, 2023

    Ditto. In short, should the community continue to prefer concurrent.futures.ProcessPoolExecutor to multiprocessing.Pool? The former is notoriously robust against unexpected process termination under Python ≥ 3.3, where BrokenProcessPool exceptions are gracefully raised.

    We'd all like to believe that this issue has been quietly resolved; it's been nearly a decade and multiprocessing.Pool is increasingly popular. But... this issue is still open, which concerns the justifiably paranoid QA engineer inside of us all. 😬

    @pitrou
    Member

    pitrou commented Jun 24, 2023

    To my knowledge, no one has been working actively on making mp.Pool more robust. If you need an executor that's robust against child process failure, then yes, just use concurrent.futures.

    @dobos

    dobos commented Jul 6, 2023

    I have a solution to this but I'm not really willing to go through the whole process to make it into a pull request. I'm happy to share it in a gist if someone's interested in writing the unit tests and developing it into a PR.

    Basically, any solution that can report an error from a processing pool when a worker dies should account for the partially executed tasks, whether it's a multiprocessing.Pool or a concurrent.futures.ProcessPoolExecutor; otherwise there's no way of telling which item could not be processed. multiprocessing.Pool has Pool._handle_workers, which can and does detect processes that quit unexpectedly.

    So, for the sake of simplicity, I created an outstanding jobs queue for each worker and pass it to the workers one by one. When a worker picks up a task, it puts it right into the outstanding queue and leaves it there until the results are ready to be returned. If the worker succeeds, it takes the task from the outstanding queue; if it fails without a Python exception (e.g. OOM, a signal from outside, or similar), the task will remain there. When Pool._handle_workers gets executed on its own thread in the main process, it checks for any outstanding tasks in the worker's queue and reports an error in outqueue the way the worker does when it handles Python exceptions.

    There still might be some corner cases that require special attention, like deadlocks.

    @dobos

    dobos commented Jul 6, 2023

    Here's my solution: dobos@281291a

    JanCBrammer added a commit to TUCAN-nest/SDF-pipeline that referenced this issue Feb 16, 2024
    Prior to this commit, `core.run` would wait for results indefinitely
    when a child process terminated with a non-zero exit code (e.g., segfault).
    This was due to the fact that `multiprocessing.Pool` does not raise
    an exception ( python/cpython#66587 ). This commit
    replaces `multiprocessing.Pool` with `concurrent.futures.ProcessPoolExecutor`.
    In contrast to the former, the latter raises an exception in case a
    child process terminates with a non-zero exit code.
    JanCBrammer added a commit to TUCAN-nest/SDF-pipeline that referenced this issue May 3, 2024
    Revert from `concurrent.futures.ProcessPoolExecutor` to `multiprocessing`.
    The former introduced a significant performance regression (about x2 slowdown).
    See https://stackoverflow.com/questions/18671528/processpoolexecutor-from-concurrent-futures-way-slower-than-multiprocessing-pool?rq=3.
    Unfortunately, `multiprocessing` cannot catch segfaults.
    Catching segfaults was the reason to replace `multiprocessing` with `ProcessPoolExecutor`.
    See python/cpython#66587.
    As a workaround, to avoid indefinite waiting for results in case of a segfault, we use a timeout.
    KelSolaar added a commit to colour-science/colour that referenced this issue Jul 28, 2024