Skip to content

Commit

Permalink
[FLINK-7267][connectors/rabbitmq] Allow overriding RMQSink connection
Browse files Browse the repository at this point in the history
This closes apache#12185
  • Loading branch information
austince authored and StephanEwen committed May 16, 2020
1 parent cce715b commit 6fa85fe
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,28 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
this.logFailuresOnly = logFailuresOnly;
}

/**
* Initializes the connection to RMQ with a default connection factory. The user may override
* this method to setup and configure their own {@link ConnectionFactory}.
*/
protected ConnectionFactory setupConnectionFactory() throws Exception {
return rmqConnectionConfig.getConnectionFactory();
}

/**
* Initializes the connection to RMQ using the default connection factory from {@link #setupConnectionFactory()}.
* The user may override this method to setup and configure their own {@link Connection}.
*/
protected Connection setupConnection() throws Exception {
return setupConnectionFactory().newConnection();
}

@Override
public void open(Configuration config) throws Exception {
ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
schema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));

try {
connection = factory.newConnection();
connection = setupConnection();
channel = connection.createChannel();
if (channel == null) {
throw new RuntimeException("None of RabbitMQ channels are available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -97,6 +98,24 @@ public void openCallDontDeclaresQueueInWithOptionsMode() throws Exception {
verify(channel, never()).queueDeclare(null, true, false, false, null);
}

@Test
public void testOverrideConnection() throws Exception {
final Connection mockConnection = mock(Connection.class);
Channel channel = mock(Channel.class);
when(mockConnection.createChannel()).thenReturn(channel);

RMQSink<String> rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema) {
@Override
protected Connection setupConnection() throws Exception {
return mockConnection;
}
};

rmqSink.open(new Configuration());

verify(mockConnection, times(1)).createChannel();
}

@Test
public void throwExceptionIfChannelIsNull() throws Exception {
when(connection.createChannel()).thenReturn(null);
Expand Down

0 comments on commit 6fa85fe

Please sign in to comment.