Skip to content

Commit

Permalink
[FLINK-10195][connectors/rabbitmq] Allow setting QoS
Browse files Browse the repository at this point in the history
  • Loading branch information
austince authored and dawidwys committed Jul 29, 2020
1 parent 4343e00 commit 3b6ca3c
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 14 deletions.
34 changes: 34 additions & 0 deletions docs/dev/connectors/rabbitmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,40 @@ val stream = env
</div>
</div>

#### Quality of Service (QoS) / Consumer Prefetch

The RabbitMQ Source provides a simple way to set the `basicQos` on the source's channel through the `RMQConnectionConfig`.
Since there is one connection/ channel per-parallel source, this prefetch count will effectively be multiplied by the
source's parallelism for how many total unacknowledged messages can be sent to the job at one time.
If more complex configuration is required, `RMQSource#setupChannel(Connection)` can be overridden and manually configured.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setPrefetchCount(30_000)
...
.build();

{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val connectionConfig = new RMQConnectionConfig.Builder()
.setPrefetchCount(30000)
...
.build
{% endhighlight %}
</div>
</div>

The prefetch count is unset by default, meaning the RabbitMQ server will send unlimited messages. In production, it
is best to set this value. For high volume queues and checkpointing enabled, some tuning may be required to reduce
wasted cycles, as messages are only acknowledged on checkpoints if enabled.

More about QoS and prefetch can be found [here](https://www.rabbitmq.com/confirms.html#channel-qos-prefetch)
and more about the options available in AMQP 0-9-1 [here](https://www.rabbitmq.com/consumer-prefetch.html).

### RabbitMQ Sink
This connector provides a `RMQSink` class for sending messages to a RabbitMQ
queue. Below is a code example for setting up a RabbitMQ sink.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Changes made to the source, taken from com.rabbitmq:amqp-client:4.2.0:
// - brought this class out of com.rabbitmq.client.QueueingConsumer

package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

/**
* Encapsulates an arbitrary message - simple "bean" holder structure.
* TODO: replace this with `com.rabbitmq.client.Delivery` in RMQ v5.x
*/
public class Delivery {
private final Envelope envelope;
private final AMQP.BasicProperties properties;
private final byte[] body;

public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
this.envelope = envelope;
this.properties = properties;
this.body = body;
}

/**
* Retrieve the message envelope.
*
* @return the message envelope
*/
public Envelope getEnvelope() {
return envelope;
}

/**
* Retrieve the message properties.
*
* @return the message properties
*/
public AMQP.BasicProperties getProperties() {
return properties;
}

/**
* Retrieve the message body.
*
* @return the message body
*/
public byte[] getBody() {
return body;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Changes made to the source, taken from com.rabbitmq:amqp-client:4.2.0:
// - copied from com.rabbitmq.client.QueueingConsumer
// - updated naming conventions for the Apache Flink standards

package org.apache.flink.streaming.connectors.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueingConsumer extends DefaultConsumer {
private final BlockingQueue<Delivery> queue;

// When this is non-null the queue is in shutdown mode and nextDelivery should
// throw a shutdown signal exception.
private volatile ShutdownSignalException shutdown;
private volatile ConsumerCancelledException cancelled;

// Marker object used to signal the queue is in shutdown mode.
// It is only there to wake up consumers. The canonical representation
// of shutting down is the presence of shutdown.
// Invariant: This is never on queue unless shutdown != null.
private static final Delivery POISON = new Delivery(null, null, null);

public QueueingConsumer(Channel channel) {
this(channel, Integer.MAX_VALUE);
}

public QueueingConsumer(Channel channel, int capacity) {
super(channel);
this.queue = new LinkedBlockingQueue<>(capacity);
}

/**
* Check if we are in shutdown mode and if so throw an exception.
*/
private void checkShutdown() {
if (shutdown != null) {
throw Utility.fixStackTrace(shutdown);
}
}

/**
* If delivery is not POISON nor null, return it.
* <p/>
* If delivery, shutdown and cancelled are all null, return null.
* <p/>
* If delivery is POISON re-insert POISON into the queue and
* throw an exception if POISONed for no reason.
* <p/>
* Otherwise, if we are in shutdown mode or cancelled,
* throw a corresponding exception.
*/
private Delivery handle(Delivery delivery) {
if (delivery == POISON ||
delivery == null && (shutdown != null || cancelled != null)) {
if (delivery == POISON) {
queue.add(POISON);
if (shutdown == null && cancelled == null) {
throw new IllegalStateException(
"POISON in queue, but null shutdown and null cancelled. " +
"This should never happen, please report as a BUG");
}
}
if (null != shutdown) {
throw Utility.fixStackTrace(shutdown);
}
if (null != cancelled) {
throw Utility.fixStackTrace(cancelled);
}
}
return delivery;
}

/**
* Main application-side API: wait for the next message delivery and return it.
*
* @return the next message
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
*/
public Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
return handle(queue.take());
}

/**
* Main application-side API: wait for the next message delivery and return it.
*
* @param timeout timeout in millisecond
* @return the next message or null if timed out
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
*/
public Delivery nextDelivery(long timeout)
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
return nextDelivery(timeout, TimeUnit.MILLISECONDS);
}

/**
* Main application-side API: wait for the next message delivery and return it.
*
* @param timeout timeout
* @param unit timeout unit
* @return the next message or null if timed out
* @throws InterruptedException if an interrupt is received while waiting
* @throws ShutdownSignalException if the connection is shut down while waiting
* @throws ConsumerCancelledException if this consumer is cancelled while waiting
*/
public Delivery nextDelivery(long timeout, TimeUnit unit)
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException {
return handle(queue.poll(timeout, unit));
}

@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
shutdown = sig;
queue.add(POISON);
}

@Override
public void handleCancel(String consumerTag) throws IOException {
cancelled = new ConsumerCancelledException();
queue.add(POISON);
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
checkShutdown();
this.queue.add(new Delivery(envelope, properties, body));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.rabbitmq;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -32,7 +33,6 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -137,15 +137,36 @@ protected ConnectionFactory setupConnectionFactory() throws Exception {
* Initializes the connection to RMQ using the default connection factory from {@link #setupConnectionFactory()}.
* The user may override this method to setup and configure their own {@link Connection}.
*/
@VisibleForTesting
protected Connection setupConnection() throws Exception {
return setupConnectionFactory().newConnection();
}

/**
* Initializes the consumer's {@link Channel}. If a prefetch count has been set in {@link RMQConnectionConfig},
* the new channel will be use it for {@link Channel#basicQos(int)}.
*
* @param connection the consumer's {@link Connection}.
* @return the channel.
* @throws Exception if there is an issue creating or configuring the channel.
*/
private Channel setupChannel(Connection connection) throws Exception {
Channel chan = connection.createChannel();
if (rmqConnectionConfig.getPrefetchCount().isPresent()) {
// set the global flag for the entire channel, though shouldn't make a difference
// since there is only one consumer, and each parallel instance of the source will
// create a new connection (and channel)
chan.basicQos(rmqConnectionConfig.getPrefetchCount().get(), true);
}
return chan;
}

/**
* Sets up the queue. The default implementation just declares the queue. The user may override
* this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
* defining custom queue parameters)
*/
@VisibleForTesting
protected void setupQueue() throws IOException {
Util.declareQueueDefaults(channel, queueName);
}
Expand All @@ -155,7 +176,7 @@ public void open(Configuration config) throws Exception {
super.open(config);
try {
connection = setupConnection();
channel = connection.createChannel();
channel = setupChannel(connection);
if (channel == null) {
throw new RuntimeException("None of RabbitMQ channels are available");
}
Expand Down Expand Up @@ -219,7 +240,7 @@ public void close() throws Exception {
public void run(SourceContext<OUT> ctx) throws Exception {
final RMQCollector collector = new RMQCollector(ctx);
while (running) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Delivery delivery = consumer.nextDelivery();

synchronized (ctx.getCheckpointLock()) {
if (!autoAck) {
Expand Down
Loading

0 comments on commit 3b6ca3c

Please sign in to comment.