Commit
Use Log4j2 instead of SLF4J for our own loggers (strimzi#864)
Signed-off-by: Jakub Scholz <[email protected]>
scholzj committed Jan 31, 2024
1 parent 2f6bcb3 commit 6479490
Showing 29 changed files with 178 additions and 187 deletions.
pom.xml (7 changes: 3 additions & 4 deletions)
@@ -100,7 +100,6 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
-<slf4j.version>1.7.36</slf4j.version>
<vertx.version>4.5.2</vertx.version>
<vertx-testing.version>4.5.2</vertx-testing.version>
<netty.version>4.1.106.Final</netty.version>
@@ -193,9 +192,9 @@
<version>${netty.version}</version>
</dependency>
<dependency>
-<groupId>org.slf4j</groupId>
-<artifactId>slf4j-api</artifactId>
-<version>${slf4j.version}</version>
+<groupId>org.apache.logging.log4j</groupId>
+<artifactId>log4j-api</artifactId>
+<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
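Note: with this dependency swap, the bridge's own classes compile against the Log4j2 API (log4j-api) rather than the SLF4J facade. A minimal sketch of the logger pattern this commit converges on (hypothetical class name, assuming only log4j-api on the classpath):

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LoggingExample {
    // Log4j2 caches loggers in LogManager, so one static instance per class suffices.
    private static final Logger LOGGER = LogManager.getLogger(LoggingExample.class);

    public static void main(String[] args) {
        // Parameterized {} messages work the same way as in SLF4J,
        // so existing call sites only need the field rename.
        LOGGER.info("Example {} starting with {} args", LoggingExample.class.getSimpleName(), args.length);
    }
}
```

Because Log4j2's {} placeholders match SLF4J's, the per-file diffs below are almost entirely mechanical renames.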
src/main/java/io/strimzi/kafka/bridge/Application.java (19 changes: 9 additions & 10 deletions)
@@ -24,8 +24,8 @@
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

import javax.management.MalformedObjectNameException;
import java.io.BufferedReader;
@@ -44,8 +44,7 @@
* Apache Kafka bridge main application class
*/
public class Application {
-
-private static final Logger log = LoggerFactory.getLogger(Application.class);
+private static final Logger LOGGER = LogManager.getLogger(Application.class);

private static final String KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";

@@ -55,12 +54,12 @@ public class Application {
* @param args command line arguments
*/
public static void main(String[] args) {
-log.info("Strimzi Kafka Bridge {} is starting", Application.class.getPackage().getImplementationVersion());
+LOGGER.info("Strimzi Kafka Bridge {} is starting", Application.class.getPackage().getImplementationVersion());
try {
VertxOptions vertxOptions = new VertxOptions();
JmxCollectorRegistry jmxCollectorRegistry = null;
if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
-log.info("Metrics enabled and exposed on the /metrics endpoint");
+LOGGER.info("Metrics enabled and exposed on the /metrics endpoint");
// setup Micrometer metrics options
vertxOptions.setMetricsOptions(metricsOptions());
jmxCollectorRegistry = getJmxCollectorRegistry();
@@ -75,7 +74,7 @@ public static void main(String[] args) {

Map<String, Object> config = ConfigRetriever.getConfig(absoluteFilePath(commandLine.getOptionValue("config-file")));
BridgeConfig bridgeConfig = BridgeConfig.fromMap(config);
-log.info("Bridge configuration {}", bridgeConfig);
+LOGGER.info("Bridge configuration {}", bridgeConfig);

deployHttpBridge(vertx, bridgeConfig, metricsReporter).onComplete(done -> {
if (done.succeeded()) {
@@ -84,7 +83,7 @@
}
});
} catch (RuntimeException | MalformedObjectNameException | IOException | ParseException e) {
-log.error("Error starting the bridge", e);
+LOGGER.error("Error starting the bridge", e);
System.exit(1);
}
}
@@ -122,10 +121,10 @@ private static Future<HttpBridge> deployHttpBridge(Vertx vertx, BridgeConfig bri
HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter);
vertx.deployVerticle(httpBridge, done -> {
if (done.succeeded()) {
-log.info("HTTP verticle instance deployed [{}]", done.result());
+LOGGER.info("HTTP verticle instance deployed [{}]", done.result());
httpPromise.complete(httpBridge);
} else {
-log.error("Failed to deploy HTTP verticle instance", done.cause());
+LOGGER.error("Failed to deploy HTTP verticle instance", done.cause());
httpPromise.fail(done.cause());
}
});
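Note: the rename from an instance field `log` to a static constant `LOGGER` follows common Log4j2 convention. Log4j2 can also defer argument computation via lambda suppliers, which may be worth considering for the trace-level `Thread.currentThread()` calls throughout this codebase. A sketch (hypothetical class, assuming log4j-api 2.4 or newer):

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LazyLoggingExample {
    private static final Logger LOGGER = LogManager.getLogger(LazyLoggingExample.class);

    public static void main(String[] args) {
        // With {} placeholders, formatting only happens if TRACE is enabled.
        LOGGER.trace("Current thread {}", Thread.currentThread());

        // Log4j2 additionally accepts lambda suppliers, deferring even the
        // argument computation until the level check has passed.
        LOGGER.trace("Expensive diagnostic: {}", () -> expensiveDiagnostic());
    }

    private static String expensiveDiagnostic() {
        return "details that are costly to compute";
    }
}
```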
src/main/java/io/strimzi/kafka/bridge/KafkaBridgeAdmin.java (30 changes: 15 additions & 15 deletions)
@@ -13,8 +13,8 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
@@ -27,7 +27,7 @@
* Represents a Kafka bridge admin client
*/
public class KafkaBridgeAdmin {
-private final Logger log = LoggerFactory.getLogger(KafkaBridgeAdmin.class);
+private static final Logger LOGGER = LogManager.getLogger(KafkaBridgeAdmin.class);

private final KafkaConfig kafkaConfig;
private AdminClient adminClient;
@@ -68,13 +68,13 @@ public void close() {
* @return a CompletionStage bringing the set of topics
*/
public CompletionStage<Set<String>> listTopics() {
-log.trace("List topics thread {}", Thread.currentThread());
-log.info("List topics");
+LOGGER.trace("List topics thread {}", Thread.currentThread());
+LOGGER.info("List topics");
CompletableFuture<Set<String>> promise = new CompletableFuture<>();
this.adminClient.listTopics()
.names()
.whenComplete((topics, exception) -> {
-log.trace("List topics callback thread {}", Thread.currentThread());
+LOGGER.trace("List topics callback thread {}", Thread.currentThread());
if (exception == null) {
promise.complete(topics);
} else {
@@ -91,13 +91,13 @@ public CompletionStage<Set<String>> listTopics() {
* @return a CompletionStage bringing the description of the specified topics.
*/
public CompletionStage<Map<String, TopicDescription>> describeTopics(List<String> topicNames) {
-log.trace("Describe topics thread {}", Thread.currentThread());
-log.info("Describe topics {}", topicNames);
+LOGGER.trace("Describe topics thread {}", Thread.currentThread());
+LOGGER.info("Describe topics {}", topicNames);
CompletableFuture<Map<String, TopicDescription>> promise = new CompletableFuture<>();
this.adminClient.describeTopics(topicNames)
.allTopicNames()
.whenComplete((topics, exception) -> {
-log.trace("Describe topics callback thread {}", Thread.currentThread());
+LOGGER.trace("Describe topics callback thread {}", Thread.currentThread());
if (exception == null) {
promise.complete(topics);
} else {
@@ -114,13 +114,13 @@ public CompletionStage<Map<String, TopicDescription>> describeTopics(List<String
* @return a CompletionStage bringing the configuration of the specified resources.
*/
public CompletionStage<Map<ConfigResource, Config>> describeConfigs(List<ConfigResource> configResources) {
-log.trace("Describe configs thread {}", Thread.currentThread());
-log.info("Describe configs {}", configResources);
+LOGGER.trace("Describe configs thread {}", Thread.currentThread());
+LOGGER.info("Describe configs {}", configResources);
CompletableFuture<Map<ConfigResource, Config>> promise = new CompletableFuture<>();
this.adminClient.describeConfigs(configResources)
.all()
.whenComplete((configs, exception) -> {
-log.trace("Describe configs callback thread {}", Thread.currentThread());
+LOGGER.trace("Describe configs callback thread {}", Thread.currentThread());
if (exception == null) {
promise.complete(configs);
} else {
@@ -137,13 +137,13 @@ public CompletionStage<Map<ConfigResource, Config>> describeConfigs(List<ConfigR
* @return a CompletionStage bringing the offset spec for the given partition.
*/
public CompletionStage<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
-log.trace("Get offsets thread {}", Thread.currentThread());
-log.info("Get the offset spec for partition {}", topicPartitionOffsets);
+LOGGER.trace("Get offsets thread {}", Thread.currentThread());
+LOGGER.info("Get the offset spec for partition {}", topicPartitionOffsets);
CompletableFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> promise = new CompletableFuture<>();
this.adminClient.listOffsets(topicPartitionOffsets)
.all()
.whenComplete((offsets, exception) -> {
-log.trace("Get offsets callback thread {}", Thread.currentThread());
+LOGGER.trace("Get offsets callback thread {}", Thread.currentThread());
if (exception == null) {
promise.complete(offsets);
} else {
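Note: the unchanged context lines above show this class's recurring pattern: each admin call adapts a KafkaFuture callback into a CompletableFuture that is returned as a CompletionStage. A self-contained sketch of that adaptation (hypothetical names, assuming kafka-clients on the classpath):

```java
import org.apache.kafka.clients.admin.Admin;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AdminFutureAdapterExample {
    // Adapt Kafka's KafkaFuture callback into a CompletableFuture, as the
    // surrounding context lines do for listTopics(), describeTopics(), etc.
    public static CompletionStage<Set<String>> listTopicNames(Admin admin) {
        CompletableFuture<Set<String>> promise = new CompletableFuture<>();
        admin.listTopics()
                .names()
                .whenComplete((topics, exception) -> {
                    if (exception == null) {
                        promise.complete(topics);                 // success: hand over the names
                    } else {
                        promise.completeExceptionally(exception); // failure: propagate the cause
                    }
                });
        return promise;
    }
}
```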
src/main/java/io/strimzi/kafka/bridge/KafkaBridgeConsumer.java (39 changes: 19 additions & 20 deletions)
@@ -14,8 +14,8 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

import java.time.Duration;
import java.util.HashSet;
@@ -33,8 +33,7 @@
* @param <V> type of Kafka message payload
*/
public class KafkaBridgeConsumer<K, V> {
-
-private final Logger log = LoggerFactory.getLogger(KafkaBridgeConsumer.class);
+private static final Logger LOGGER = LogManager.getLogger(KafkaBridgeConsumer.class);

private final KafkaConfig kafkaConfig;
private final Deserializer<K> keyDeserializer;
@@ -102,18 +101,18 @@ public void subscribe(List<SinkTopicSubscription> topicSubscriptions) {
return;
}

-log.info("Subscribe to topics {}", topicSubscriptions);
+LOGGER.info("Subscribe to topics {}", topicSubscriptions);
Set<String> topics = topicSubscriptions.stream().map(SinkTopicSubscription::getTopic).collect(Collectors.toSet());
-log.trace("Subscribe thread {}", Thread.currentThread());
+LOGGER.trace("Subscribe thread {}", Thread.currentThread());
this.consumer.subscribe(topics, loggingPartitionsRebalance);
}

/**
* Unsubscribe all the topics which the consumer currently subscribes
*/
public void unsubscribe() {
-log.info("Unsubscribe from topics");
-log.trace("Unsubscribe thread {}", Thread.currentThread());
+LOGGER.info("Unsubscribe from topics");
+LOGGER.trace("Unsubscribe thread {}", Thread.currentThread());
this.consumer.unsubscribe();
}

@@ -123,8 +122,8 @@ public void unsubscribe() {
* @return set of topic partitions to which the consumer is subscribed
*/
public Set<TopicPartition> listSubscriptions() {
-log.info("Listing subscribed topics");
-log.trace("ListSubscriptions thread {}", Thread.currentThread());
+LOGGER.info("Listing subscribed topics");
+LOGGER.trace("ListSubscriptions thread {}", Thread.currentThread());
return this.consumer.assignment();
}

@@ -134,8 +133,8 @@ public Set<TopicPartition> listSubscriptions() {
* @param pattern Java regex for topics subscription
*/
public void subscribe(Pattern pattern) {
-log.info("Subscribe to topics with pattern {}", pattern);
-log.trace("Subscribe thread {}", Thread.currentThread());
+LOGGER.info("Subscribe to topics with pattern {}", pattern);
+LOGGER.trace("Subscribe thread {}", Thread.currentThread());
this.consumer.subscribe(pattern, loggingPartitionsRebalance);
}

@@ -149,7 +148,7 @@ public void assign(List<SinkTopicSubscription> topicSubscriptions) {
throw new IllegalArgumentException("Topic subscriptions cannot be null");
}

-log.info("Assigning to topics partitions {}", topicSubscriptions);
+LOGGER.info("Assigning to topics partitions {}", topicSubscriptions);
// TODO: maybe we don't need the SinkTopicSubscription class anymore? Removing "offset" field, it's now the same as TopicPartition class?
Set<TopicPartition> topicPartitions = new HashSet<>();
for (SinkTopicSubscription topicSubscription : topicSubscriptions) {
@@ -161,7 +160,7 @@ public void assign(List<SinkTopicSubscription> topicSubscriptions) {
return;
}

-log.trace("Assign thread {}", Thread.currentThread());
+LOGGER.trace("Assign thread {}", Thread.currentThread());
this.consumer.assign(topicPartitions);
}

@@ -172,7 +171,7 @@
* @return records polled from the Kafka cluster
*/
public ConsumerRecords<K, V> poll(long timeout) {
-log.trace("Poll thread {}", Thread.currentThread());
+LOGGER.trace("Poll thread {}", Thread.currentThread());
return this.consumer.poll(Duration.ofMillis(timeout));
}

@@ -183,7 +182,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
* @return map containing topic partitions and corresponding committed offsets
*/
public Map<TopicPartition, OffsetAndMetadata> commit(Map<TopicPartition, OffsetAndMetadata> offsetsData) {
-log.trace("Commit thread {}", Thread.currentThread());
+LOGGER.trace("Commit thread {}", Thread.currentThread());
// TODO: doesn't it make sense to change using the commitAsync?
// does it still make sense to return the offsets we get as parameter?
this.consumer.commitSync(offsetsData);
@@ -194,7 +193,7 @@ public Map<TopicPartition, OffsetAndMetadata> commit(Map<TopicPartition, OffsetA
* Commit offsets returned on the last poll() for all the subscribed list of topics and partitions
*/
public void commitLastPolledOffsets() {
-log.trace("Commit thread {}", Thread.currentThread());
+LOGGER.trace("Commit thread {}", Thread.currentThread());
// TODO: doesn't it make sense to change using the commitAsync?
this.consumer.commitSync();
}
@@ -206,7 +205,7 @@ public void commitLastPolledOffsets() {
* @param offset offset to seek to on the topic partition
*/
public void seek(TopicPartition topicPartition, long offset) {
-log.trace("Seek thread {}", Thread.currentThread());
+LOGGER.trace("Seek thread {}", Thread.currentThread());
this.consumer.seek(topicPartition, offset);
}

@@ -216,7 +215,7 @@
* @param topicPartitionSet set of topic partition on which to seek at the beginning
*/
public void seekToBeginning(Set<TopicPartition> topicPartitionSet) {
-log.trace("SeekToBeginning thread {}", Thread.currentThread());
+LOGGER.trace("SeekToBeginning thread {}", Thread.currentThread());
this.consumer.seekToBeginning(topicPartitionSet);
}

@@ -226,7 +225,7 @@
* @param topicPartitionSet set of topic partition on which to seek at the end
*/
public void seekToEnd(Set<TopicPartition> topicPartitionSet) {
-log.trace("SeekToEnd thread {}", Thread.currentThread());
+LOGGER.trace("SeekToEnd thread {}", Thread.currentThread());
this.consumer.seekToEnd(topicPartitionSet);
}
}
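Note: subscribe() passes a loggingPartitionsRebalance listener whose implementation sits outside this diff. A hypothetical reconstruction of such a logging listener in the new Log4j2 style, for illustration only:

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;

public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    private static final Logger LOGGER = LogManager.getLogger(LoggingRebalanceListener.class);

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.info("Partitions revoked {}", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOGGER.info("Partitions assigned {}", partitions);
    }
}
```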
src/main/java/io/strimzi/kafka/bridge/KafkaBridgeProducer.java (19 changes: 9 additions & 10 deletions)
@@ -13,8 +13,8 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@@ -24,8 +24,7 @@
* Represents a Kafka bridge producer client
*/
public class KafkaBridgeProducer<K, V> {
-
-private final Logger log = LoggerFactory.getLogger(KafkaBridgeProducer.class);
+private static final Logger LOGGER = LogManager.getLogger(KafkaBridgeProducer.class);

private final KafkaConfig kafkaConfig;
private final Serializer<K> keySerializer;
@@ -56,11 +55,11 @@ public KafkaBridgeProducer(KafkaConfig kafkaConfig, Serializer<K> keySerializer,
*/
public CompletionStage<RecordMetadata> send(ProducerRecord<K, V> record) {
CompletableFuture<RecordMetadata> promise = new CompletableFuture<>();
-log.trace("Send thread {}", Thread.currentThread());
-log.debug("Sending record {}", record);
+LOGGER.trace("Send thread {}", Thread.currentThread());
+LOGGER.debug("Sending record {}", record);
this.producer.send(record, (metadata, exception) -> {
-log.trace("Kafka client callback thread {}", Thread.currentThread());
-log.debug("Sent record {} at offset {}", record, metadata.offset());
+LOGGER.trace("Kafka client callback thread {}", Thread.currentThread());
+LOGGER.debug("Sent record {} at offset {}", record, metadata.offset());
if (exception == null) {
promise.complete(metadata);
} else {
@@ -76,8 +75,8 @@ public CompletionStage<RecordMetadata> send(ProducerRecord<K, V> record) {
* @param record Kafka record to send
*/
public void sendIgnoreResult(ProducerRecord<K, V> record) {
-log.trace("Send ignore result thread {}", Thread.currentThread());
-log.debug("Sending record {}", record);
+LOGGER.trace("Send ignore result thread {}", Thread.currentThread());
+LOGGER.debug("Sending record {}", record);
this.producer.send(record);
}

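Note: send() above wraps the producer callback into a CompletionStage, the producer-side counterpart of the admin-client adapter shown earlier. A standalone sketch of that pattern (hypothetical names, assuming kafka-clients on the classpath):

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ProducerFutureAdapterExample {
    // Wrap Producer.send()'s callback into a CompletionStage, mirroring the
    // promise-completion pattern visible in send() above.
    public static <K, V> CompletionStage<RecordMetadata> send(Producer<K, V> producer, ProducerRecord<K, V> record) {
        CompletableFuture<RecordMetadata> promise = new CompletableFuture<>();
        // The callback runs on the producer's I/O thread once the broker
        // acknowledges or rejects the record.
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                promise.complete(metadata);
            } else {
                promise.completeExceptionally(exception);
            }
        });
        return promise;
    }
}
```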
[Diffs for the remaining 24 changed files are not shown.]
