Skip to content

Commit

Permalink
Switched to a trampoline-style test runner in the pytest plugin (agro…
Browse files Browse the repository at this point in the history
…nholm#497)

Also documented how context variables propagate within the test suite, and how the pytest plugin compares to its competition.
  • Loading branch information
agronholm committed Nov 23, 2022
1 parent 891c0d6 commit 1b57189
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 104 deletions.
45 changes: 29 additions & 16 deletions docs/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,32 @@ The fixtures and tests are run by a "test runner", implemented separately for ea
The test runner keeps an event loop open during the request, making it possible for code in
fixtures to communicate with the code in the tests (and each other).

The test runner is created when the first matching async test or fixture is about to be run, and
shut down when that same fixture is being torn down or the test has finished running. As such,
if no async fixtures are used, a separate test runner is created for each test. Conversely, if
even one async fixture (scoped higher than ``function``) is shared across all tests, only one test
runner will be created during the test session.

For async generator based fixtures, the test runner spawns a task that handles both the setup and
teardown phases to enable context-sensitive code to work properly. A common example of this is
providing a task group as a fixture.

Since each test and fixture run in their own separate tasks, no changes to any context
variables will propagate out of them to tests or other fixtures. This is in line with
``pytest-asyncio``, but in contrast to ``pytest-trio`` where all fixtures and tests
`share the same context`_.

.. _share the same context: https://pytest-trio.readthedocs.io/en/stable/reference.html#handling-of-contextvars
The test runner is created when the first matching async test or fixture is about to be
run, and shut down when that same fixture is being torn down or the test has finished
running. As such, if no higher-order (scoped ``class`` or higher) async fixtures are
used, a separate test runner is created for each matching test. Conversely, if even one
async fixture, scoped higher than ``function``, is shared across all tests, only one
test runner will be created during the test session.

Context variable propagation
++++++++++++++++++++++++++++

The asynchronous test runner runs all async fixtures and tests in the same task, so
context variables set in async fixtures or tests, within an async test runner, will
affect other async fixtures and tests within the same runner. However, these context
variables are **not** carried over to synchronous tests and fixtures, or to other async
test runners.

Comparison with other async test runners
++++++++++++++++++++++++++++++++++++++++

The ``pytest-asyncio`` library only works with asyncio code. Like the AnyIO pytest
plugin, it can be made to support higher order fixtures (by specifying a higher order
``event_loop`` fixture). However, it runs the setup and teardown phases of each async
fixture in a new async task per operation, making context variable propagation
impossible and preventing task groups and cancel scopes from functioning properly.

The ``pytest-trio`` library, made for testing Trio projects, works only with Trio code.
Additionally, it only supports function scoped async fixtures. Another significant
difference with the AnyIO pytest plugin is that attempts to run the setup and teardown
for async fixtures concurrently when their dependency graphs allow that.
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ This library adheres to `Semantic Versioning 2.0 <http:https://semver.org/>`_.

- **BACKWARDS INCOMPATIBLE** Replaced AnyIO's own ``ExceptionGroup`` class with the PEP
654 ``BaseExceptionGroup`` and ``ExceptionGroup``
- **BACKWARDS INCOMPATIBLE** Changes the pytest plugin to run all tests and fixtures in
the same task, allowing fixtures to set context variables for tests and other fixtures
- Bumped minimum version of trio to v0.22
- Added ``create_unix_datagram_socket`` and ``create_connected_unix_datagram_socket`` to
create UNIX datagram sockets (PR by Jean Hominal)
Expand Down
92 changes: 63 additions & 29 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
WouldBlock,
)
from .._core._sockets import GetAddrInfoReturnType, convert_ipv6_sockaddr
from .._core._streams import create_memory_object_stream
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
from .._core._synchronization import Event as BaseEvent
from .._core._synchronization import ResourceGuard
Expand All @@ -76,6 +77,7 @@
UNIXDatagramPacketType,
)
from ..lowlevel import RunVar
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

if sys.version_info < (3, 11):
from exceptiongroup import BaseExceptionGroup, ExceptionGroup
Expand Down Expand Up @@ -1674,6 +1676,8 @@ def _create_task_info(task: asyncio.Task) -> TaskInfo:


class TestRunner(abc.TestRunner):
_send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]

def __init__(
self,
debug: bool = False,
Expand All @@ -1685,6 +1689,7 @@ def __init__(
self._loop = asyncio.new_event_loop()
self._loop.set_debug(debug)
self._loop.set_exception_handler(self._exception_handler)
self._runner_task: asyncio.Task | None = None
asyncio.set_event_loop(self._loop)

def _cancel_all_tasks(self) -> None:
Expand Down Expand Up @@ -1724,8 +1729,44 @@ def _raise_async_exceptions(self) -> None:
"Multiple exceptions occurred in asynchronous callbacks", exceptions
)

@staticmethod
async def _run_tests_and_fixtures(
receive_stream: MemoryObjectReceiveStream[
tuple[Coroutine[Any, Any, T_Retval], Future[T_Retval]]
],
) -> None:
with receive_stream:
async for coro, future in receive_stream:
try:
retval = await coro
except BaseException as exc:
if not future.cancelled():
future.set_exception(exc)
else:
if not future.cancelled():
future.set_result(retval)

async def _call_in_runner_task(
self, func: Callable[..., Awaitable[T_Retval]], *args: object, **kwargs: object
) -> T_Retval:
if not self._runner_task:
self._send_stream, receive_stream = create_memory_object_stream(1)
self._runner_task = self._loop.create_task(
self._run_tests_and_fixtures(receive_stream)
)

coro = func(*args, **kwargs)
future: asyncio.Future[T_Retval] = self._loop.create_future()
self._send_stream.send_nowait((coro, future))
return await future

def close(self) -> None:
try:
if self._runner_task is not None:
self._runner_task = None
self._loop.run_until_complete(self._send_stream.aclose())
del self._send_stream

self._cancel_all_tasks()
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
finally:
Expand All @@ -1737,49 +1778,42 @@ def run_asyncgen_fixture(
fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
kwargs: dict[str, Any],
) -> Iterable[T_Retval]:
async def fixture_runner() -> None:
agen = fixture_func(**kwargs)
try:
retval = await agen.asend(None)
self._raise_async_exceptions()
except BaseException as exc:
f.set_exception(exc)
return
else:
f.set_result(retval)

await event.wait()
try:
await agen.asend(None)
except StopAsyncIteration:
pass
else:
await agen.aclose()
raise RuntimeError("Async generator fixture did not stop")

f = self._loop.create_future()
event = asyncio.Event()
fixture_task = self._loop.create_task(fixture_runner())
self._loop.run_until_complete(f)
yield f.result()
event.set()
self._loop.run_until_complete(fixture_task)
asyncgen = fixture_func(**kwargs)
fixturevalue: T_Retval = self._loop.run_until_complete(
self._call_in_runner_task(asyncgen.asend, None)
)
self._raise_async_exceptions()

yield fixturevalue

try:
self._loop.run_until_complete(
self._call_in_runner_task(asyncgen.asend, None)
)
except StopAsyncIteration:
self._raise_async_exceptions()
else:
self._loop.run_until_complete(asyncgen.aclose())
raise RuntimeError("Async generator fixture did not stop")

def run_fixture(
self,
fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
kwargs: dict[str, Any],
) -> T_Retval:
retval = self._loop.run_until_complete(fixture_func(**kwargs))
retval = self._loop.run_until_complete(
self._call_in_runner_task(fixture_func, **kwargs)
)
self._raise_async_exceptions()
return retval

def run_test(
self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
) -> None:
try:
self._loop.run_until_complete(test_func(**kwargs))
self._loop.run_until_complete(
self._call_in_runner_task(test_func, **kwargs)
)
except Exception as exc:
self._exceptions.append(exc)

Expand Down
98 changes: 40 additions & 58 deletions src/anyio/_backends/_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
from types import TracebackType
from typing import (
IO,
TYPE_CHECKING,
Any,
AsyncGenerator,
Awaitable,
Callable,
Collection,
ContextManager,
Coroutine,
Deque,
Generic,
Mapping,
NoReturn,
Expand Down Expand Up @@ -54,15 +52,14 @@
EndOfStream,
)
from .._core._sockets import GetAddrInfoReturnType, convert_ipv6_sockaddr
from .._core._streams import create_memory_object_stream
from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
from .._core._synchronization import Event as BaseEvent
from .._core._synchronization import ResourceGuard
from .._core._tasks import CancelScope as BaseCancelScope
from ..abc import IPSockAddrType, UDPPacketType, UNIXDatagramPacketType
from ..abc._eventloop import AsyncBackend

if TYPE_CHECKING:
from trio_typing import TaskStatus
from ..streams.memory import MemoryObjectSendStream

T = TypeVar("T")
T_Retval = TypeVar("T_Retval")
Expand Down Expand Up @@ -708,96 +705,81 @@ async def __anext__(self) -> Signals:

class TestRunner(abc.TestRunner):
def __init__(self, **options: Any) -> None:
from collections import deque
from queue import Queue

self._call_queue: Queue[Callable[..., object]] = Queue()
self._result_queue: Deque[Outcome] = deque()
self._stop_event: trio.Event | None = None
self._nursery: trio.Nursery | None = None
self._send_stream: MemoryObjectSendStream | None = None
self._options = options

async def _trio_main(self) -> None:
self._stop_event = trio.Event()
async with trio.open_nursery() as self._nursery:
await self._stop_event.wait()

async def _call_func(
self, func: Callable[..., Awaitable[object]], args: tuple, kwargs: dict
) -> None:
try:
retval = await func(*args, **kwargs)
except BaseException as exc:
self._result_queue.append(Error(exc))
else:
self._result_queue.append(Value(retval))
async def _run_tests_and_fixtures(self) -> None:
self._send_stream, receive_stream = create_memory_object_stream(1)
with receive_stream:
async for coro, outcome_holder in receive_stream:
try:
retval = await coro
except BaseException as exc:
outcome_holder.append(Error(exc))
else:
outcome_holder.append(Value(retval))

def _main_task_finished(self, outcome: object) -> None:
self._nursery = None
self._send_stream = None

def _get_nursery(self) -> trio.Nursery:
if self._nursery is None:
def _call_in_runner_task(
self, func: Callable[..., Awaitable[T_Retval]], *args: object, **kwargs: object
) -> T_Retval:
if self._send_stream is None:
trio.lowlevel.start_guest_run(
self._trio_main,
self._run_tests_and_fixtures,
run_sync_soon_threadsafe=self._call_queue.put,
done_callback=self._main_task_finished,
**self._options,
)
while self._nursery is None:
while self._send_stream is None:
self._call_queue.get()()

return self._nursery

def _call(
self, func: Callable[..., Awaitable[T_Retval]], *args: object, **kwargs: object
) -> T_Retval:
self._get_nursery().start_soon(self._call_func, func, args, kwargs)
while not self._result_queue:
outcome_holder: list[Outcome] = []
self._send_stream.send_nowait((func(*args, **kwargs), outcome_holder))
while not outcome_holder:
self._call_queue.get()()

outcome = self._result_queue.pop()
return outcome.unwrap()
return outcome_holder[0].unwrap()

def close(self) -> None:
if self._stop_event:
self._stop_event.set()
while self._nursery is not None:
if self._send_stream:
self._send_stream.close()
while self._send_stream is not None:
self._call_queue.get()()

def run_asyncgen_fixture(
self,
fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
kwargs: dict[str, Any],
) -> Iterable[T_Retval]:
async def fixture_runner(*, task_status: TaskStatus[T_Retval]) -> None:
agen = fixture_func(**kwargs)
retval = await agen.asend(None)
task_status.started(retval)
await teardown_event.wait()
try:
await agen.asend(None)
except StopAsyncIteration:
pass
else:
await agen.aclose()
raise RuntimeError("Async generator fixture did not stop")
asyncgen = fixture_func(**kwargs)
fixturevalue: T_Retval = self._call_in_runner_task(asyncgen.asend, None)

yield fixturevalue

teardown_event = trio.Event()
fixture_value = self._call(lambda: self._get_nursery().start(fixture_runner))
yield fixture_value
teardown_event.set()
try:
self._call_in_runner_task(asyncgen.asend, None)
except StopAsyncIteration:
pass
else:
self._call_in_runner_task(asyncgen.aclose)
raise RuntimeError("Async generator fixture did not stop")

def run_fixture(
self,
fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
kwargs: dict[str, Any],
) -> T_Retval:
return self._call(fixture_func, **kwargs)
return self._call_in_runner_task(fixture_func, **kwargs)

def run_test(
self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
) -> None:
self._call(test_func, **kwargs)
self._call_in_runner_task(test_func, **kwargs)


class TrioBackend(AsyncBackend):
Expand Down
Loading

0 comments on commit 1b57189

Please sign in to comment.