Skip to content

Commit

Permalink
speed improvements on topic matching
Browse files Browse the repository at this point in the history
- Faster topic matching for non-wildcard topics (which appears
  to be the bulk used by VRM now).
- Check the already-published set before applying the matching rules: much
  faster
- When adding a new topic match, send matched data immediately: drastically
  improves response.
- If new topic match isn't new, don't republish all the data matched by it.
  • Loading branch information
izak committed Sep 23, 2021
1 parent 1c8b730 commit 8ac9578
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions dbus_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ def __eq__(self, other):
def __hash__(self):
return hash('/'.join(self.topic))

class ExactTopic(Topic):
""" This is here because it is faster for matches without
wildcards. """
def match(self, topic):
return self.topic == topic.split('/')

class Subscriptions(object):
def __init__(self):
self.topics = []
Expand All @@ -96,13 +102,18 @@ def subscribe_all(self, ttl=MAX_TOPIC_AGE):
self.topics.insert(0, w)

def subscribe(self, topic, ttl=MAX_TOPIC_AGE):
t = Topic(topic, ttl)
t = Topic(topic, ttl) if '+' in topic or '#' in topic else ExactTopic(topic, ttl)
# Removing and re-adding updates timestamp and potentially also ttl
try:
self.topics.remove(t)
except ValueError:
pass
# topic wasn't in the list, add it
self.topics.append(t)
return t

# Topic was in the list, but removed. Re-add it.
self.topics.append(t)
return None

def match(self, t):
return any(topic.match(t) for topic in self.topics)
Expand Down Expand Up @@ -187,7 +198,9 @@ def publish(self, topic, value):
self._publish(topic, value)
else:
_topic = topic.split('/', 2)[2]
if self._subscriptions.match(_topic):
if PublishedTopic(topic) in self._published:
self._publish(topic, value)
elif self._subscriptions.match(_topic):
self._published.add(PublishedTopic(topic, _topic))
self._publish(topic, value)

Expand Down Expand Up @@ -277,10 +290,19 @@ def _handle_keepalive(self, payload):
if payload:
topics = json.loads(payload)
for topic in topics:
self._subscriptions.subscribe(topic, self._keep_alive_interval)
ob = self._subscriptions.subscribe(topic, self._keep_alive_interval)
# Publish only those that are directly matched by the newly
# added match. If we end up with overlap, it is no biggie. It
# is queued and rate-limited anyway.
if ob is not None:
for k, v in self._values.items():
short = k.split('/', 2)[2]
if ob.match(short):
self._published.add(PublishedTopic(k, short))
self._publish(k, v)
else:
self._subscriptions.subscribe_all(self._keep_alive_interval)
self._publish_all()
self._publish_all()

def _handle_write(self, topic, payload):
logging.debug('[Write] Writing {} to {}'.format(payload, topic))
Expand Down

0 comments on commit 8ac9578

Please sign in to comment.