Skip to content

Commit

Permalink
Raise timeout exception from async stream
Browse files Browse the repository at this point in the history
Using httpx, the `.stream` method does not raise an exception when the
device goes offline if the read timeout is disabled. To get around this,
introduce a new exception that will be thrown if a timeout is hit so it
can more easily be handled and the stream reconnected as needed.  Add
handling logic to the async_event_stream so it behaves like the
non-async method by retrying while read errors are thrown.
  • Loading branch information
flacjacket committed Mar 13, 2022
1 parent 0f91f6e commit 4cbc74a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 28 deletions.
48 changes: 22 additions & 26 deletions src/amcrest/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from requests import RequestException
from urllib3.exceptions import HTTPError

from .exceptions import CommError
from .exceptions import CommError, ReadTimeoutError
from .http import Http, TimeoutT
from .utils import pretty

Expand Down Expand Up @@ -323,31 +323,27 @@ async def async_event_stream(
self, eventcodes: str, *, timeout_cmd: TimeoutT = None
) -> AsyncIterator[str]:
"""Return a stream of event info lines."""
# If timeout is not specified, then use default, but remove read
# timeout since there's no telling when, if ever, an event will come.
if timeout_cmd is None:
if isinstance(self._timeout_default, tuple):
timeout_cmd = self._timeout_default[0], None
else:
timeout_cmd = self._timeout_default, None

async with self.async_stream_command(
f"eventManager.cgi?action=attach&codes=[{eventcodes}]",
timeout_cmd=timeout_cmd,
) as ret:
it = ret.aiter_text(chunk_size=1)
async for line in _async_event_lines(it):
if line.lower().startswith("content-length:"):
chunk_size = int(line.split(":")[1])
chars = []
async for char in it:
chars.append(char)
if len(chars) == chunk_size:
break
else:
# If we can't get the chunk, then return out
return
yield "".join(chars)
while True:
try:
async with self.async_stream_command(
f"eventManager.cgi?action=attach&codes=[{eventcodes}]",
timeout_cmd=timeout_cmd,
) as ret:
it = ret.aiter_text(chunk_size=1)
async for line in _async_event_lines(it):
if line.lower().startswith("content-length:"):
chunk_size = int(line.split(":")[1])
chars = []
async for char in it:
chars.append(char)
if len(chars) == chunk_size:
break
else:
# If we can't get the chunk, then return out
return
yield "".join(chars)
except ReadTimeoutError:
continue

def event_actions(
self,
Expand Down
4 changes: 4 additions & 0 deletions src/amcrest/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ class AmcrestError(Exception):
"""General Amcrest error occurred."""


class ReadTimeoutError(AmcrestError):
"""A read timeout error occurred."""


class CommError(AmcrestError):
"""A communication error occurred."""

Expand Down
7 changes: 5 additions & 2 deletions src/amcrest/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
MAX_RETRY_HTTP_CONNECTION,
TIMEOUT_HTTP_PROTOCOL,
)
from .exceptions import CommError, LoginError
from .exceptions import CommError, LoginError, ReadTimeoutError
from .utils import clean_url, pretty

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -425,7 +425,10 @@ async def _async_stream_command(
cmd_id,
error,
)
raise CommError(error) from error
if isinstance(error, httpx.ReadTimeout):
raise ReadTimeoutError(error) from error
else:
raise CommError(error) from error

def command_audio(
self,
Expand Down

0 comments on commit 4cbc74a

Please sign in to comment.