From b5b9682827f0698d9b4d24215b4dd2daaf25ec30 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Sat, 6 Mar 2021 12:51:52 +0800 Subject: [PATCH] [hotfix][connector/kafka] Reduce the offset commit logging verbosity from INFO to DEBUG. --- .../connector/kafka/source/reader/KafkaSourceReader.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java index 8ca03370d25e5..bf3d42e280e4e 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java @@ -109,11 +109,13 @@ public List snapshotState(long checkpointId) { @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - LOG.info("Committing offsets for checkpoint {}", checkpointId); + LOG.debug("Committing offsets for checkpoint {}", checkpointId); ((KafkaSourceFetcherManager) splitFetcherManager) .commitOffsets( offsetsToCommit.get(checkpointId), (ignored, e) -> { + // The offset commit here is needed by the external monitoring. It won't + // break Flink job's correctness if we fail to commit the offset here. if (e != null) { LOG.warn( "Failed to commit consumer offsets for checkpoint {}", @@ -124,7 +126,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { "Successfully committed offsets for checkpoint {}", checkpointId); // If the finished topic partition has been committed, we remove it - // from the offsets of finsihed splits map. + // from the offsets of the finished splits map. Map committedPartitions = offsetsToCommit.get(checkpointId); offsetsOfFinishedSplits