Skip to content

Commit

Permalink
[FLINK-14562] Let RabbitMQ source close consumer and channel on close
Browse files Browse the repository at this point in the history
Closing method of RabbitMQ source must close consumer and channel in order to prevent leaving idle consumer

This closes apache#10036.
  • Loading branch information
Nicolas Deslandes authored and tillrohrmann committed Nov 3, 2019
1 parent 15f8f3c commit 0828e29
Showing 1 changed file with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,25 @@ public void open(Configuration config) throws Exception {
@Override
public void close() throws Exception {
super.close();

try {
if (consumer != null && channel != null) {
channel.basicCancel(consumer.getConsumerTag());
}
} catch (IOException e) {
throw new RuntimeException("Error while cancelling RMQ consumer on " + queueName
+ " at " + rmqConnectionConfig.getHost(), e);
}

try {
if (channel != null) {
channel.close();
}
} catch (IOException e) {
throw new RuntimeException("Error while closing RMQ channel with " + queueName
+ " at " + rmqConnectionConfig.getHost(), e);
}

try {
if (connection != null) {
connection.close();
Expand Down

0 comments on commit 0828e29

Please sign in to comment.