
Batching not working as expected #551

Open
dixitsingla opened this issue Feb 20, 2023 · 4 comments

@dixitsingla

Hi Team,

I mainly wanted to use the batching feature of Parallel Consumer, so I started doing a POC around it.

The Kafka topic currently has 6 partitions, and each partition has around 15k messages.

I want to consume the data in batches of 10 messages each, in an ordered way. Below are the code snippets of the current Parallel Consumer configuration.

ParallelConsumerOptions

final Consumer<String, String> consumer = new KafkaConsumer<>(appProperties);
final ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
        .consumer(consumer)
        .ordering(PARTITION)   // ProcessingOrder.PARTITION, statically imported
        .batchSize(10)
        .build();
ParallelStreamProcessor<String, String> eosStreamProcessor =
        ParallelStreamProcessor.createEosStreamProcessor(options);

appProperties


# Consumer properties
bootstrap.servers=localhost:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
max.poll.interval.ms=300000
enable.auto.commit=false
auto.offset.reset=earliest

# Application-specific properties
input.topic.name=<name of the topic>

Consumer Poll

public void runConsume(final Properties appProperties) {
    String topic = appProperties.getProperty("input.topic.name");

    LOGGER.info("Subscribing Parallel Consumer to consume from {} topic", topic);
    parallelConsumer.subscribe(Collections.singletonList(topic));

    LOGGER.info("Polling for records from {} topic. This method blocks", topic);
    parallelConsumer.poll(context -> {
        // each invocation receives a batch of up to batchSize records
        List<String> payload = context.stream().map(this::preparePayload).collect(Collectors.toList());
        System.out.println("********* " + payload.size() + " **********");
    });
}

preparePayload

private String preparePayload(RecordContext<String, String> recordContext) {
    ConsumerRecord<String, String> consumerRecord = recordContext.getConsumerRecord();
    int failureCount = recordContext.getNumberOfFailedAttempts();
    System.out.println("Key: " + consumerRecord.key() + " - Partition: " + consumerRecord.partition() + " - Offset: " + consumerRecord.offset());
    return msg("{}, {}", consumerRecord, failureCount);
}

Now, despite setting the batch size to 10, the data is being consumed in batches of random sizes (1, 2, 3, all smaller than 10). Could someone please help me out?

Thanks in advance.

Regards,
Dixit

@rkolesnev
Member

Hi,

The batching logic is not designed to wait for batches to be full before processing. It is more of an optimization for when there is more data to process than can be handled immediately (i.e. processing is slower than consuming from Kafka) and the processing function supports batching - in that case records are handed over in batches.

So it looks like your processing is potentially faster than polling from Kafka - in that case you won't have full batches of records, because not enough records have accumulated in the queues yet.

One thing to check is the consumer options for fetch size, max poll records, etc., to make sure you are feeding enough records into Parallel Consumer per poll.
Another is what processing you are doing: if you are only logging records to test the flow, you will be processing (logging) them faster than you can read them from Kafka. You can add a sleep / wait to replicate the processing you plan to do and the time it takes, to make the test more realistic, as sketched below.
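
For example, a minimal sketch of adding an artificial delay to the batch handler from the snippet above (the 50 ms figure is only a placeholder for your real per-batch processing time):

parallelConsumer.poll(context -> {
    List<String> payload = context.stream()
            .map(this::preparePayload)
            .collect(Collectors.toList());
    try {
        // simulate the real per-batch work, e.g. a database call
        Thread.sleep(50);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    System.out.println("Batch size: " + payload.size());
});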

Even with fast processing - if the topic partitions already have enough data on them and the consumer is configured to return 10+ records per poll - you should get the batch filled.

You could test the polling parameters using a plain Kafka Consumer and check how many records per poll it actually returns - in a test stub or simple application.
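
A rough sketch of such a check, reusing the appProperties from the issue description (the group.id and the loop count are placeholders added here for illustration):

Properties props = new Properties();
props.putAll(appProperties);                        // consumer properties from the issue description
props.put("group.id", "batch-poll-check");          // placeholder group id for the test stub

try (KafkaConsumer<String, String> plainConsumer = new KafkaConsumer<>(props)) {
    plainConsumer.subscribe(Collections.singletonList(appProperties.getProperty("input.topic.name")));
    for (int i = 0; i < 10; i++) {
        ConsumerRecords<String, String> records = plainConsumer.poll(Duration.ofSeconds(1));
        System.out.println("Poll " + i + " returned " + records.count() + " records");
    }
}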

@doppelrittberger

Hi,
we have the same problem, I think: we have a topic with a lot of pre-produced data and use partition ordering with a batch size of 1000. Still, we only get 1 message per partition per poll. Since we want to process the messages per partition with batched database calls, this is much slower when done one-by-one.
Any chance we can change the configuration and get real batches per partition here? Polling itself does not seem to be the problem, since I configured the KafkaConsumer with a batch size of 10,000 and I can see that amount being fetched in ConsumerManager as well. Something in between reduces the batch to 1 per partition in the poll() call.
Thanks :)

@dixitsingla
Author

dixitsingla commented May 10, 2023

In our case, we dropped the idea of using the batching mechanism in Parallel Consumer. But while doing the analysis I made a few observations, given below.

  • If you use PARTITION ordering, you will end up with 1 message per partition.
  • For the other orderings (KEY, UNORDERED), you can tune the following Kafka configs to get more messages per batch.

max.poll.records
fetch.max.bytes
max.partition.fetch.bytes
fetch.min.bytes
fetch.max.wait.ms

  • We also tried the above properties with a plain consumer, and the batching was more promising; a minimal sketch with illustrative values follows.
    Consume records like this: ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
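
A sketch of how those properties might be set on a plain consumer (the values shown are illustrative only, not recommendations, and the group id is a placeholder):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "plain-batch-consumer");      // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("max.poll.records", "1000");              // upper bound on records returned per poll
props.put("fetch.max.bytes", "52428800");           // max data per fetch response (50 MB)
props.put("max.partition.fetch.bytes", "1048576");  // max data per partition per fetch (1 MB)
props.put("fetch.min.bytes", "65536");              // wait for at least 64 KB...
props.put("fetch.max.wait.ms", "500");              // ...or 500 ms, whichever comes first

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("<name of the topic>"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
System.out.println("Records returned by this poll: " + records.count());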

HTH!

@nscuro
Contributor

nscuro commented Dec 28, 2023

The problem appears to be that "work" (i.e., polled records) is being queued based on shards:

// loop over shards, and get work from each
Optional<Map.Entry<ShardKey, ProcessingShard<K, V>>> next = shardQueueIterator.next();
while (workFromAllShards.size() < requestedMaxWorkToRetrieve && next.isPresent()) {
    var shardEntry = next;
    ProcessingShard<K, V> shard = shardEntry.get().getValue();
    //
    int remainingToGet = requestedMaxWorkToRetrieve - workFromAllShards.size();
    var work = shard.getWorkIfAvailable(remainingToGet);
    workFromAllShards.addAll(work);
    // next
    next = shardQueueIterator.next();
}

For processing order PARTITION, each partition is a shard. For processing order KEY, each key is a shard.

For both PARTITION and KEY, shard.getWorkIfAvailable(remainingToGet) can only ever return a single record:

if (isOrderRestricted()) {
    // can't take any more work from this shard, due to ordering restrictions
    // processing blocked on this shard, continue to next shard
    log.trace("Processing by {}, so have cannot get more messages on this ({}) shardEntry.", this.options.getOrdering(), getKey());
    break;
}

This means that, for all modes except UNORDERED, the maximum batch size is limited by:

  • Number of partitions (for PARTITION)
  • Number of unique keys (for KEY)

As @doppelrittberger mentioned, this is counter-intuitive, as batching is most useful for multiple records in the same shard. Batching currently does not help me deal with many records of the same key, or many records in the same partition.

@rkolesnev, are there any major downsides to lifting the above restriction? I think even ordered shards should be able to return multiple records, but I can't fully grasp the impact this may have on the system (e.g. offset tracking).
