Skip to content

Commit

Permalink
kafka filter support sync producer and message key (#1239)
Browse files Browse the repository at this point in the history
  • Loading branch information
suchen-sci committed Mar 11, 2024
1 parent 3dad560 commit 15fcaa0
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 26 deletions.
22 changes: 22 additions & 0 deletions docs/07.Reference/7.02.Filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
- [validator.OAuth2TokenIntrospect](#validatoroauth2tokenintrospect)
- [validator.OAuth2JWT](#validatoroauth2jwt)
- [kafka.Topic](#kafkatopic)
- [kafka.Key](#kafkakey)
- [headertojson.HeaderMap](#headertojsonheadermap)
- [headerlookup.HeaderSetterSpec](#headerlookupheadersetterspec)
- [requestadaptor.SignerSpec](#requestadaptorsignerspec)
Expand Down Expand Up @@ -1019,18 +1020,32 @@ Below is an example configuration.
kind: Kafka
name: kafka-example
backend: [":9093"]
# sync determines the usage of AsyncProducer or SyncProducer for the Kafka filter.
# default is false. If set to true, encountering a message error will cause the Kafka filter
# to respond with a status code of 503 and a body containing "{err: "error message"}".
sync: false
topic:
# default topic for Kafka message
default: kafka-topic
# dynamic topic for Kafka message, get from http header
dynamic:
header: X-Kafka-Topic
key:
# default key for Kafka message
default: kafka-key
# dynamic key for Kafka message, get from http header
dynamic:
header: X-Kafka-Key
```

### Configuration

| Name | Type | Description | Required |
| ------------ | -------- | -------------------------------- | -------- |
| backend | []string | Addresses of Kafka backend | Yes |
| sync | bool | Usage of AsyncProducer or SyncProducer, default is false | No |
| topic | [Kafka.Topic](#kafkatopic) | the topic is Spec used to get Kafka topic used to send message to the backend | Yes |
| key | [Kafka.Key](#kafkakey) | the key is Spec used to get Kafka message key | No |


### Results
Expand Down Expand Up @@ -2029,6 +2044,13 @@ The relationship between `methods` and `url` is `AND`.
| default | string | Default topic for Kafka backend | Yes |
| dynamic.header | string | The HTTP header that contains Kafka topic | Yes |

### kafka.Key

| Name | Type | Description | Required |
| --------- | ------ | ------------------------------------------------------------------------ | -------- |
| default | string | Default key for Kafka message | Yes |
| dynamic.header | string | The HTTP header that contains Kafka key | No |

### headertojson.HeaderMap

| Name | Type | Description | Required |
Expand Down
116 changes: 96 additions & 20 deletions pkg/filters/kafkabackend/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/megaease/easegress/v2/pkg/filters"
"github.com/megaease/easegress/v2/pkg/logger"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot"
"github.com/megaease/easegress/v2/pkg/util/codectool"
)

const (
Expand Down Expand Up @@ -56,10 +57,19 @@ func init() {
type (
// Kafka is a kafka proxy for HTTP requests.
Kafka struct {
spec *Spec
producer sarama.AsyncProducer
done chan struct{}
header string
spec *Spec
asyncProducer sarama.AsyncProducer
syncProcuder sarama.SyncProducer

headerTopic string
headerKey string
done chan struct{}
}

// Err is the error of Kafka
Err struct {
Err string `json:"err"`
Code int `json:"code"`
}
)

Expand All @@ -80,11 +90,17 @@ func (k *Kafka) Spec() filters.Spec {
return k.spec
}

func (k *Kafka) setHeader(spec *Spec) {
func (k *Kafka) setHeader() {
if k.spec.Topic.Dynamic != nil {
k.header = http.CanonicalHeaderKey(k.spec.Topic.Dynamic.Header)
if k.header == "" {
panic("empty header")
k.headerTopic = http.CanonicalHeaderKey(k.spec.Topic.Dynamic.Header)
if k.headerTopic == "" {
panic("empty header topic")
}
}
if k.spec.Key.Dynamic != nil {
k.headerKey = http.CanonicalHeaderKey(k.spec.Key.Dynamic.Header)
if k.headerKey == "" {
panic("empty header key")
}
}
}
Expand All @@ -93,30 +109,39 @@ func (k *Kafka) setHeader(spec *Spec) {
func (k *Kafka) Init() {
spec := k.spec
k.done = make(chan struct{})
k.setHeader(k.spec)
k.setHeader()

config := sarama.NewConfig()
config.ClientID = spec.Name()
config.Version = sarama.V1_0_0_0
producer, err := sarama.NewAsyncProducer(k.spec.Backend, config)
if err != nil {
panic(fmt.Errorf("start sarama producer with address %v failed: %v", k.spec.Backend, err))
}
if spec.Sync {
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(k.spec.Backend, config)
if err != nil {
panic(fmt.Errorf("start sarama sync producer with address %v failed: %v", spec.Backend, err))
}
k.syncProcuder = producer
} else {
producer, err := sarama.NewAsyncProducer(k.spec.Backend, config)
if err != nil {
panic(fmt.Errorf("start sarama async producer with address %v failed: %v", k.spec.Backend, err))
}

k.producer = producer
go k.checkProduceError()
k.asyncProducer = producer
go k.checkProduceError()
}
}

func (k *Kafka) checkProduceError() {
for {
select {
case <-k.done:
err := k.producer.Close()
err := k.asyncProducer.Close()
if err != nil {
logger.Errorf("close kafka producer failed: %v", err)
}
return
case err, ok := <-k.producer.Errors():
case err, ok := <-k.asyncProducer.Errors():
if !ok {
return
}
Expand All @@ -141,20 +166,32 @@ func (k *Kafka) Status() interface{} {
}

func (k *Kafka) getTopic(req *httpprot.Request) string {
if k.header == "" {
if k.headerTopic == "" {
return k.spec.Topic.Default
}
topic := req.Std().Header.Get(k.header)
topic := req.Std().Header.Get(k.headerTopic)
if topic == "" {
return k.spec.Topic.Default
}
return topic
}

func (k *Kafka) getKey(req *httpprot.Request) string {
if k.headerKey == "" {
return k.spec.Key.Default
}
key := req.Std().Header.Get(k.headerKey)
if key == "" {
return k.spec.Key.Default
}
return key
}

// Handle handles the context.
func (k *Kafka) Handle(ctx *context.Context) (result string) {
req := ctx.GetInputRequest().(*httpprot.Request)
topic := k.getTopic(req)
key := k.getKey(req)

body, err := io.ReadAll(req.GetPayload())
if err != nil {
Expand All @@ -165,6 +202,45 @@ func (k *Kafka) Handle(ctx *context.Context) (result string) {
Topic: topic,
Value: sarama.ByteEncoder(body),
}
k.producer.Input() <- msg
if key != "" {
msg.Key = sarama.StringEncoder(key)
}

if k.spec.Sync {
_, _, err = k.syncProcuder.SendMessage(msg)
if err != nil {
logger.Errorf("send message to kafka failed: %v", err)
setErrResponse(ctx, err)
} else {
setSuccessResponse(ctx)
}
return ""
}

k.asyncProducer.Input() <- msg
return ""
}

func setSuccessResponse(ctx *context.Context) {
resp, _ := ctx.GetOutputResponse().(*httpprot.Response)
if resp == nil {
resp, _ = httpprot.NewResponse(nil)
}
resp.SetStatusCode(http.StatusOK)
ctx.SetOutputResponse(resp)
}

func setErrResponse(ctx *context.Context, err error) {
resp, _ := ctx.GetOutputResponse().(*httpprot.Response)
if resp == nil {
resp, _ = httpprot.NewResponse(nil)
}
resp.SetStatusCode(http.StatusInternalServerError)
errMsg := &Err{
Err: err.Error(),
Code: http.StatusInternalServerError,
}
data, _ := codectool.MarshalJSON(errMsg)
resp.SetPayload(data)
ctx.SetOutputResponse(resp)
}
Loading

0 comments on commit 15fcaa0

Please sign in to comment.