Skip to content

Commit

Permalink
Add AMQP TLS EXTERNAL auth support
Browse files Browse the repository at this point in the history
Also switch to the RabbitMQ team official Go client library for
AMQP 0.91.
  • Loading branch information
mihaitodor committed Oct 15, 2021
1 parent b7f1e1e commit 399127a
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ All notable changes to this project will be documented in this file.

## Unreleased

- amqp_0_9 components now support TLS EXTERNAL auth.

## 3.57.0 - 2021-10-14

### Added
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ require (
github.com/prometheus/client_golang v1.8.0
github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314
github.com/rabbitmq/amqp091-go v1.1.1-0.20211014165712-60a96d570ece
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/robfig/cron/v3 v3.0.1
github.com/satori/go.uuid v1.2.0 // indirect
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/smira/go-statsd v1.3.1
github.com/spf13/cast v1.3.1
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.7.0
github.com/tilinna/z85 v1.0.0
github.com/uber/jaeger-client-go v2.25.0+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,8 @@ github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc h1:hK577yxE
github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc/go.mod h1:OQt6Zo5B3Zs+C49xul8kcHo+fZ1mCLPvd0LFxiZ2DHc=
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314 h1:86XpVGN4oVnVheHik6ioWg+1fOnWu1GgyNzV6cr2ifs=
github.com/quipo/statsd v0.0.0-20180118161217-3d6a5565f314/go.mod h1:1COUodqytMiv/GkAVUGhc0CA6e8xak5U4551TY7iEe0=
github.com/rabbitmq/amqp091-go v1.1.1-0.20211014165712-60a96d570ece h1:U1adrvQdnIH4tOW4hwb9KCNnG4asww7Q4faC81pu9KQ=
github.com/rabbitmq/amqp091-go v1.1.1-0.20211014165712-60a96d570ece/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
Expand Down Expand Up @@ -814,8 +816,6 @@ github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
21 changes: 17 additions & 4 deletions lib/input/reader/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reader
import (
"crypto/tls"
"fmt"
"net/url"
"strconv"
"strings"
"sync"
Expand All @@ -13,7 +14,7 @@ import (
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
btls "github.com/Jeffail/benthos/v3/lib/util/tls"
"github.com/streadway/amqp"
amqp "github.com/rabbitmq/amqp091-go"
)

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -112,10 +113,22 @@ func (a *AMQP) Connect() (err error) {
var amqpChan *amqp.Channel
var consumerChan <-chan amqp.Delivery

u, err := url.Parse(a.conf.URL)
if err != nil {
return fmt.Errorf("invalid amqp URL: %v", err)
}

if a.conf.TLS.Enabled {
conn, err = amqp.DialTLS(a.conf.URL, a.tlsConf)
if err != nil {
return fmt.Errorf("AMQP Connect: %s", err)
if u.User != nil {
conn, err = amqp.DialTLS(a.conf.URL, a.tlsConf)
if err != nil {
return fmt.Errorf("AMQP Connect: %s", err)
}
} else {
conn, err = amqp.DialTLS_ExternalAuth(a.conf.URL, a.tlsConf)
if err != nil {
return fmt.Errorf("AMQP Connect: %s", err)
}
}
} else {
conn, err = amqp.Dial(a.conf.URL)
Expand Down
21 changes: 17 additions & 4 deletions lib/input/reader/amqp_0_9.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"net/url"
"sync"
"time"

Expand All @@ -13,7 +14,7 @@ import (
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
btls "github.com/Jeffail/benthos/v3/lib/util/tls"
"github.com/streadway/amqp"
amqp "github.com/rabbitmq/amqp091-go"
)

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -116,10 +117,22 @@ func (a *AMQP09) ConnectWithContext(ctx context.Context) (err error) {
var amqpChan *amqp.Channel
var consumerChan <-chan amqp.Delivery

u, err := url.Parse(a.conf.URL)
if err != nil {
return fmt.Errorf("invalid amqp URL: %v", err)
}

if a.conf.TLS.Enabled {
conn, err = amqp.DialTLS(a.conf.URL, a.tlsConf)
if err != nil {
return fmt.Errorf("AMQP 0.9 Connect: %s", err)
if u.User != nil {
conn, err = amqp.DialTLS(a.conf.URL, a.tlsConf)
if err != nil {
return fmt.Errorf("AMQP 0.9 Connect: %s", err)
}
} else {
conn, err = amqp.DialTLS_ExternalAuth(a.conf.URL, a.tlsConf)
if err != nil {
return fmt.Errorf("AMQP 0.9 Connect: %s", err)
}
}
} else {
conn, err = amqp.Dial(a.conf.URL)
Expand Down
2 changes: 1 addition & 1 deletion lib/input/reader/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
"github.com/ory/dockertest/v3"
"github.com/streadway/amqp"
amqp "github.com/rabbitmq/amqp091-go"
)

func TestAMQPIntegration(t *testing.T) {
Expand Down
21 changes: 17 additions & 4 deletions lib/output/writer/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"net/url"
"strconv"
"strings"
"sync"
Expand All @@ -16,7 +17,7 @@ import (
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
btls "github.com/Jeffail/benthos/v3/lib/util/tls"
"github.com/streadway/amqp"
amqp "github.com/rabbitmq/amqp091-go"
)

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -159,10 +160,22 @@ func (a *AMQP) Connect() error {
var conn *amqp.Connection
var err error

u, err := url.Parse(a.conf.URL)
if err != nil {
return fmt.Errorf("invalid amqp URL: %v", err)
}

if a.conf.TLS.Enabled {
conn, err = amqp.DialTLS(a.conf.URL, a.tlsConf)
if err != nil {
return fmt.Errorf("amqp failed to connect: %v", err)
if u.User != nil {
conn, err = amqp.DialTLS(a.conf.URL, a.tlsConf)
if err != nil {
return fmt.Errorf("amqp failed to connect: %v", err)
}
} else {
conn, err = amqp.DialTLS_ExternalAuth(a.conf.URL, a.tlsConf)
if err != nil {
return fmt.Errorf("amqp failed to connect: %v", err)
}
}
} else {
conn, err = amqp.Dial(a.conf.URL)
Expand Down
2 changes: 1 addition & 1 deletion lib/test/integration/amqp_0_9_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/ory/dockertest/v3"
"github.com/streadway/amqp"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down

0 comments on commit 399127a

Please sign in to comment.