kafka filter support sync producer and message key #1239

Merged: 1 commit, Mar 11, 2024
22 changes: 22 additions & 0 deletions docs/07.Reference/7.02.Filters.md
@@ -111,6 +111,7 @@
- [validator.OAuth2TokenIntrospect](#validatoroauth2tokenintrospect)
- [validator.OAuth2JWT](#validatoroauth2jwt)
- [kafka.Topic](#kafkatopic)
+- [kafka.Key](#kafkakey)
- [headertojson.HeaderMap](#headertojsonheadermap)
- [headerlookup.HeaderSetterSpec](#headerlookupheadersetterspec)
- [requestadaptor.SignerSpec](#requestadaptorsignerspec)
@@ -1019,18 +1020,32 @@ Below is an example configuration.
kind: Kafka
name: kafka-example
backend: [":9093"]
+# sync selects between the AsyncProducer and the SyncProducer for the Kafka filter.
+# The default is false (async). When true, a produce error makes the Kafka filter
+# respond with status code 500 and a body of the form {"err": "error message", "code": 500}.
+sync: false
topic:
  # default topic for the Kafka message
  default: kafka-topic
  # dynamic topic for the Kafka message, taken from an HTTP header
  dynamic:
    header: X-Kafka-Topic
+key:
+  # default key for the Kafka message
+  default: kafka-key
+  # dynamic key for the Kafka message, taken from an HTTP header
+  dynamic:
+    header: X-Kafka-Key
```
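As a usage sketch for the dynamic settings above, a client can steer both topic and key per request through the configured headers (the listening address, route, and payload below are illustrative assumptions; the filter itself does not define an HTTP endpoint):

```go
package main

import (
	"net/http"
	"strings"
)

func main() {
	// Assumes an Easegress HTTP server at localhost:8080 whose pipeline
	// contains the Kafka filter configured above.
	req, _ := http.NewRequest(http.MethodPost,
		"http://localhost:8080/publish", strings.NewReader(`{"hello":"kafka"}`))
	// Override the defaults (kafka-topic / kafka-key) for this request only;
	// omitting the headers falls back to the defaults.
	req.Header.Set("X-Kafka-Topic", "orders")
	req.Header.Set("X-Kafka-Key", "user-42")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// With sync: true, a produce failure yields status 500 and a body of
	// {"err": "...", "code": 500}; in async mode errors are only logged.
}
```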

### Configuration

| Name    | Type                       | Description                                                                      | Required |
| ------- | -------------------------- | -------------------------------------------------------------------------------- | -------- |
| backend | []string                   | Addresses of the Kafka backend                                                    | Yes      |
+| sync    | bool                       | Use the SyncProducer (`true`) or the AsyncProducer (`false`); defaults to `false` | No       |
| topic   | [Kafka.Topic](#kafkatopic) | Spec describing how to get the Kafka topic used to send messages to the backend   | Yes      |
+| key     | [Kafka.Key](#kafkakey)     | Spec describing how to get the Kafka message key                                  | No       |


### Results
@@ -2029,6 +2044,13 @@ The relationship between `methods` and `url` is `AND`.
| default        | string | Default topic for the Kafka backend            | Yes      |
| dynamic.header | string | The HTTP header that contains the Kafka topic  | Yes      |

+### kafka.Key
+
+| Name           | Type   | Description                                  | Required |
+| -------------- | ------ | -------------------------------------------- | -------- |
+| default        | string | Default key for the Kafka message            | Yes      |
+| dynamic.header | string | The HTTP header that contains the Kafka key  | No       |
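
Taken together, kafka.Topic and kafka.Key follow the same resolution order: use the configured dynamic header when it is present on the request, otherwise fall back to `default`. A minimal standalone sketch of that precedence (`resolve` is a hypothetical helper; the filter's actual implementation is `getTopic`/`getKey` in the Go diff below):

```go
package main

import (
	"fmt"
	"net/http"
)

// resolve mirrors the documented precedence: prefer the dynamic header's
// value when one is configured and non-empty, else use the default.
func resolve(h http.Header, dynamicHeader, defaultValue string) string {
	if dynamicHeader == "" {
		return defaultValue
	}
	if v := h.Get(dynamicHeader); v != "" {
		return v
	}
	return defaultValue
}

func main() {
	h := http.Header{}
	h.Set("X-Kafka-Key", "user-42")
	fmt.Println(resolve(h, "X-Kafka-Key", "kafka-key"))     // user-42
	fmt.Println(resolve(h, "X-Kafka-Topic", "kafka-topic")) // kafka-topic (header absent)
}
```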

### headertojson.HeaderMap

| Name | Type | Description | Required |
116 changes: 96 additions & 20 deletions pkg/filters/kafkabackend/kafka.go
@@ -28,6 +28,7 @@ import (
"github.com/megaease/easegress/v2/pkg/filters"
"github.com/megaease/easegress/v2/pkg/logger"
"github.com/megaease/easegress/v2/pkg/protocols/httpprot"
"github.com/megaease/easegress/v2/pkg/util/codectool"
)

const (
@@ -56,10 +57,19 @@
type (
// Kafka is a kafka proxy for HTTP requests.
Kafka struct {
-		spec     *Spec
-		producer sarama.AsyncProducer
-		done     chan struct{}
-		header   string
+		spec          *Spec
+		asyncProducer sarama.AsyncProducer
+		syncProducer  sarama.SyncProducer
+
+		headerTopic string
+		headerKey   string
+		done        chan struct{}
+	}
+
+	// Err is the error body returned to the client when a sync produce fails.
+	Err struct {
+		Err  string `json:"err"`
+		Code int    `json:"code"`
}
)

@@ -80,11 +90,17 @@ func (k *Kafka) Spec() filters.Spec {
return k.spec
}

-func (k *Kafka) setHeader(spec *Spec) {
+func (k *Kafka) setHeader() {
if k.spec.Topic.Dynamic != nil {
-		k.header = http.CanonicalHeaderKey(k.spec.Topic.Dynamic.Header)
-		if k.header == "" {
-			panic("empty header")
+		k.headerTopic = http.CanonicalHeaderKey(k.spec.Topic.Dynamic.Header)
+		if k.headerTopic == "" {
+			panic("empty header topic")
+		}
+	}
+	if k.spec.Key.Dynamic != nil {
+		k.headerKey = http.CanonicalHeaderKey(k.spec.Key.Dynamic.Header)
+		if k.headerKey == "" {
+			panic("empty header key")
}
}
}
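
An aside on the canonicalization above: net/http stores header keys in canonical MIME form, and `setHeader` normalizes the configured names once at init time so later lookups use that form. A trivial standard-library illustration:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Canonical MIME form: the first letter and every letter following a
	// hyphen are upper-cased, the rest lower-cased.
	fmt.Println(http.CanonicalHeaderKey("x-kafka-key"))   // X-Kafka-Key
	fmt.Println(http.CanonicalHeaderKey("X-KAFKA-TOPIC")) // X-Kafka-Topic
}
```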
@@ -93,30 +109,39 @@ func (k *Kafka) setHeader(spec *Spec) {
func (k *Kafka) Init() {
spec := k.spec
k.done = make(chan struct{})
-	k.setHeader(k.spec)
+	k.setHeader()

config := sarama.NewConfig()
config.ClientID = spec.Name()
config.Version = sarama.V1_0_0_0
-	producer, err := sarama.NewAsyncProducer(k.spec.Backend, config)
-	if err != nil {
-		panic(fmt.Errorf("start sarama producer with address %v failed: %v", k.spec.Backend, err))
-	}
+	if spec.Sync {
+		config.Producer.Return.Successes = true
+		producer, err := sarama.NewSyncProducer(k.spec.Backend, config)
+		if err != nil {
+			panic(fmt.Errorf("start sarama sync producer with address %v failed: %v", spec.Backend, err))
+		}
+		k.syncProducer = producer
+	} else {
+		producer, err := sarama.NewAsyncProducer(k.spec.Backend, config)
+		if err != nil {
+			panic(fmt.Errorf("start sarama async producer with address %v failed: %v", k.spec.Backend, err))
+		}

-	k.producer = producer
-	go k.checkProduceError()
+		k.asyncProducer = producer
+		go k.checkProduceError()
+	}
}

func (k *Kafka) checkProduceError() {
for {
select {
case <-k.done:
-			err := k.producer.Close()
+			err := k.asyncProducer.Close()
if err != nil {
logger.Errorf("close kafka producer failed: %v", err)
}
return
-		case err, ok := <-k.producer.Errors():
+		case err, ok := <-k.asyncProducer.Errors():
if !ok {
return
}
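
Worth noting from the `Init` change: sarama's `NewSyncProducer` returns an error unless `Producer.Return.Successes` is true, which is why the sync branch sets it before constructing the producer. A minimal standalone sketch of the same setup (the broker address is an assumption, and the import path is hedged; depending on the vendored version the package may live at github.com/Shopify/sarama or github.com/IBM/sarama):

```go
package main

import (
	"fmt"

	"github.com/IBM/sarama" // assumed import path, see note above
)

func main() {
	config := sarama.NewConfig()
	// Required by the SyncProducer; constructing one without it fails.
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer([]string{"localhost:9093"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// SendMessage blocks until the broker acks or an error occurs, which is
	// what lets the filter report produce failures to the HTTP client.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "kafka-topic",
		Key:   sarama.StringEncoder("kafka-key"),
		Value: sarama.StringEncoder("hello"),
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("stored at partition %d, offset %d\n", partition, offset)
}
```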
@@ -141,20 +166,32 @@ func (k *Kafka) Status() interface{} {
}

func (k *Kafka) getTopic(req *httpprot.Request) string {
-	if k.header == "" {
+	if k.headerTopic == "" {
return k.spec.Topic.Default
}
-	topic := req.Std().Header.Get(k.header)
+	topic := req.Std().Header.Get(k.headerTopic)
if topic == "" {
return k.spec.Topic.Default
}
return topic
}

+func (k *Kafka) getKey(req *httpprot.Request) string {
+	if k.headerKey == "" {
+		return k.spec.Key.Default
+	}
+	key := req.Std().Header.Get(k.headerKey)
+	if key == "" {
+		return k.spec.Key.Default
+	}
+	return key
+}
+
// Handle handles the context.
func (k *Kafka) Handle(ctx *context.Context) (result string) {
req := ctx.GetInputRequest().(*httpprot.Request)
topic := k.getTopic(req)
+	key := k.getKey(req)

body, err := io.ReadAll(req.GetPayload())
if err != nil {
@@ -165,6 +202,45 @@
Topic: topic,
Value: sarama.ByteEncoder(body),
}
-	k.producer.Input() <- msg
+	if key != "" {
+		msg.Key = sarama.StringEncoder(key)
+	}
+
+	if k.spec.Sync {
+		_, _, err = k.syncProducer.SendMessage(msg)
+		if err != nil {
+			logger.Errorf("send message to kafka failed: %v", err)
+			setErrResponse(ctx, err)
+		} else {
+			setSuccessResponse(ctx)
+		}
+		return ""
+	}
+
+	k.asyncProducer.Input() <- msg
return ""
}

+func setSuccessResponse(ctx *context.Context) {
+	resp, _ := ctx.GetOutputResponse().(*httpprot.Response)
+	if resp == nil {
+		resp, _ = httpprot.NewResponse(nil)
+	}
+	resp.SetStatusCode(http.StatusOK)
+	ctx.SetOutputResponse(resp)
+}
+
+func setErrResponse(ctx *context.Context, err error) {
+	resp, _ := ctx.GetOutputResponse().(*httpprot.Response)
+	if resp == nil {
+		resp, _ = httpprot.NewResponse(nil)
+	}
+	resp.SetStatusCode(http.StatusInternalServerError)
+	errMsg := &Err{
+		Err:  err.Error(),
+		Code: http.StatusInternalServerError,
+	}
+	data, _ := codectool.MarshalJSON(errMsg)
+	resp.SetPayload(data)
+	ctx.SetOutputResponse(resp)
+}
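
For reference, the body produced by `setErrResponse` serializes the new `Err` type; assuming `codectool.MarshalJSON` behaves like encoding/json here, the sketch below reproduces the shape with the standard library (the error message is an illustrative sarama-style string):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Err mirrors the type added in this PR.
type Err struct {
	Err  string `json:"err"`
	Code int    `json:"code"`
}

func main() {
	data, _ := json.Marshal(&Err{
		Err:  "kafka: client has run out of available brokers to talk to",
		Code: 500,
	})
	fmt.Println(string(data))
	// {"err":"kafka: client has run out of available brokers to talk to","code":500}
}
```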