Skip to content

Commit

Permalink
Shutdown the loop's default thread executor at the end of a test (#504)
Browse files Browse the repository at this point in the history
The same approach is adopted in IsolatedAsyncioTestCase. It prevents
leaking asyncio threads as loop.close() doesn't join them.

BaseEventLoop.shutdown_default_executor was introduced in Python 3.9.
For earlier versions, this commit introduces an adapted version of
the method: _shutdown_default_executor. However, it doesn't work
with uvloop < 0.15.0 because, uvloop doesn't expose
_default_executor attribute. It's rather a minor issue as
uvloop 0.15.0 is 2 years old - in such case this commit is a no-op.

Fixes #503.
  • Loading branch information
marcinbarczynski committed Nov 26, 2022
1 parent 1b57189 commit ff5c811
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
43 changes: 42 additions & 1 deletion src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import math
import socket
import sys
import threading
from asyncio import (
AbstractEventLoop,
CancelledError,
Expand Down Expand Up @@ -98,7 +99,6 @@ def get_coro(task: asyncio.Task) -> Generator | Awaitable[Any]:
# Check whether there is native support for task names in asyncio (3.8+)
_native_task_names = hasattr(asyncio.Task, "get_name")


_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")


Expand Down Expand Up @@ -1675,6 +1675,39 @@ def _create_task_info(task: asyncio.Task) -> TaskInfo:
return TaskInfo(id(task), parent_id, name, get_coro(task))


async def _shutdown_default_executor(loop: asyncio.BaseEventLoop) -> None:
"""Schedule the shutdown of the default executor.
BaseEventLoop.shutdown_default_executor was introduced in Python 3.9.
This function is an adapted version of the method from Python 3.11.
It's used in TestRunner.close only if python < 3.9.
"""

def _do_shutdown(
loop_: asyncio.BaseEventLoop, future: asyncio.futures.Future
) -> None:
try:
loop_._default_executor.shutdown(wait=True) # type: ignore[attr-defined]
loop_.call_soon_threadsafe(future.set_result, None)
except Exception as ex:
loop_.call_soon_threadsafe(future.set_exception, ex)

if loop._default_executor is None: # type: ignore[attr-defined]
return
future = loop.create_future()
thread = threading.Thread(
target=_do_shutdown,
args=(
loop,
future,
),
)
thread.start()
try:
await future
finally:
thread.join()


class TestRunner(abc.TestRunner):
_send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]

Expand Down Expand Up @@ -1769,6 +1802,14 @@ def close(self) -> None:

self._cancel_all_tasks()
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
if hasattr(self._loop, "shutdown_default_executor"):
# asyncio in Python >= 3.9 or uvloop >= 0.15.0
self._loop.run_until_complete(self._loop.shutdown_default_executor())
elif isinstance(self._loop, asyncio.BaseEventLoop) and hasattr(
self._loop, "_default_executor"
):
# asyncio in Python < 3.9
self._loop.run_until_complete(_shutdown_default_executor(self._loop))
finally:
asyncio.set_event_loop(None)
self._loop.close()
Expand Down
17 changes: 16 additions & 1 deletion tests/test_pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def test_asyncio(testdir: Pytester, caplog: LogCaptureFixture) -> None:
"""
import asyncio
import pytest
import threading
@pytest.fixture(scope='class')
Expand All @@ -115,6 +116,16 @@ def callback():
yield None
asyncio.get_running_loop().call_soon(callback)
await asyncio.sleep(0)
@pytest.fixture
def no_thread_leaks_fixture():
# this has to be non-async fixture so that it wraps up
# after the event loop gets closed
threads_before = threading.enumerate()
yield
threads_after = threading.enumerate()
leaked_threads = set(threads_after) - set(threads_before)
assert not leaked_threads
"""
)

Expand Down Expand Up @@ -163,11 +174,15 @@ async def test_exception_handler_no_exception():
{"message": "bogus error"}
)
await asyncio.sleep(0.1)
async def test_shutdown_default_executor(no_thread_leaks_fixture):
# Test for github #503
asyncio.get_event_loop().run_in_executor(None, lambda: 1)
"""
)

result = testdir.runpytest(*pytest_args)
result.assert_outcomes(passed=3, failed=1, errors=2)
result.assert_outcomes(passed=4, failed=1, errors=2)
assert len(caplog.messages) == 1
assert caplog.messages[0] == "bogus error"

Expand Down

0 comments on commit ff5c811

Please sign in to comment.