-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP][feature] New proxy to use the multi bundles feature (part1) #716
base: master
Are you sure you want to change the base?
Conversation
Add a new proxy to use multi bundles feature. - support exchange declare - support queue declare - support queue bind - support queue unbind - support basic publish - support basic consume
bindingsJson = JSON_MAPPER.writeValueAsString(this.bindings); | ||
} catch (JsonProcessingException e) { | ||
log.error("Failed to bind queue {} to exchange {}", queue, exchangeName, e); | ||
return CompletableFuture.failedFuture(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This failedFuture method was introduced in jdk9, we should support jdk8.
bindingsJson = JSON_MAPPER.writeValueAsString(this.bindings); | ||
} catch (JsonProcessingException e) { | ||
log.error("Failed to unbind queue {} to exchange {}", queue, exchangeName, e); | ||
return CompletableFuture.failedFuture(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
support jdk8.
try { | ||
client = pulsar.getClient(); | ||
} catch (PulsarServerException e) { | ||
return CompletableFuture.failedFuture(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
support jdk8.
data = new byte[message.getDataBuffer().readableBytes()]; | ||
message.getDataBuffer().readBytes(data); | ||
for (Destination des : destinations) { | ||
futures.add(sendMessage(des, data, props)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might loop.
public void deleteFailed(ManagedLedgerException exception, Object ctx) { | ||
log.error("{} Failed to delete message at {}", exchange.getName(), ctx, exception); | ||
} | ||
}, entry.getPosition()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The entry has been released, and the 190 line position variable should be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice cache, fixed
log.error("Deserialize entry dataBuffer failed. exchangeName: {}, skip it first.", | ||
exchange.getName(), e); | ||
PENDING_SIZE_UPDATER.decrementAndGet(this); | ||
continue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The entry is not released.
} | ||
|
||
@Override | ||
public void receiveExchangeDeleteOk() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client needs to be notified of the result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method will be supported later.
|
||
@Override | ||
public void receiveQueueDeleteOk(long messageCount) { | ||
// nothing to do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client needs to be notified of the result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method will be supported later.
}) | ||
.thenAccept(position -> { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Publish message success, position {}", position.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
open Publish confirm, BasicAckBody not handled.
Motivation
Currently, the AoP can't leverage the multi bundles feature, exchanges and queues belonging to one AMQP virtual host are owned by one namespace bundle, they must work on one broker. So the broker can't scale up simply to support increasing exchanges and queues, if users want to use all brokers of a cluster, they need to use different virtual hosts for different exchanges and queues, it's very hard to adjust existing applications and it's not friendly to use.
Modifications
Verifying this change
Add new test.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Check the box below.
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)