[PIP-95] Smart Listener Selection with Multiple Bind Addresses #12040

Closed
EronWright opened this issue Sep 14, 2021 · 6 comments

@EronWright
Contributor

EronWright commented Sep 14, 2021

  • Status: Accepted
  • Author: Eron Wright
  • Pull Request:
  • Mailing List discussion: (discussion) (vote)
  • Release:

Motivation

The Pulsar broker has the concept of advertised listeners representing broker endpoints that are discoverable by, and accessible to, Pulsar clients. In addition, one listener may be designated as the internal listener, to be used for broker-to-broker communication. Listeners may be understood as named routes to the broker.

Each broker stores its listener data into ZooKeeper to support topic lookup requests. A lookup request may contain a listener name parameter to obtain a specific endpoint. For example, a lookup request for the listener named external may return an endpoint of pulsar+ssl://broker-1.cluster.example.dev:6651 (an external address). If a listener name is not specified, the response contains the endpoint for the internal listener.

This PIP seeks to improve the experience when a listener name is not specified, by selecting an appropriate endpoint automatically without requiring explicit configuration on the client. The proposed approach is to use information that the client naturally has, namely the Pulsar service endpoint. The broker shall select a listener based on which service endpoint was used to make the lookup request. For example, a client that makes a lookup request via an ingress gateway would use that same gateway for the subsequent broker connection.

A secondary goal is to improve the interoperability of KOP by implementing smart endpoint selection for ordinary Kafka clients. This shall be based on the semantics of KIP-103 and is consistent with this proposal. The same approach applies to other protocol handlers.

Public Interfaces

Configuration: Bind Addresses

It is proposed that a new configuration property be defined to specify a map of listener name to server bind address. The broker shall bind a server socket for each address, and use the associated listener name (by default) for any lookup request that comes through that address. The property shall be named bindAddresses and have the form:

<listener_name>:<scheme>://<host>:<port>,[...]

Where listener_name is the name of a listener configured via the advertisedListeners configuration property.

Where scheme selects a protocol handler, e.g. pulsar or pulsar+ssl for the Pulsar protocol.

Where host is a local interface address to bind to, such as 0.0.0.0 to bind to all interfaces.

Where port is a unique server port number, such as 6652.

See the compatibility section for further information.
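
To make the format concrete, here is a minimal sketch of how such a value might be split into its parts. The names BindAddressesSketch, Binding, and parse are hypothetical and are not the broker's actual parsing code; the sketch only checks the shape of each entry and that ports are unique.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: names here are illustrative, not Pulsar's implementation.
final class BindAddressesSketch {

    // One parsed entry: the listener name and the address to bind.
    static final class Binding {
        final String listenerName;
        final URI address;

        Binding(String listenerName, URI address) {
            this.listenerName = listenerName;
            this.address = address;
        }
    }

    // Parses "<listener_name>:<scheme>://<host>:<port>,[...]".
    static List<Binding> parse(String value) {
        List<Binding> bindings = new ArrayList<>();
        Set<Integer> ports = new HashSet<>();
        for (String raw : value.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue;
            }
            // The listener name ends at the first ':'; the remainder is a URI.
            int sep = entry.indexOf(':');
            if (sep <= 0) {
                throw new IllegalArgumentException("Missing listener name: " + entry);
            }
            URI address = URI.create(entry.substring(sep + 1));
            if (address.getScheme() == null || address.getHost() == null || address.getPort() < 0) {
                throw new IllegalArgumentException("Expected <scheme>://<host>:<port>: " + entry);
            }
            // Each binding needs its own server port.
            if (!ports.add(address.getPort())) {
                throw new IllegalArgumentException("Duplicate port: " + address.getPort());
            }
            bindings.add(new Binding(entry.substring(0, sep), address));
        }
        return bindings;
    }

    public static void main(String[] args) {
        for (Binding b : parse("external:pulsar://0.0.0.0:6652,external:pulsar+ssl://0.0.0.0:6653")) {
            System.out.println(b.listenerName + " -> " + b.address);
        }
    }
}
```

Splitting on the first colon works because the listener name precedes the scheme, so the remainder can be handed to java.net.URI as-is.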

Admin API: Lookup Request: Listener Name Header

It is proposed that a new header-based parameter be added to the Topic Lookup operation on the Admin API (/v2/topic/{topic-domain}/{tenant}/{namespace}/{topic}). The header name shall be X-Pulsar-ListenerName and the value shall correspond to the name of a configured listener.

The operation shall return 400 Bad Request if the listener name doesn't match the configuration.
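
As an illustration, a caller (or a gateway acting on its behalf) could attach the header as sketched below using the JDK's java.net.http API. The class name LookupRequestSketch is hypothetical, and the host, port, /lookup path prefix, and topic in the example URL are assumptions to be replaced with real values.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical sketch of building a Topic Lookup request that carries the header.
final class LookupRequestSketch {

    static final String LISTENER_HEADER = "X-Pulsar-ListenerName";

    // lookupUrl is the full URL of the Topic Lookup operation for one topic.
    static HttpRequest build(String lookupUrl, String listenerName) {
        return HttpRequest.newBuilder()
                .uri(URI.create(lookupUrl))
                .header(LISTENER_HEADER, listenerName)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Assumed host, port, path prefix, and topic; substitute your own values.
        HttpRequest request = build(
                "http://broker-1.example.dev:8080/lookup/v2/topic/persistent/public/default/my-topic",
                "external");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().map());
    }
}
```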

Proposed Changes

The main technical challenge is to know which advertised listener a lookup request came from, and the proposed solution is two-fold:

  1. for the Pulsar binary protocol, use a unique bind address for each listener.
  2. for the Pulsar HTTP-based admin protocol, use an HTTP header to specify a listener name.

Multiple Bind Addresses

The broker shall be configurable to open a server socket on numerous bind addresses, and to associate each socket with a listener name. When a lookup request arrives on a particular socket, the broker shall use the associated listener by default. Note that an explicit listener name in the request parameters would take precedence.

For example, port 6652 may be associated with the external listener to establish the default response for a lookup request received on that port.

The scheme information shall be used to determine whether to terminate TLS on the server socket. Note that all TLS sockets would use the same TLS configuration (a per-address configuration is out-of-scope).
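
The selection rule for the binary protocol can be sketched as follows. This is an illustration under assumptions, not the broker's code: ListenerSelectionSketch and select are hypothetical names, and an empty result stands for "fall back to the internal listener".

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of default listener selection per server socket.
final class ListenerSelectionSketch {

    // Local port of each additional bind address mapped to its listener name.
    private final Map<Integer, String> listenerByPort;

    ListenerSelectionSketch(Map<Integer, String> listenerByPort) {
        this.listenerByPort = listenerByPort;
    }

    // Returns the listener to answer with, or empty to use the internal listener.
    Optional<String> select(int localPort, String requestedListener) {
        // An explicit listener name in the request takes precedence.
        if (requestedListener != null && !requestedListener.isEmpty()) {
            return Optional.of(requestedListener);
        }
        // Otherwise use the listener associated with the socket, if any.
        return Optional.ofNullable(listenerByPort.get(localPort));
    }

    public static void main(String[] args) {
        ListenerSelectionSketch selection =
                new ListenerSelectionSketch(Map.of(6652, "external", 6653, "external"));
        System.out.println(selection.select(6652, null));
        System.out.println(selection.select(6650, null));
        System.out.println(selection.select(6652, "cluster"));
    }
}
```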

Listener Name Header

The broker shall look for the X-Pulsar-ListenerName header as an additional parameter to the Topic Lookup operation. The value shall be used when a listener name is not supplied via the similar listenerName query parameter.

The rationale for supporting a header-based alternative to the query parameter is to allow an HTTP-aware gateway to inject the header value as a hint for requests that come through the gateway. Istio, for example, is able to inject headers but is not able to set query parameters (docs).
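
A rough sketch of the server-side resolution, assuming the precedence described above (query parameter first, then header) and the 400 Bad Request rule for unknown names. HeaderListenerSketch and resolve are hypothetical names, and the IllegalArgumentException merely stands in for whatever the web layer would translate into a 400 response.

```java
import java.util.Set;

// Hypothetical sketch of resolving the listener name for an HTTP lookup.
final class HeaderListenerSketch {

    // Returns the listener name to use, or null to use the internal listener.
    static String resolve(String queryParam, String headerValue, Set<String> configuredListeners) {
        // The listenerName query parameter takes precedence over the header.
        String name = (queryParam != null && !queryParam.isEmpty()) ? queryParam : headerValue;
        if (name == null || name.isEmpty()) {
            return null;
        }
        // An unknown name would be reported to the caller as 400 Bad Request.
        if (!configuredListeners.contains(name)) {
            throw new IllegalArgumentException("Unknown listener: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        Set<String> configured = Set.of("cluster", "external");
        System.out.println(resolve(null, "external", configured));
        System.out.println(resolve("cluster", "external", configured));
        System.out.println(resolve(null, null, configured));
    }
}
```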

Relax Validation of Listener Addresses

The validation logic for advertised listeners shall be relaxed to allow certain configurations that are currently rejected. The root issue is that the broker erroneously assumes that TLS is always terminated at the broker, as opposed to at the gateway. This precludes a configuration where a gateway terminates TLS and then forwards the request as plaintext.

TLS-Only Configurations

The validation and lookup logic shall be updated to allow for pure TLS configurations, that is where an advertised listener consists only of a TLS endpoint (no plaintext endpoint). Currently, the broker insists that all listeners have a plaintext endpoint. This makes little sense, e.g. for an Internet-facing gateway endpoint where TLS is a hard requirement.

The behavior of the Pulsar client is to use the same scheme for the service endpoint and for the broker endpoint. For example, if a client uses pulsar+ssl to perform a lookup request, it then uses pulsar+ssl to connect to the broker to produce messages. No change is anticipated.
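
The interaction between a TLS-only listener and the client's scheme can be sketched like this. ListenerEndpointsSketch and endpointFor are hypothetical names, and the hostnames are made up; the point is only that a listener may carry a TLS endpoint without a plaintext one, and that the endpoint returned follows the scheme of the service URL.

```java
import java.net.URI;
import java.util.Optional;

// Hypothetical sketch of a listener that may have only a TLS endpoint.
final class ListenerEndpointsSketch {

    private final URI plaintextUrl; // null for a TLS-only listener
    private final URI tlsUrl;       // null for a plaintext-only listener

    ListenerEndpointsSketch(URI plaintextUrl, URI tlsUrl) {
        if (plaintextUrl == null && tlsUrl == null) {
            throw new IllegalArgumentException("A listener needs at least one endpoint");
        }
        this.plaintextUrl = plaintextUrl;
        this.tlsUrl = tlsUrl;
    }

    // Picks the endpoint whose scheme matches the one the client used for lookup.
    Optional<URI> endpointFor(URI serviceUrl) {
        boolean tls = "pulsar+ssl".equals(serviceUrl.getScheme());
        return Optional.ofNullable(tls ? tlsUrl : plaintextUrl);
    }

    public static void main(String[] args) {
        // A TLS-only listener, e.g. behind an Internet-facing gateway.
        ListenerEndpointsSketch external = new ListenerEndpointsSketch(
                null, URI.create("pulsar+ssl://broker-1.example.dev:6651"));
        System.out.println(external.endpointFor(URI.create("pulsar+ssl://gateway.example.dev:6651")));
        System.out.println(external.endpointFor(URI.create("pulsar://gateway.example.dev:6650")));
    }
}
```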

Compatibility, Deprecation, and Migration Plan

For compatibility, the primary bind addresses shall not be associated with any particular listener. The behavior of lookup requests to those addresses shall be unchanged. The primary bind addresses are configured using bindAddress, brokerServicePort, and brokerServicePortTls as normal.

It is assumed that any bind addresses that are configured with bindAddresses are in addition to the primary bind addresses and do not conflict. If one wishes to purely use bindAddresses, one must unset the brokerServicePort property (note that it has a default value of 6650).

Test Plan

Unit Tests

New unit tests will test the parsing and validation logic for the bindAddresses configuration property.

Ad Hoc Tests

Pulsar will be tested on an ad hoc basis in various combinations, primarily using Pulsar standalone mode. Specific cases:

  • migration scenarios involving bindAddress, brokerServicePort, brokerServicePortTls, webServicePort, and webServicePortTls.
  • TLS and/or non-TLS channels
  • end-to-end testing with multiple listeners

Rejected Alternatives

None.

Diagrams

[three diagrams; images not available]

@wangjialing218
Contributor

> A secondary goal is to improve the interoperability of KOP by implementing smart endpoint selection for ordinary Kafka clients. This shall be based on the semantics of KIP-103 and is consistent with this proposal.

If we only add multiple bind addresses for the Pulsar binary port in the broker, I think we cannot do smart endpoint selection for ordinary Kafka clients; we also need to add multiple bind addresses for the Kafka port in KoP.
I already have a design to add multiple bind addresses in KoP and am working on it: streamnative/kop#669

@EronWright
Contributor Author

EronWright commented Sep 16, 2021

@wangjialing218 thanks for sharing, I believe our efforts are complementary. It would be reasonable to use the new bindAddresses property to define KOP ports; each protocol handler would filter on the scheme component. For example, the Pulsar broker service would act on the bindings for pulsar and pulsar+ssl.

Please take a look at the linked PR. My expectation is that the KOP handler obtain its server ports using:

List<BindAddress> bindAddresses = BindAddressValidator.validateBindAddresses(serviceConfig,
                Arrays.asList("kafka", "kafka+ssl"));

@eolivelli
Contributor

@EronWright can you please start a discussion on the Pulsar dev mailing list?
This is required in order to accept this PIP.

@eolivelli
Contributor

btw the PIP looks good to me.
I have no comments to make.

@merlimat merlimat added this to the 2.9.0 milestone Sep 24, 2021
@wangjialing218
Contributor

> Please take a look at the linked PR. My expectation is that the KOP handler obtain its server ports using:
>
> List<BindAddress> bindAddresses = BindAddressValidator.validateBindAddresses(serviceConfig,
>                 Arrays.asList("kafka", "kafka+ssl"));

@EronWright
Let me know if I understand correctly: the scope of this PIP is only the Pulsar bind addresses.
KoP binds its Kafka addresses through KafkaChannelInitializer when ProtocolHandler.newChannelInitializers() is called.

@eolivelli eolivelli modified the milestones: 2.9.0, 2.10.0 Oct 6, 2021
codelipenghui pushed a commit that referenced this issue Oct 14, 2021
…ocol (#12056)

Master Issue: #12040

### Motivation

Add a new configuration setting `bindAddresses` to open additional server ports on the broker.  Note that these are in addition to `brokerServicePort` / `brokerServicePortTls` for compatibility reasons.

Each new-style bind address is associated with an advertised listener, to be used as the default listener for topic lookup requests.   See #12040 for details.  A given listener may be associated with numerous bindings.

The scheme indicates the protocol handler and whether to use TLS on the server channel.  This PR is focused on the Pulsar protocol handler, but it is anticipated that other protocols may be supported in future.

For example:
```
bindAddresses=external:pulsar://0.0.0.0:6652,external:pulsar+ssl://0.0.0.0:6653
bindAddress=0.0.0.0
brokerServicePort=6650
brokerServicePortTls=6651
advertisedListeners=cluster:pulsar://broker-1.local:6650,cluster:pulsar+ssl://broker-1.local:6651,external:pulsar://broker-1.example.dev:6652,external:pulsar+ssl://broker-1.example.dev:6653
internalListenerName=cluster
```

The above would produce four server sockets, with `6650` and `6651` having no associated listener name (thus retaining existing lookup behavior of returning the internal listener), and with `6652` and `6653` having an association with listener name `external`.  Given a lookup request on `6652` or `6653`, the `external` listener address would be returned.

### Modifications

- added configuration property `bindAddresses`
- implemented parsing and validation logic
- factored out some utility code for formatting broker/web addresses
codelipenghui pushed a commit that referenced this issue Oct 14, 2021
Master Issue: #12040

### Motivation

This PR introduces an optional HTTP header `X-Pulsar-ListenerName` to the `TopicLookup` operation of the Pulsar REST API.  The header supplies the listener name to use when the broker has multiple advertised listeners.  Today the `TopicLookup` operation accepts a query parameter `listenerName` for that same purpose.

The motivation for adding a header-based alternative is to improve support for smart listener selection via HTTP gateways or proxies that are capable of rewriting headers (see: [Istio VirtualService](https://istio.io/latest/docs/reference/config/networking/virtual-service/#Headers)).

See #12040 for more background.

### Modifications

- Modify `TopicLookup` operation (V1 + V2) to use a header (query string takes precedence).
@EronWright
Contributor Author

Update: the core features were implemented into Pulsar 2.9, and KIP-103 compatibility was achieved in streamnative/kop#574.

Note that support for multiple web addresses was not implemented via this PIP and is left to future enhancement.

@merlimat merlimat modified the milestones: 2.10.0, 2.9.0 Oct 20, 2021
BewareMyPower added a commit to streamnative/kop that referenced this issue Oct 29, 2021
…teners (#864)

Fixes #818

### Motivation

#742 adds the multiple listeners support for KoP 2.8.x. However, there're some flaws. 

First, the KoP advertised listeners must be added to `advertisedListeners` config. This config should be treated as the advertised listeners for broker, not protocol handlers. What `KafkaProtocolHandler#getProtocolDataToAdvertise` returns is not used though it has been written to ZK. It also makes `kafkaAdvertisedListeners` meaningless and it was marked as deprecated.

### Modification

Thanks to [PIP 95](apache/pulsar#12040), the Pulsar client doesn't need to configure a listener name for topic lookup. So this PR simplifies the implementation of `LookupClient` by using a shared `PulsarClient` without configuring a listener name.

Then this PR uses the `MetadataStoreCacheLoader` introduced from #820 to get the `kafkaAdvertisedListeners` from ZK. Since it has already been cached, this PR removes the `KOP_ADDRESS_CACHE`.

To verify the feature, this PR adds a test `testMetadataRequestForMultiListeners`. The KoP config looks like:

```properties
kafkaListeners=PLAINTEXT://0.0.0.0:<port1>,GW://0.0.0.0:<port2>
kafkaProtocolMap=PLAINTEXT:PLAINTEXT,GW:PLAINTEXT
kafkaAdvertisedListeners=PLAINTEXT://192.168.0.1:<port3>,GW://192.168.0.1:<port4>
```

The test verifies that each `KafkaRequestHandler` handles the METADATA requests correctly.
gaozhangmin pushed a commit to gaozhangmin/kop that referenced this issue Nov 29, 2021
…teners (streamnative#864)

(cherry picked from commit 50699b3)
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this issue Mar 18, 2022
…ocol (apache#12056)

bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this issue Mar 18, 2022
…12072)

@lhotari lhotari changed the title [PIP 95] Smart Listener Selection with Multiple Bind Addresses [PIP-95] Smart Listener Selection with Multiple Bind Addresses Feb 13, 2024