Skip to content

Commit

Permalink
Improve event data handling
Browse files Browse the repository at this point in the history
It looks like the first two characters after the Content-Length header
are a carriage return and line feed (\r\n). This has thrown off the JSON
string parsing, leading to a hand-rolled regex which fails in odd ways (see #204).
Now, read two extra characters to get past this leading \r\n. If the
leading characters are not there, the two extra characters come from the
trailing \r\n\r\n, which is safe to include and will just be stripped out.
With this change in place, we are able to use json parsing on data, with
a fallback to sending the data string.
  • Loading branch information
flacjacket committed Mar 13, 2022
1 parent 4cbc74a commit cea551d
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions src/amcrest/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# GNU General Public License for more details.
#
# vim:sw=4:ts=4:et
import json
import logging
import re
from typing import (
Expand All @@ -31,9 +32,6 @@

_LOGGER = logging.getLogger(__name__)
_REG_PARSE_KEY_VALUE = re.compile(r"(?P<key>.+?)(?:=)(?P<value>.+?)(?:;|$)")
_REG_PARSE_MALFORMED_JSON = re.compile(
r'(?P<key>"[^"\\]*(?:\\.[^"\\]*)*"|[^\s"]+)\s:\s(?P<value>"[^"\\]*(?:\\.[^"\\]*)*"|[^\s"]+)'
)


def _event_lines(ret: Iterable[str]) -> Iterator[str]:
Expand All @@ -54,6 +52,14 @@ async def _async_event_lines(ret: AsyncIterable[str]) -> AsyncIterator[str]:
line.clear()


async def _async_event_info(ret: AsyncIterable[str], length: int) -> str:
    """Read up to ``length`` items from ``ret`` and return them joined.

    Args:
        ret: Asynchronous iterable yielding string pieces. Each yielded
            item counts as one unit toward ``length``, whatever its size.
        length: Number of items to collect before returning.

    Returns:
        The collected items concatenated into one string. If ``ret`` is
        exhausted before ``length`` items were read, the items read so far
        are returned (possibly ``""``) instead of falling off the end of
        the function and returning ``None``, which would contradict the
        declared ``str`` return type.
    """
    if length <= 0:
        # Nothing was requested. Without this guard the equality check
        # below could never succeed and the whole stream would be drained.
        return ""
    chars = []
    async for char in ret:
        chars.append(char)
        if len(chars) == length:
            break
    return "".join(chars)


class Event(Http):
def event_handler_config(self, handlername: str) -> str:
return self._get_config(handlername)
Expand Down Expand Up @@ -304,7 +310,10 @@ def event_stream(
try:
for line in _event_lines(ret.iter_content(decode_unicode=True)):
if line.lower().startswith("content-length:"):
chunk_size = int(line.split(":")[1])
# There tends to be a leading \r\n, so add 2 to the length.
# If this is not present, there is a trailing \r\n\r\n at
# the end that will just be stripped out
chunk_size = int(line.split(":")[1]) + 2
try:
yield next(
ret.iter_content(
Expand Down Expand Up @@ -332,16 +341,12 @@ async def async_event_stream(
it = ret.aiter_text(chunk_size=1)
async for line in _async_event_lines(it):
if line.lower().startswith("content-length:"):
chunk_size = int(line.split(":")[1])
chars = []
async for char in it:
chars.append(char)
if len(chars) == chunk_size:
break
else:
# If we can't get the chunk, then return out
return
yield "".join(chars)
# There tends to be a leading \r\n, so add 2 to the
# length. If this is not present, there is a
# trailing \r\n\r\n at the end that will just be
# stripped out
chunk_size = int(line.split(":")[1]) + 2
yield await _async_event_info(it, chunk_size)
except ReadTimeoutError:
continue

Expand Down Expand Up @@ -375,12 +380,10 @@ def _build_payload(self, event_info: str) -> Dict[str, Any]:
event_info.strip().replace("\n", "")
):
if key == "data":
value = {
data_key.replace('"', ""): data_value.replace('"', "")
for data_key, data_value in _REG_PARSE_MALFORMED_JSON.findall(
value
)
}
try:
value = json.loads(value)
except json.JSONDecodeError:
pass
payload[key] = value
_LOGGER.debug(
"%s generate new event, code: %s , payload: %s",
Expand Down

0 comments on commit cea551d

Please sign in to comment.