Skip to content

Commit

Permalink
add GetEndOffsets API (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed Jul 4, 2019
1 parent 2079ab1 commit 8e00578
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.github.bsideup.liiklus.records;

import java.util.Map;
import java.util.concurrent.CompletionStage;

public interface FiniteRecordsStorage extends RecordsStorage {

/**
* Returns a {@link Map} where key is partition's number and value is the latest offset.
* The offset can be zero. Offset -1 means that there is no offset for this partition.
*/
CompletionStage<Map<Integer, Long>> getEndOffsets(String topic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.github.bsideup.liiklus.positions.GroupId;
import com.github.bsideup.liiklus.positions.PositionsStorage;
import com.github.bsideup.liiklus.protocol.*;
import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import com.github.bsideup.liiklus.records.RecordPostProcessor;
import com.github.bsideup.liiklus.records.RecordPreProcessor;
import com.github.bsideup.liiklus.records.RecordsStorage;
Expand Down Expand Up @@ -302,6 +303,24 @@ public Mono<GetOffsetsReply> getOffsets(Mono<GetOffsetsRequest> request) {
);
}

@Override
public Mono<GetEndOffsetsReply> getEndOffsets(GetEndOffsetsRequest message, ByteBuf metadata) {
return getEndOffsets(Mono.just(message));
}

@Override
public Mono<GetEndOffsetsReply> getEndOffsets(Mono<GetEndOffsetsRequest> request) {
return request.flatMap(getEndOffsets -> {
if (!(recordsStorage instanceof FiniteRecordsStorage)) {
return Mono.error(Status.INTERNAL.withDescription("The record storage is not finite").asException());
}

var topic = getEndOffsets.getTopic();
return Mono.fromCompletionStage(((FiniteRecordsStorage) recordsStorage).getEndOffsets(topic))
.map(endOffsets -> GetEndOffsetsReply.newBuilder().putAllOffsets(endOffsets).build());
});
}

private Mono<Map<Integer, Optional<Long>>> getLatestOffsetsOfGroup(String topic, String groupName) {
return getOffsetsByGroupName(topic, groupName)
.map(ackedOffsets -> ackedOffsets.values().stream()
Expand Down
57 changes: 57 additions & 0 deletions app/src/test/java/com/github/bsideup/liiklus/EndOffsetsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.github.bsideup.liiklus;

import com.github.bsideup.liiklus.protocol.GetEndOffsetsRequest;
import com.github.bsideup.liiklus.protocol.PublishRequest;
import com.github.bsideup.liiklus.test.AbstractIntegrationTest;
import com.google.protobuf.ByteString;
import org.junit.Before;
import org.junit.Test;

import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

public class EndOffsetsTest extends AbstractIntegrationTest {

private String topic;

@Before
public final void setUpEndOffsetsTest() {
topic = testName.getMethodName();
}

@Test
public void testEndOffsets() {
var value = ByteString.copyFromUtf8("foo");

for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
for (int i = 0; i < partition + 1; i++) {
stub.publish(PublishRequest.newBuilder()
.setTopic(topic)
.setKey(ByteString.copyFromUtf8(PARTITION_KEYS.get(partition)))
.setValue(value)
.build()
).block();
}
}

var reply = stub.getEndOffsets(GetEndOffsetsRequest.newBuilder().setTopic(topic).build()).block();

assertThat(reply.getOffsetsMap())
.hasSize(NUM_PARTITIONS)
.allSatisfy((partition, offset) -> {
assertThat(offset)
.as("offset of p" + partition)
.isEqualTo(partition.longValue());
});
}

@Test
public void testEndOffsets_unknownTopic() {
var randomTopic = UUID.randomUUID().toString();
var reply = stub.getEndOffsets(GetEndOffsetsRequest.newBuilder().setTopic(randomTopic).build()).block();

assertThat(reply.getOffsetsMap()).isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ public interface LiiklusClient {
Mono<Empty> ack(AckRequest message);

Mono<GetOffsetsReply> getOffsets(GetOffsetsRequest message);

Mono<GetEndOffsetsReply> getEndOffsets(GetEndOffsetsRequest message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ public Mono<Empty> ack(AckRequest message) {
public Mono<GetOffsetsReply> getOffsets(GetOffsetsRequest message) {
return liiklusServiceClient.getOffsets(message);
}

@Override
public Mono<GetEndOffsetsReply> getEndOffsets(GetEndOffsetsRequest message) {
return liiklusServiceClient.getEndOffsets(message);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.bsideup.liiklus.records.inmemory;

import com.github.bsideup.liiklus.records.RecordsStorage;
import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.Value;
Expand Down Expand Up @@ -33,7 +33,7 @@
*/
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class InMemoryRecordsStorage implements RecordsStorage {
public class InMemoryRecordsStorage implements FiniteRecordsStorage {

public static int partitionByKey(String key, int numberOfPartitions) {
return partitionByKey(ByteBuffer.wrap(key.getBytes()), numberOfPartitions);
Expand Down Expand Up @@ -74,6 +74,20 @@ public CompletionStage<OffsetInfo> publish(Envelope envelope) {
));
}

@Override
public CompletionStage<Map<Integer, Long>> getEndOffsets(String topic) {
var partitions = state.getOrDefault(topic, new StoredTopic(numberOfPartitions)).getPartitions();
return CompletableFuture.completedFuture(
partitions.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
it -> Math.max(
0,
it.getValue().getNextOffset().get() - 1
)
))
);
}

@Override
public Subscription subscribe(String topic, String groupName, Optional<String> autoOffsetReset) {
var storedTopic = state.computeIfAbsent(topic, __ -> new StoredTopic(numberOfPartitions));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.bsideup.liiklus.kafka;

import com.github.bsideup.liiklus.records.RecordsStorage;
import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.experimental.FieldDefaults;
Expand Down Expand Up @@ -33,11 +33,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@FieldDefaults(makeFinal = true)
@Slf4j
public class KafkaRecordsStorage implements RecordsStorage {
public class KafkaRecordsStorage implements FiniteRecordsStorage {

private static final Scheduler KAFKA_POLL_SCHEDULER = Schedulers.elastic();

Expand All @@ -60,6 +61,39 @@ public KafkaRecordsStorage(String bootstrapServers) {
);
}

@Override
public CompletionStage<Map<Integer, Long>> getEndOffsets(String topic) {
return Mono.fromCallable(() -> {
var properties = new HashMap<String, Object>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "0");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");

try (
var consumer = new KafkaConsumer<ByteBuffer, ByteBuffer>(
properties,
new ByteBufferDeserializer(),
new ByteBufferDeserializer()
)
) {
consumer.subscribe(List.of(topic));

var endOffsets = consumer.endOffsets(
consumer.partitionsFor(topic).stream()
.map(it -> new TopicPartition(topic, it.partition()))
.collect(Collectors.toSet())
);

return endOffsets.entrySet().stream().collect(Collectors.toMap(
it -> it.getKey().partition(),
it -> it.getValue() - 1
));
}
}).subscribeOn(Schedulers.elastic()).toFuture();
}

@Override
public CompletionStage<OffsetInfo> publish(Envelope envelope) {
String topic = envelope.getTopic();
Expand Down
12 changes: 12 additions & 0 deletions protocol/src/main/proto/LiiklusService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ service LiiklusService {
rpc GetOffsets(GetOffsetsRequest) returns (GetOffsetsReply) {

}

rpc GetEndOffsets(GetEndOffsetsRequest) returns (GetEndOffsetsReply) {

}
}

message PublishRequest {
Expand Down Expand Up @@ -122,4 +126,12 @@ message GetOffsetsRequest {

message GetOffsetsReply {
map<uint32, uint64> offsets = 1;
}

message GetEndOffsetsRequest {
string topic = 1;
}

message GetEndOffsetsReply {
map<uint32, uint64> offsets = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

import com.github.bsideup.liiklus.records.tests.BackPressureTest;
import com.github.bsideup.liiklus.records.tests.ConsumerGroupTest;
import com.github.bsideup.liiklus.records.tests.EndOffsetsTest;
import com.github.bsideup.liiklus.records.tests.PublishTest;
import com.github.bsideup.liiklus.records.tests.SubscribeTest;

public interface RecordStorageTests extends PublishTest, SubscribeTest, ConsumerGroupTest, BackPressureTest {
public interface RecordStorageTests extends PublishTest, SubscribeTest, ConsumerGroupTest, BackPressureTest, EndOffsetsTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.github.bsideup.liiklus.records.tests;

import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import com.github.bsideup.liiklus.records.RecordStorageTestSupport;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public interface EndOffsetsTest extends RecordStorageTestSupport {

default FiniteRecordsStorage getFiniteTarget() {
return (FiniteRecordsStorage) getTarget();
}

int getNumberOfPartitions();

String keyByPartition(int partition);

@BeforeEach
default void blah(TestInfo testInfo) {
if (EndOffsetsTest.class == testInfo.getTestMethod().map(Method::getDeclaringClass).orElse(null)) {
Assumptions.assumeTrue(getTarget() instanceof FiniteRecordsStorage, "target is finite");
}
}

@Test
default void testEndOffsets() throws Exception {
var topic = getTopic();

for (int partition = 0; partition < getNumberOfPartitions(); partition++) {
for (int i = 0; i < partition + 1; i++) {
publish(keyByPartition(partition).getBytes(), new byte[1]);
}
}

var offsets = getFiniteTarget().getEndOffsets(topic).toCompletableFuture().get(10, TimeUnit.SECONDS);

assertThat(offsets)
.hasSize(getNumberOfPartitions())
.allSatisfy((partition, offset) -> {
assertThat(offset)
.as("offset of p" + partition)
.isEqualTo(partition.longValue());
});
}

@Test
default void testEndOffsets_unknownTopic() throws Exception {
var topic = UUID.randomUUID().toString();

var offsets = getFiniteTarget().getEndOffsets(topic).toCompletableFuture().get(10, TimeUnit.SECONDS);

assertThat(offsets)
.allSatisfy((partition, offset) -> {
assertThat(offset)
.as("offset of p" + partition)
.isEqualTo(-1L);
});
}
}

0 comments on commit 8e00578

Please sign in to comment.