Skip to content

Commit

Permalink
Add wait cycles in queuedMessagesNotProcessedOrCommittedIfSubmittedDu…
Browse files Browse the repository at this point in the history
…ringShutdown test (#795)
  • Loading branch information
rkolesnev authored Jun 10, 2024
1 parent 1819a8b commit 80128ee
Showing 1 changed file with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
* Copyright (C) 2020-2024 Confluent, Inc.
*/

import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.csid.utils.*;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -163,27 +159,34 @@ public void queuedMessagesNotProcessedOrCommittedIfSubmittedDuringShutdown(Commi
consumerSpy.addRecord(ktu.makeRecord("0", "v2"));
consumerSpy.addRecord(ktu.makeRecord("1", "v3"));
consumerSpy.addRecord(ktu.makeRecord("0", "v4"));

AtomicBoolean gotK0 = new AtomicBoolean(false);
AtomicBoolean gotK1 = new AtomicBoolean(false);
parallelConsumer.poll((record) -> {
if(record.getSingleConsumerRecord().value().equals("v1")) {
if (record.getSingleConsumerRecord().value().equals("v1")) {
gotK0.set(true);
try {
latch.await();
ThreadUtils.sleepQuietly(100);
} catch (InterruptedException interruptedException) {
interrupted.set(true);
Thread.interrupted(); //reset interrupted flag.
}
} else if (record.getSingleConsumerRecord().key().equals("1")) {
gotK1.set(true);
}
});

// let it process
while (!gotK0.get() && !gotK1.get()) {
awaitForSomeLoopCycles(1);
}
// a bit more time to help with flakiness on slower CI
awaitForSomeLoopCycles(2);

latch.countDown();
parallelConsumer.close();

//
assertCommits(of(1,2), "primed record and first key=0 record completed only, followup key 0 records skipped");
assertCommits(of(1, 2), "primed record and first key=0 record completed only, followup key 0 records skipped");
assertCommits().encodedIncomplete(2); //first blocked/skipped key 0 record (value v2).
assertThat(interrupted).isFalse();
}
Expand Down

0 comments on commit 80128ee

Please sign in to comment.