diff --git a/lib/input/amqp_0_9.go b/lib/input/amqp_0_9.go index bc81cdcb3f..9ce3ff5132 100644 --- a/lib/input/amqp_0_9.go +++ b/lib/input/amqp_0_9.go @@ -8,6 +8,7 @@ import ( "github.com/Jeffail/benthos/v3/lib/metrics" "github.com/Jeffail/benthos/v3/lib/types" "github.com/Jeffail/benthos/v3/lib/util/tls" + "github.com/Jeffail/gabs/v2" ) //------------------------------------------------------------------------------ @@ -53,11 +54,16 @@ You can access these metadata fields using CategoryServices, }, FieldSpecs: docs.FieldSpecs{ - docs.FieldCommon("url", - "A URL to connect to.", - "amqp://localhost:5672/", - "amqps://guest:guest@localhost:5672/", - ), + docs.FieldCommon("urls", + "A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs.", + []string{"amqp://127.0.0.1:5672/"}, + []string{"amqp://127.0.0.1:5672/,amqp://127.0.0.2:5672/"}, + []string{"amqp://127.0.0.1:5672/", "amqp://127.0.0.2:5672/"}, + ).Array(), + docs.FieldDeprecated("url").OmitWhen(func(field, parent interface{}) (string, bool) { + return "field url is deprecated and should be omitted when urls is used", + len(gabs.Wrap(parent).S("urls").Children()) > 0 + }), docs.FieldCommon("queue", "An AMQP queue to consume from."), docs.FieldAdvanced("queue_declare", ` Allows you to passively declare the target queue. If the queue already exists diff --git a/lib/input/reader/amqp_0_9.go b/lib/input/reader/amqp_0_9.go index a4c507270b..1e98d5b545 100644 --- a/lib/input/reader/amqp_0_9.go +++ b/lib/input/reader/amqp_0_9.go @@ -3,8 +3,10 @@ package reader import ( "context" "crypto/tls" + "errors" "fmt" "net/url" + "strings" "sync" "time" @@ -17,6 +19,8 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) +var errConnect = errors.New("AMQP 0.9 Connect") + //------------------------------------------------------------------------------ // AMQP09QueueDeclareConfig contains fields indicating whether the target AMQP09 @@ -37,6 +41,7 @@ type AMQP09BindingConfig struct { // AMQP09Config contains configuration for the AMQP09 input type. type AMQP09Config struct { URL string `json:"url" yaml:"url"` + URLs []string `json:"urls" yaml:"urls"` Queue string `json:"queue" yaml:"queue"` QueueDeclare AMQP09QueueDeclareConfig `json:"queue_declare" yaml:"queue_declare"` BindingsDeclare []AMQP09BindingConfig `json:"bindings_declare" yaml:"bindings_declare"` @@ -53,7 +58,7 @@ type AMQP09Config struct { // NewAMQP09Config creates a new AMQP09Config with default values. func NewAMQP09Config() AMQP09Config { return AMQP09Config{ - URL: "amqp://guest:guest@localhost:5672/", + URLs: []string{"amqp://guest:guest@localhost:5672/"}, Queue: "benthos-queue", QueueDeclare: AMQP09QueueDeclareConfig{ Enabled: false, @@ -93,6 +98,19 @@ func NewAMQP09(conf AMQP09Config, log log.Modular, stats metrics.Type) (*AMQP09, stats: stats, log: log, } + + if conf.URL != "" { + a.conf.URLs = []string{} + } else { + for _, u := range conf.URLs { + for _, splitURL := range strings.Split(u, ",") { + if trimmed := strings.TrimSpace(splitURL); len(trimmed) > 0 { + a.conf.URLs = append(a.conf.URLs, trimmed) + } + } + } + } + if conf.TLS.Enabled { var err error if a.tlsConf, err = conf.TLS.Get(); err != nil { @@ -117,27 +135,13 @@ func (a *AMQP09) ConnectWithContext(ctx context.Context) (err error) { var amqpChan *amqp.Channel var consumerChan <-chan amqp.Delivery - u, err := url.Parse(a.conf.URL) - if err != nil { - return fmt.Errorf("invalid amqp URL: %v", err) - } - - if a.conf.TLS.Enabled { - if u.User != nil { - conn, err = amqp.DialTLS(a.conf.URL, a.tlsConf) - if err != nil { - return fmt.Errorf("AMQP 0.9 Connect: %s", err) - } - } else { - conn, err = amqp.DialTLS_ExternalAuth(a.conf.URL, a.tlsConf) - if err != nil { - return fmt.Errorf("AMQP 0.9 Connect: %s", err) - } + if len(a.conf.URLs) >= 1 { + if conn, err = a.reDial(a.conf.URLs); err != nil { + return err } - } else { - conn, err = amqp.Dial(a.conf.URL) - if err != nil { - return fmt.Errorf("AMQP 0.9 Connect: %s", err) + } else { // for backwards compatibility with `url` + if conn, err = a.dial(a.conf.URL); err != nil { + return err } } @@ -302,4 +306,48 @@ func (a *AMQP09) WaitForClose(timeout time.Duration) error { return nil } +// reDial connection to amqp with one or more fallback URLs +func (a *AMQP09) reDial(urls []string) (conn *amqp.Connection, err error) { + for _, u := range urls { + conn, err = a.dial(u) + if err != nil { + if errors.Is(err, errConnect) { + continue + } + break + } + return conn, nil + } + return nil, err +} + +// dial attempts to connect to amqp URL +func (a *AMQP09) dial(amqpURL string) (conn *amqp.Connection, err error) { + u, err := url.Parse(amqpURL) + if err != nil { + return nil, fmt.Errorf("invalid amqp URL: %v", err) + } + + if a.conf.TLS.Enabled { + if u.User != nil { + conn, err = amqp.DialTLS(amqpURL, a.tlsConf) + if err != nil { + return nil, fmt.Errorf("%w: %s", errConnect, err) + } + } else { + conn, err = amqp.DialTLS_ExternalAuth(amqpURL, a.tlsConf) + if err != nil { + return nil, fmt.Errorf("%w: %s", errConnect, err) + } + } + } else { + conn, err = amqp.Dial(amqpURL) + if err != nil { + return nil, fmt.Errorf("%w: %s", errConnect, err) + } + } + + return conn, nil +} + //------------------------------------------------------------------------------ diff --git a/lib/test/integration/amqp_0_9_test.go b/lib/test/integration/amqp_0_9_test.go index da5ef5248a..75e22b9169 100644 --- a/lib/test/integration/amqp_0_9_test.go +++ b/lib/test/integration/amqp_0_9_test.go @@ -48,6 +48,34 @@ output: metadata: exclude_prefixes: [ $OUTPUT_META_EXCLUDE_PREFIX ] +input: + amqp_0_9: + urls: + - amqp://guest:guest@localhost:5672/ + - amqp://guest:guest@localhost:$PORT/ # fallback URL + auto_ack: $VAR1 + queue: queue-$ID + queue_declare: + durable: true + enabled: true + bindings_declare: + - exchange: exchange-$ID + key: benthos-key +` + backwardsCompatibilityTemplate := ` +output: + amqp_0_9: + url: amqp://guest:guest@localhost:$PORT/ + max_in_flight: $MAX_IN_FLIGHT + exchange: exchange-$ID + key: benthos-key + exchange_declare: + enabled: true + type: direct + durable: true + metadata: + exclude_prefixes: [ $OUTPUT_META_EXCLUDE_PREFIX ] + input: amqp_0_9: url: amqp://guest:guest@localhost:$PORT/ @@ -77,6 +105,13 @@ input: testOptPort(resource.GetPort("5672/tcp")), testOptVarOne("false"), ) + suite.Run( + t, backwardsCompatibilityTemplate, + testOptSleepAfterInput(500*time.Millisecond), + testOptSleepAfterOutput(500*time.Millisecond), + testOptPort(resource.GetPort("5672/tcp")), + testOptVarOne("false"), + ) t.Run("with max in flight", func(t *testing.T) { t.Parallel() suite.Run( diff --git a/website/docs/components/inputs/amqp_0_9.md b/website/docs/components/inputs/amqp_0_9.md index ccc07f734d..2519525046 100644 --- a/website/docs/components/inputs/amqp_0_9.md +++ b/website/docs/components/inputs/amqp_0_9.md @@ -32,7 +32,8 @@ message brokers, including RabbitMQ. input: label: "" amqp_0_9: - url: amqp://guest:guest@localhost:5672/ + urls: + - amqp://guest:guest@localhost:5672/ queue: benthos-queue consumer_tag: benthos-consumer prefetch_count: 10 @@ -46,7 +47,8 @@ input: input: label: "" amqp_0_9: - url: amqp://guest:guest@localhost:5672/ + urls: + - amqp://guest:guest@localhost:5672/ queue: benthos-queue queue_declare: enabled: false @@ -101,20 +103,26 @@ You can access these metadata fields using ## Fields -### `url` +### `urls` -A URL to connect to. +A list of URLs to connect to. If an item of the list contains commas it will be expanded into multiple URLs. -Type: `string` -Default: `"amqp://guest:guest@localhost:5672/"` +Type: `array` +Default: `["amqp://guest:guest@localhost:5672/"]` ```yaml # Examples -url: amqp://localhost:5672/ +urls: + - amqp://127.0.0.1:5672/ + +urls: + - amqp://127.0.0.1:5672/,amqp://127.0.0.2:5672/ -url: amqps://guest:guest@localhost:5672/ +urls: + - amqp://127.0.0.1:5672/ + - amqp://127.0.0.2:5672/ ``` ### `queue`