Skip to content

Commit

Permalink
Refactor and add necessary comments
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinav-nath committed Aug 14, 2022
1 parent 7885b2c commit 3b35dc0
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
public class RunApp {

public static void main(String[] args) {
String topics = "TestTopic5";
String consumerGroup = "ConsumerGroup";
String diffConsumerGroup = "DiffConsumerGroup";
final String topics = "TestTopic1";
final String consumerGroup = "ConsumerGroup";
// final String diffConsumerGroup = "DiffConsumerGroup";

Producer producer = new Producer(BOOTSTRAP_SERVERS, topics);
Consumer consumer1 = new Consumer("Consumer1", BOOTSTRAP_SERVERS, consumerGroup, topics);
Consumer consumer2 = new Consumer("Consumer2", BOOTSTRAP_SERVERS, diffConsumerGroup, topics);
// Consumer consumer2 = new Consumer("Consumer2", BOOTSTRAP_SERVERS, diffConsumerGroup, topics);

(new Thread(producer::produceMessages)).start();

(new Thread(consumer1::consumeMessages)).start();
(new Thread(consumer2::consumeMessages)).start();
// (new Thread(consumer2::consumeMessages)).start();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ private void setConsumerProperties() {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

// if we want to adjust defaults
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // default is latest
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) // default 5000 - change how often to commit offsets
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // default is latest
// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) // default 5000 - change how often to commit offsets
// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000) // default 5000 - change how often to commit offsets
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ private void setProducerProperties() {
public void produceMessages() {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
// producer.send(new ProducerRecord<>(topics, key, value));
producer.send(new ProducerRecord<>(topics, "apple", "this is message #" + i));
}
producer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ public class Constants {
private Constants() {
}

public static final String BOOTSTRAP_SERVERS = ":29092,localhost:29092";
public static final String BOOTSTRAP_SERVERS = ":29092, localhost:29092";

}

0 comments on commit 3b35dc0

Please sign in to comment.