Skip to content

Commit

Permalink
Merge pull request pedrokiefer#4 from krishna-kashyap/master
Browse files Browse the repository at this point in the history
Async handling of frames
  • Loading branch information
pedrokiefer committed May 27, 2018
2 parents a1adaaa + b4e5bb5 commit 6c6dfc0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 58 deletions.
69 changes: 16 additions & 53 deletions aiostomp/aiostomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,24 @@ def __init__(self, frame_handler,
username=None, password=None,
client_id=None,
stats=None):

self.handlers_map = {'MESSAGE': self._handle_message,
'CONNECTED': self._handle_connect,
'ERROR': self._handle_error
}

self.heartbeat = heartbeat
self.heartbeater = None

self._loop = loop
self._frame_handler = frame_handler
self._task_handler = self._loop.create_task(self.start())
self._force_close = False
self._stats = stats

self._waiter = None
self._frames = deque()

self._transport = None
self._protocol = sp()
self._connect_headers = OrderedDict()

Expand All @@ -261,20 +267,16 @@ def __init__(self, frame_handler,
self._connect_headers['passcode'] = password

def close(self):
self._transport = None

# Close the transport only if already connection is made
if self._transport:
# Close the transport to stomp receiving any more data
self._transport.close()

if self.heartbeater:
self.heartbeater.shutdown()
self.heartbeater = None

if self._task_handler:
self._task_handler.cancel()

if self._waiter:
self._task_handler.cancel()

self._task_handler = None

def connect(self):
buf = self._protocol.build_frame(
'CONNECT', headers=self._connect_headers)
Expand Down Expand Up @@ -326,14 +328,6 @@ def connection_lost(self, exc):
self.heartbeater.shutdown()
self.heartbeater = None

if self._task_handler:
self._task_handler.cancel()

if self._waiter:
self._task_handler.cancel()

self._task_handler = None

self._frame_handler.connection_lost(exc)

async def _handle_connect(self, frame):
Expand Down Expand Up @@ -384,45 +378,14 @@ def data_received(self, data):

self._protocol.feed_data(data)

frames = self._protocol.pop_frames()
if frames:
for frame in frames:
self._frames.append(frame)

if self._waiter is not None:
if not self._waiter.done():
self._waiter.set_result(None)
for frame in self._protocol.pop_frames():
if frame.command != 'HEARTBEAT':
self._loop.create_task(self.handlers_map.get(frame.command,
self._handle_exception)(frame))

def eof_received(self):
self.connection_lost(Exception('Got EOF from server'))

async def start(self):
loop = self._loop

while not self._force_close:
if not self._frames:
try:
# wait for next request
self._waiter = loop.create_future()
await self._waiter
except asyncio.CancelledError:
break
finally:
self._waiter = None

frame = self._frames.popleft()

if frame.command == 'MESSAGE':
await self._handle_message(frame)
elif frame.command == 'CONNECTED':
await self._handle_connect(frame)
elif frame.command == 'ERROR':
await self._handle_error(frame)
elif frame.command == 'HEARTBEAT':
pass
else:
await self._handle_exception(frame)


class StompProtocol(object):

Expand Down
7 changes: 2 additions & 5 deletions aiostomp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,11 @@ def reset(self):
self._frames_ready = []

def feed_data(self, data):
pending_data = data
pending_data = self._feed_data(data)

while True:
while pending_data:
pending_data = self._feed_data(pending_data)

if pending_data is None:
return

def _feed_data(self, data):

if data is None:
Expand Down
13 changes: 13 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ async def test_connection_can_be_made(self, connect_mock):

connect_mock.assert_called_once()

@patch('aiostomp.aiostomp.StompReader.connect')
@unittest_run_loop
async def test_transport_is_closed_connection_close(self, connect_mock):
stomp = StompReader(None, self.loop)

transport = Mock()

stomp.connection_made(transport)

connect_mock.assert_called_once()

stomp.close()

def test_connection_can_be_lost(self):
frame_handler = Mock()
heartbeater = Mock()
Expand Down

0 comments on commit 6c6dfc0

Please sign in to comment.