Skip to content

Commit

Permalink
fix mqttproxy broker mode bug (#898)
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci committed Jan 3, 2023
1 parent fb4e0eb commit c2d7328
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
18 changes: 11 additions & 7 deletions pkg/object/mqttproxy/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (b *Broker) connectWatcher() {
for k, v := range sessions {
watcherEvent[k] = &v
}
b.processWatcherEvent(watcherEvent)
b.processWatcherEvent(watcherEvent, true)
}

clients := []*Client{}
Expand Down Expand Up @@ -236,13 +236,13 @@ func (b *Broker) watch(ch <-chan map[string]*string, closeFunc func()) {
go b.connectWatcher()
return
}
b.processWatcherEvent(m)
b.processWatcherEvent(m, false)
}
}
}

func (b *Broker) processWatcherEvent(event map[string]*string) {
syncMap := make(map[string]*SessionInfo)
func (b *Broker) processWatcherEvent(event map[string]*string, sync bool) {
sessMap := make(map[string]*SessionInfo)
for k, v := range event {
clientID := strings.TrimPrefix(k, sessionStoreKey(""))
if v != nil {
Expand All @@ -255,7 +255,7 @@ func (b *Broker) processWatcherEvent(event map[string]*string) {
continue
}
info = session.info
syncMap[clientID] = session.info
sessMap[clientID] = session.info
}
// the new session created, the scenario could indicate
// that a device reconnect to a broker of the MQTT cluster,
Expand All @@ -279,8 +279,12 @@ func (b *Broker) processWatcherEvent(event map[string]*string) {
}
}

if b.spec.BrokerMode && len(syncMap) > 0 {
b.sessionCacheMgr.sync(syncMap)
if b.spec.BrokerMode && len(sessMap) > 0 {
if sync {
b.sessionCacheMgr.sync(sessMap)
return
}
b.sessionCacheMgr.update(sessMap)
}
}

Expand Down
23 changes: 13 additions & 10 deletions pkg/object/mqttproxy/sessioncache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import "sync"
// SessionCache is used in broker mode only. It stores session info of clients from
// different easegress instances. It update topic manager when session info of is updated.
type SessionCacheManager interface {
update(clientID string, session *SessionInfo)
update(clients map[string]*SessionInfo)
delete(clientID string)
sync(clients map[string]*SessionInfo)
getEGName(clientID string) string
Expand Down Expand Up @@ -52,7 +52,6 @@ const (
type sessionCacheOp struct {
opType sessionCacheOpType
clientID string
session *SessionInfo
clients map[string]*SessionInfo
}

Expand All @@ -68,19 +67,17 @@ func newSessionCacheManager(sepc *Spec, topicMgr TopicManager) SessionCacheManag
return mgr
}

func (c *sessionCacheManager) update(clientID string, session *SessionInfo) {
func (c *sessionCacheManager) update(clients map[string]*SessionInfo) {
c.writeCh <- &sessionCacheOp{
opType: sessionCacheOpUpdate,
clientID: clientID,
session: session,
opType: sessionCacheOpUpdate,
clients: clients,
}
}

func (c *sessionCacheManager) delete(clientID string) {
c.writeCh <- &sessionCacheOp{
opType: sessionCacheOpDelete,
clientID: clientID,
session: nil,
}
}

Expand All @@ -91,7 +88,13 @@ func (c *sessionCacheManager) sync(clients map[string]*SessionInfo) {
}
}

func (c *sessionCacheManager) processUpdate(clientID string, session *SessionInfo) {
func (c *sessionCacheManager) processUpdate(clients map[string]*SessionInfo) {
for k, v := range clients {
c.processSingleUpdate(k, v)
}
}

func (c *sessionCacheManager) processSingleUpdate(clientID string, session *SessionInfo) {
c.egNameCache.Store(clientID, session.EGName)
if session.EGName == c.egName {
c.cache[clientID] = session
Expand Down Expand Up @@ -149,7 +152,7 @@ func (c *sessionCacheManager) processSync(clients map[string]*SessionInfo) {
}
}
for k, v := range clients {
c.processUpdate(k, v)
c.processSingleUpdate(k, v)
}
}

Expand All @@ -161,7 +164,7 @@ func (c *sessionCacheManager) run() {
case op := <-c.writeCh:
switch op.opType {
case sessionCacheOpUpdate:
c.processUpdate(op.clientID, op.session)
c.processUpdate(op.clients)
case sessionCacheOpDelete:
c.processDelete(op.clientID)
case sessionCacheOpSync:
Expand Down

0 comments on commit c2d7328

Please sign in to comment.