Skip to content

Commit

Permalink
Add KVMap for MQTTContext and update Kafka filter (#462)
Browse files Browse the repository at this point in the history
* add kvmap to mqttcontext, allow kafka filter get data from mqtt context kvmap

* update kafka spec to eliminate ambiguity

* remove meanless constant
  • Loading branch information
suchen-sci committed Jan 14, 2022
1 parent bc74307 commit 735656a
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 192 deletions.
27 changes: 18 additions & 9 deletions pkg/context/mqttcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ type (
DisconnectPacket() *packets.DisconnectPacket // read only
SubscribePacket() *packets.SubscribePacket // read only
UnsubscribePacket() *packets.UnsubscribePacket // read only

PublishPacket() MQTTPublishPacket
PublishPacket() *packets.PublishPacket // read only

SetDrop() // set drop value to true
Drop() bool // if true, this mqtt packet will be dropped
SetDisconnect() // set disconnect value to true
Disconnect() bool // if true, this mqtt client will be disconnected
SetEarlyStop() // set early stop value to true
EarlyStop() bool // if early stop is true, pipeline will skip following filters and return

SetKV(string, interface{})
GetKV(string) interface{}
}

// MQTTClient contains client info that send this packet
Expand All @@ -74,8 +76,7 @@ type (
client MQTTClient
packet packets.ControlPacket
packetType MQTTPacketType

publishPacket MQTTPublishPacket
kvMap map[string]interface{}

err error
drop int32
Expand All @@ -85,7 +86,7 @@ type (

// MQTTResult is result for handling mqtt request
MQTTResult struct {
Err error
ErrString string
}
)

Expand Down Expand Up @@ -120,14 +121,14 @@ func NewMQTTContext(ctx stdcontext.Context, client MQTTClient, packet packets.Co
cancelFunc: cancelFunc,
startTime: startTime,
client: client,
kvMap: make(map[string]interface{}),
}

switch packet := packet.(type) {
switch packet.(type) {
case *packets.ConnectPacket:
mqttCtx.packetType = MQTTConnect
case *packets.PublishPacket:
mqttCtx.packetType = MQTTPublish
mqttCtx.publishPacket = newMQTTPublishPacket(packet)
case *packets.DisconnectPacket:
mqttCtx.packetType = MQTTDisconnect
case *packets.SubscribePacket:
Expand Down Expand Up @@ -223,8 +224,8 @@ func (ctx *mqttContext) ConnectPacket() *packets.ConnectPacket {
return ctx.packet.(*packets.ConnectPacket)
}

func (ctx *mqttContext) PublishPacket() MQTTPublishPacket {
return ctx.publishPacket
func (ctx *mqttContext) PublishPacket() *packets.PublishPacket {
return ctx.packet.(*packets.PublishPacket)
}

func (ctx *mqttContext) DisconnectPacket() *packets.DisconnectPacket {
Expand Down Expand Up @@ -262,3 +263,11 @@ func (ctx *mqttContext) SetEarlyStop() {
func (ctx *mqttContext) EarlyStop() bool {
return atomic.LoadInt32(&ctx.earlyStop) == 1
}

func (ctx *mqttContext) SetKV(key string, value interface{}) {
ctx.kvMap[key] = value
}

func (ctx *mqttContext) GetKV(key string) interface{} {
return ctx.kvMap[key]
}
90 changes: 0 additions & 90 deletions pkg/context/mqttpublish.go

This file was deleted.

6 changes: 2 additions & 4 deletions pkg/filter/connectcontrol/connectcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package connectcontrol

import (
"errors"
"regexp"

"github.com/megaease/easegress/pkg/context"
Expand All @@ -34,7 +33,6 @@ const (
)

// ErrBannedClientOrTopic is error for banned client or topic
var ErrBannedClientOrTopic = errors.New(resultBannedClientOrTopic)

func init() {
pipeline.Register(&ConnectControl{})
Expand Down Expand Up @@ -162,7 +160,7 @@ func (cc *ConnectControl) checkBan(ctx context.MQTTContext) bool {
if _, ok := cc.bannedClients[cid]; ok {
return true
}
topic := ctx.PublishPacket().Topic()
topic := ctx.PublishPacket().TopicName
if cc.bannedTopicRe != nil && cc.bannedTopicRe.MatchString(topic) {
return true
}
Expand All @@ -183,7 +181,7 @@ func (cc *ConnectControl) HandleMQTT(ctx context.MQTTContext) *context.MQTTResul
if cc.spec.EarlyStop {
ctx.SetEarlyStop()
}
return &context.MQTTResult{Err: ErrBannedClientOrTopic}
return &context.MQTTResult{ErrString: resultBannedClientOrTopic}
}
return &context.MQTTResult{}
}
38 changes: 19 additions & 19 deletions pkg/filter/connectcontrol/connectcontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestConnectControl(t *testing.T) {
type testCase struct {
cid string
topic string
err error
errString string
disconnect bool
earlyStop bool
}
Expand All @@ -110,7 +110,7 @@ func doTest(t *testing.T, spec *Spec, testCases []testCase) {
for _, test := range testCases {
ctx := newContext(test.cid, test.topic)
res := cc.HandleMQTT(ctx)
assert.Equal(res.Err, test.err)
assert.Equal(res.ErrString, test.errString)
assert.Equal(ctx.Disconnect(), test.disconnect)
assert.Equal(ctx.EarlyStop(), test.earlyStop)
}
Expand All @@ -128,11 +128,11 @@ func TestHandleMQTT(t *testing.T) {
EarlyStop: true,
}
testCases := []testCase{
{cid: "ban1", topic: "unban", err: errors.New(resultBannedClientOrTopic), disconnect: true, earlyStop: true},
{cid: "ban2", topic: "unban", err: errors.New(resultBannedClientOrTopic), disconnect: true, earlyStop: true},
{cid: "unban1", topic: "ban/sport/ball", err: nil, disconnect: false, earlyStop: false},
{cid: "unban2", topic: "ban/sport/run", err: nil, disconnect: false, earlyStop: false},
{cid: "unban", topic: "unban", err: nil, disconnect: false, earlyStop: false},
{cid: "ban1", topic: "unban", errString: resultBannedClientOrTopic, disconnect: true, earlyStop: true},
{cid: "ban2", topic: "unban", errString: resultBannedClientOrTopic, disconnect: true, earlyStop: true},
{cid: "unban1", topic: "ban/sport/ball", errString: "", disconnect: false, earlyStop: false},
{cid: "unban2", topic: "ban/sport/run", errString: "", disconnect: false, earlyStop: false},
{cid: "unban", topic: "unban", errString: "", disconnect: false, earlyStop: false},
}
doTest(t, spec, testCases)

Expand All @@ -142,10 +142,10 @@ func TestHandleMQTT(t *testing.T) {
EarlyStop: true,
}
testCases = []testCase{
{cid: "unban1", topic: "ban/sport/ball", err: errors.New(resultBannedClientOrTopic), disconnect: true, earlyStop: true},
{cid: "unban2", topic: "ban/sport/run", err: errors.New(resultBannedClientOrTopic), disconnect: true, earlyStop: true},
{cid: "unban3", topic: "unban/sport", err: nil, disconnect: false, earlyStop: false},
{cid: "unban4", topic: "unban", err: nil, disconnect: false, earlyStop: false},
{cid: "unban1", topic: "ban/sport/ball", errString: resultBannedClientOrTopic, disconnect: true, earlyStop: true},
{cid: "unban2", topic: "ban/sport/run", errString: resultBannedClientOrTopic, disconnect: true, earlyStop: true},
{cid: "unban3", topic: "unban/sport", errString: "", disconnect: false, earlyStop: false},
{cid: "unban4", topic: "unban", errString: "", disconnect: false, earlyStop: false},
}
doTest(t, spec, testCases)

Expand All @@ -155,10 +155,10 @@ func TestHandleMQTT(t *testing.T) {
EarlyStop: true,
}
testCases = []testCase{
{cid: "phone123", topic: "ban/sport/ball", err: errors.New(resultBannedClientOrTopic), disconnect: true, earlyStop: true},
{cid: "phone256", topic: "ban/sport/run", err: errors.New(resultBannedClientOrTopic), disconnect: true, earlyStop: true},
{cid: "tv", topic: "unban/sport", err: nil, disconnect: false, earlyStop: false},
{cid: "tv", topic: "unban", err: nil, disconnect: false, earlyStop: false},
{cid: "phone123", topic: "ban/sport/ball", errString: resultBannedClientOrTopic, disconnect: true, earlyStop: true},
{cid: "phone256", topic: "ban/sport/run", errString: resultBannedClientOrTopic, disconnect: true, earlyStop: true},
{cid: "tv", topic: "unban/sport", errString: "", disconnect: false, earlyStop: false},
{cid: "tv", topic: "unban", errString: "", disconnect: false, earlyStop: false},
}
doTest(t, spec, testCases)

Expand All @@ -168,10 +168,10 @@ func TestHandleMQTT(t *testing.T) {
EarlyStop: true,
}
testCases = []testCase{
{cid: "phone123", topic: "ban/sport/ball", err: errors.New(resultBannedClientOrTopic), disconnect: true, earlyStop: true},
{cid: "phone256", topic: "ban/sport/run", err: errors.New(resultBannedClientOrTopic), disconnect: true, earlyStop: true},
{cid: "tv", topic: "unban", err: nil, disconnect: false, earlyStop: false},
{cid: "tv", topic: "unban", err: nil, disconnect: false, earlyStop: false},
{cid: "phone123", topic: "ban/sport/ball", errString: resultBannedClientOrTopic, disconnect: true, earlyStop: true},
{cid: "phone256", topic: "ban/sport/run", errString: resultBannedClientOrTopic, disconnect: true, earlyStop: true},
{cid: "tv", topic: "unban", errString: "", disconnect: false, earlyStop: false},
{cid: "tv", topic: "unban", errString: "", disconnect: false, earlyStop: false},
}
doTest(t, spec, testCases)

Expand Down
Loading

0 comments on commit 735656a

Please sign in to comment.