diff --git a/docs/dev/connectors/rabbitmq.md b/docs/dev/connectors/rabbitmq.md index 819979ed9119b..87473e09fc16b 100644 --- a/docs/dev/connectors/rabbitmq.md +++ b/docs/dev/connectors/rabbitmq.md @@ -131,6 +131,40 @@ val stream = env +#### Quality of Service (QoS) / Consumer Prefetch + +The RabbitMQ Source provides a simple way to set the `basicQos` on the source's channel through the `RMQConnectionConfig`. +Since there is one connection/ channel per-parallel source, this prefetch count will effectively be multiplied by the +source's parallelism for how many total unacknowledged messages can be sent to the job at one time. +If more complex configuration is required, `RMQSource#setupChannel(Connection)` can be overridden and manually configured. + +
If URI is NULL we use host, port, vHost, username, password combination * to initialize connection. using {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, - * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}. + * Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)}. * *
Otherwise the URI will be used to initialize the client connection
- * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer)}
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)}
* @return RMQConnectionConfig
*/
public RMQConnectionConfig build(){
if (this.uri != null) {
return new RMQConnectionConfig(this.uri, this.networkRecoveryInterval,
this.automaticRecovery, this.topologyRecovery, this.connectionTimeout, this.requestedChannelMax,
- this.requestedFrameMax, this.requestedHeartbeat);
+ this.requestedFrameMax, this.requestedHeartbeat, this.prefetchCount);
} else {
return new RMQConnectionConfig(this.host, this.port, this.virtualHost, this.username, this.password,
this.networkRecoveryInterval, this.automaticRecovery, this.topologyRecovery,
- this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat);
+ this.connectionTimeout, this.requestedChannelMax, this.requestedFrameMax, this.requestedHeartbeat,
+ this.prefetchCount);
}
}
}
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index bb9ccf8db0b4a..9e5eedd3f0587 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -41,7 +41,6 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.QueueingConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -63,7 +62,8 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
/**
* Tests for the RMQSource. The source supports two operation modes.
@@ -350,6 +350,56 @@ protected Connection setupConnection() throws Exception {
Mockito.verify(mockConnection, Mockito.times(1)).createChannel();
}
+ @Test
+ public void testSetPrefetchCount() throws Exception {
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost("localhost")
+ .setPort(5000)
+ .setUserName("guest")
+ .setPassword("guest")
+ .setVirtualHost("/")
+ .setPrefetchCount(1000)
+ .build();
+ final Connection mockConnection = Mockito.mock(Connection.class);
+ Channel channel = Mockito.mock(Channel.class);
+ Mockito.when(mockConnection.createChannel()).thenReturn(channel);
+
+ RMQMockedRuntimeTestSource source = new RMQMockedRuntimeTestSource(connectionConfig) {
+ @Override
+ protected Connection setupConnection() throws Exception {
+ return mockConnection;
+ }
+ };
+
+ FunctionInitializationContext mockContext = getMockContext();
+ source.initializeState(mockContext);
+ source.open(new Configuration());
+
+ Mockito.verify(mockConnection, Mockito.times(1)).createChannel();
+ Mockito.verify(channel, Mockito.times(1)).basicQos(1000, true);
+ }
+
+ @Test
+ public void testUnsetPrefetchCount() throws Exception {
+ final Connection mockConnection = Mockito.mock(Connection.class);
+ Channel channel = Mockito.mock(Channel.class);
+ Mockito.when(mockConnection.createChannel()).thenReturn(channel);
+
+ RMQMockedRuntimeTestSource source = new RMQMockedRuntimeTestSource() {
+ @Override
+ protected Connection setupConnection() throws Exception {
+ return mockConnection;
+ }
+ };
+
+ FunctionInitializationContext mockContext = getMockContext();
+ source.initializeState(mockContext);
+ source.open(new Configuration());
+
+ Mockito.verify(mockConnection, Mockito.times(1)).createChannel();
+ Mockito.verify(channel, Mockito.times(0)).basicQos(anyInt());
+ }
+
private static class ConstructorTestClass extends RMQSource