Skip to content

Commit

Permalink
rm mqtt client from broker when close, fix double close chan when bro…
Browse files Browse the repository at this point in the history
…ker and client close at same time (#302)
  • Loading branch information
suchen-sci committed Oct 15, 2021
1 parent 5597ba9 commit c0f5200
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
10 changes: 10 additions & 0 deletions pkg/object/mqttproxy/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,16 @@ func (b *Broker) getClient(clientID string) *Client {
return nil
}

func (b *Broker) removeClient(clientID string) {
b.Lock()
defer b.Unlock()
if val, ok := b.clients[clientID]; ok {
if val.disconnected() {
delete(b.clients, clientID)
}
}
}

func (b *Broker) topicsPublishHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
api.HandleAPIError(w, r, http.StatusBadRequest, fmt.Errorf("suppose POST request but got %s", r.Method))
Expand Down
26 changes: 16 additions & 10 deletions pkg/object/mqttproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"github.com/eclipse/paho.mqtt.golang/packets"
Expand Down Expand Up @@ -60,9 +61,9 @@ type (
session *Session
conn net.Conn

info ClientInfo
status int
done chan struct{}
info ClientInfo
statusFlag int32
done chan struct{}
}
)

Expand All @@ -85,11 +86,11 @@ func newClient(connect *packets.ConnectPacket, broker *Broker, conn net.Conn) *C
will: will,
}
client := &Client{
broker: broker,
conn: conn,
info: info,
status: Connected,
done: make(chan struct{}),
broker: broker,
conn: conn,
info: info,
statusFlag: Connected,
done: make(chan struct{}),
}
return client
}
Expand All @@ -100,6 +101,7 @@ func (c *Client) readLoop() {
c.broker.backend.publish(c.info.will)
}
c.closeAndDelSession()
c.broker.removeClient(c.info.cid)
}()
keepAlive := time.Duration(c.info.keepalive) * time.Second
timeOut := keepAlive + keepAlive/2
Expand Down Expand Up @@ -236,13 +238,17 @@ func (c *Client) writePacket(packet packets.ControlPacket) error {
func (c *Client) close() {
c.Lock()
defer c.Unlock()
if c.status == Disconnected {
if c.disconnected() {
return
}
c.status = Disconnected
atomic.StoreInt32(&c.statusFlag, Disconnected)
close(c.done)
}

func (c *Client) disconnected() bool {
return atomic.LoadInt32(&c.statusFlag) == Disconnected
}

func (c *Client) closeAndDelSession() {
c.broker.sessMgr.delLocal(c.info.cid)
c.close()
Expand Down
5 changes: 2 additions & 3 deletions pkg/object/mqttproxy/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ func (sm *SessionManager) get(clientID string) *Session {
}

func (sm *SessionManager) delLocal(clientID string) {
sess := sm.get(clientID)
if sess != nil {
if val, ok := sm.sessionMap.LoadAndDelete(clientID); ok {
sess := val.(*Session)
sess.close()
}
sm.sessionMap.Delete(clientID)
}

0 comments on commit c0f5200

Please sign in to comment.