diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index d4541533f477c..f079369e92486 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -177,6 +177,25 @@ public void open(Configuration config) throws Exception { @Override public void close() throws Exception { super.close(); + + try { + if (consumer != null && channel != null) { + channel.basicCancel(consumer.getConsumerTag()); + } + } catch (IOException e) { + throw new RuntimeException("Error while cancelling RMQ consumer on " + queueName + + " at " + rmqConnectionConfig.getHost(), e); + } + + try { + if (channel != null) { + channel.close(); + } + } catch (IOException e) { + throw new RuntimeException("Error while closing RMQ channel with " + queueName + + " at " + rmqConnectionConfig.getHost(), e); + } + try { if (connection != null) { connection.close();