Skip to content

Commit

Permalink
fix easegress-server keep running on invalid yaml (#432)
Browse files Browse the repository at this point in the history
* fix easegress-server keep running on invalid yaml

* update error message

* update mqtt error msg

* fix mqtt test

* turn log off

* fix mqtt test bug
  • Loading branch information
suchen-sci committed Dec 23, 2021
1 parent 8ae49a6 commit fb7d28c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/object/mqttproxy/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ func (b *Broker) httpGetAllSessionHandler(w http.ResponseWriter, r *http.Request
}

allSession, err := b.sessMgr.store.getPrefix(sessionStoreKey(""), false)
logger.SpanDebugf(span, "httpGetAllSessionHandler current total %v sessions, query %v, topic %v", len(allSession), []int{page, pageSize}, topic)
if err != nil {
logger.SpanErrorf(span, "get all sessions with prefix %v failed, %v", sessionStoreKey(""), err)
api.HandleAPIError(w, r, http.StatusInternalServerError, fmt.Errorf("get all sessions failed, %v", err))
Expand Down
49 changes: 47 additions & 2 deletions pkg/object/mqttproxy/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (t *testMQ) get() *packets.PublishPacket {
}

func init() {
logger.InitMock()
logger.InitNop()
pipeline.Register(&pipeline.MockMQTTFilter{})
}

Expand Down Expand Up @@ -177,16 +177,54 @@ func TestSubUnsub(t *testing.T) {
broker.close()
}

func checkSessionStore(broker *Broker, cid, topic string) error {
checkFn := func() error {
sessStr, err := broker.sessMgr.store.get(sessionStoreKey(cid))
if err != nil {
return err
}
if topic == "" {
return nil
}
sess := Session{
info: &SessionInfo{},
}
sess.decode(*sessStr)

for t := range sess.info.Topics {
if topic == t {
return nil
}
}
return fmt.Errorf("topic %v not in session %v", topic, cid)
}

for i := 0; i < 20; i++ {
if checkFn() == nil {
return nil
}
time.Sleep(50 * time.Millisecond)
}
return fmt.Errorf("session %v with topic %v not been stored", cid, topic)
}

func TestCleanSession(t *testing.T) {
b64passwd := base64.StdEncoding.EncodeToString([]byte("test"))
broker := getBroker("test", "test", b64passwd, 1883)

// client that set cleanSession
cid := "cleanSessionClient"
client := getMQTTClient(t, cid, "test", "test", true)
if err := checkSessionStore(broker, cid, ""); err != nil {
t.Fatal(err)
}
if token := client.Subscribe("test/cleanSession/0", 0, nil); token.Wait() && token.Error() != nil {
t.Errorf("subscribe qos0 error %s", token.Error())
}
if err := checkSessionStore(broker, cid, "test/cleanSession/0"); err != nil {
t.Fatal(err)
}

client.Disconnect(200)

c := broker.getClient(cid)
Expand Down Expand Up @@ -1845,10 +1883,17 @@ func TestHTTPGetAllSession(t *testing.T) {
clients := []paho.Client{}
clientNum := 10
for i := 0; i < clientNum; i++ {
client := getMQTTClient(t, strconv.Itoa(i), "test", "test", true)
cid := strconv.Itoa(i)
client := getMQTTClient(t, cid, "test", "test", true)
if err := checkSessionStore(broker, cid, ""); err != nil {
t.Fatal(err)
}
if token := client.Subscribe("topic", 1, nil); token.Wait() && token.Error() != nil {
t.Errorf("subscribe qos0 error %s", token.Error())
}
if err := checkSessionStore(broker, cid, "topic"); err != nil {
t.Fatal(err)
}
clients = append(clients, client)
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,12 @@ func (opt *Options) Parse() (string, error) {
opt.viper.Set(key, val)
}

opt.viper.Unmarshal(opt, func(c *mapstructure.DecoderConfig) {
err = opt.viper.Unmarshal(opt, func(c *mapstructure.DecoderConfig) {
c.TagName = "yaml"
})
if err != nil {
return "", fmt.Errorf("yaml file unmarshal failed, please make sure you provide valid yaml file, %v", err)
}

opt.renameLegacyClusterRoles()
err = opt.validate()
Expand Down

0 comments on commit fb7d28c

Please sign in to comment.