Skip to content

Commit

Permalink
add GetOffsets API
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed Apr 23, 2018
1 parent 101aa68 commit 9c3742e
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@

public interface PositionsStorage {

CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> partitions, Map<Integer, Long> externalPositions);
CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> partitions);

CompletionStage<Void> update(String topic, String groupId, int partition, long position);

Publisher<Positions> findAll();

CompletionStage<Map<Integer, Long>> findAll(String topic, String groupId);

@Value
class Positions {

String topic;

String groupId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

public interface RecordsStorage {

CompletionStage<Void> publish(String topic, ByteBuffer key, ByteBuffer value);
CompletionStage<OffsetInfo> publish(String topic, ByteBuffer key, ByteBuffer value);

Subscription subscribe(String topic, String groupId, Optional<String> autoOffsetReset);

Expand All @@ -20,6 +20,16 @@ interface Subscription {
Publisher<? extends GroupedPublisher<Integer, Record>> getPublisher();
}

@Value
class OffsetInfo {

String topic;

int partition;

long offset;
}

@Value
class Record {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -47,7 +48,11 @@ public Mono<PublishReply> publish(Mono<PublishRequest> requestMono) {
request.getKey().asReadOnlyByteBuffer(),
request.getValue().asReadOnlyByteBuffer()
)))
.then(Mono.just(PublishReply.getDefaultInstance()));
.map(it -> PublishReply.newBuilder()
.setPartition(it.getPartition())
.setOffset(it.getOffset())
.build()
);
}

@Override
Expand Down Expand Up @@ -161,6 +166,15 @@ public Mono<Empty> ack(Mono<AckRequest> request) {
.log("ack", Level.WARNING, SignalType.ON_ERROR);
}

@Override
public Mono<GetOffsetsReply> getOffsets(Mono<GetOffsetsRequest> request) {
return request.flatMap(getOffsets -> Mono
.fromCompletionStage(positionsStorage.findAll(getOffsets.getTopic(), getOffsets.getGroup()))
.defaultIfEmpty(Collections.emptyMap())
.map(offsets -> GetOffsetsReply.newBuilder().putAllOffsets(offsets).build())
);
}

@Value
private static class StoredSubscription {

Expand Down
28 changes: 18 additions & 10 deletions app/src/test/java/com/github/bsideup/liiklus/AckTest.java
Original file line number Diff line number Diff line change
@@ -1,29 +1,21 @@
package com.github.bsideup.liiklus;

import com.github.bsideup.liiklus.positions.PositionsStorage;
import com.github.bsideup.liiklus.protocol.*;
import com.github.bsideup.liiklus.test.AbstractIntegrationTest;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;

public class AckTest extends AbstractIntegrationTest {

@Autowired
PositionsStorage positionsStorage;

SubscribeRequest subscribeRequest;

@Before
Expand Down Expand Up @@ -53,7 +45,15 @@ public void testManualAck() throws Exception {
.map(it -> it.getAssignment().getPartition())
.blockFirst(Duration.ofSeconds(30));

Map<Integer, Long> positions = positionsStorage.fetch(subscribeRequest.getTopic(), subscribeRequest.getGroup(), Sets.newHashSet(partition), emptyMap()).toCompletableFuture().get();
Map<Integer, Long> positions = stub
.getOffsets(Mono.just(GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.build())
)
.map(GetOffsetsReply::getOffsetsMap)
.block(Duration.ofSeconds(10));

assertThat(positions)
.isNotNull()
.containsEntry(partition, 100L);
Expand All @@ -73,7 +73,15 @@ public void testAlwaysLatest() throws Exception {
.map(Assignment::getPartition)
.blockFirst(Duration.ofSeconds(10));

Map<Integer, Long> positions = positionsStorage.fetch(subscribeRequest.getTopic(), subscribeRequest.getGroup(), Sets.newHashSet(partition), emptyMap()).toCompletableFuture().get();
Map<Integer, Long> positions = stub
.getOffsets(Mono.just(GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.build())
)
.map(GetOffsetsReply::getOffsetsMap)
.block(Duration.ofSeconds(10));

assertThat(positions)
.isNotNull()
.containsEntry(partition, 100L);
Expand Down
64 changes: 61 additions & 3 deletions app/src/test/java/com/github/bsideup/liiklus/PositionsTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.github.bsideup.liiklus;

import com.github.bsideup.liiklus.kafka.config.KafkaRecordsStorageConfiguration.KafkaProperties;
import com.github.bsideup.liiklus.protocol.PublishRequest;
import com.github.bsideup.liiklus.protocol.ReceiveRequest;
import com.github.bsideup.liiklus.protocol.SubscribeRequest;
import com.github.bsideup.liiklus.protocol.*;
import com.github.bsideup.liiklus.test.AbstractIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
Expand All @@ -21,6 +19,7 @@

import java.time.Duration;
import java.util.HashMap;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -77,4 +76,63 @@ val record = stub
assertThat(record.getRecord().getOffset())
.isEqualTo(6);
}

@Test
public void testGetOffsets() throws Exception {
val key = UUID.randomUUID().toString();
val partition = getPartitionByKey(key);

val publishReply = stub.publish(Mono.just(
PublishRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8("bar"))
.build()
)).block(Duration.ofSeconds(10));

assertThat(publishReply)
.hasFieldOrPropertyWithValue("partition", partition);

val reportedOffset = publishReply.getOffset();

stub
.subscribe(Mono.just(subscribeRequest))
.filter(it -> it.getAssignment().getPartition() == partition)
.flatMap(it -> stub.receive(Mono.just(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build()))
.map(ReceiveReply::getRecord)
.filter(record -> key.equals(record.getKey().toStringUtf8()))
.delayUntil(record -> stub.ack(Mono.just(AckRequest.newBuilder()
.setAssignment(it.getAssignment())
.setOffset(record.getOffset())
.build()
)))
)
.blockFirst(Duration.ofSeconds(10));

val getOffsetsReply = stub
.getOffsets(Mono.just(GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(subscribeRequest.getGroup())
.build())
)
.block(Duration.ofSeconds(10));

assertThat(getOffsetsReply.getOffsetsMap())
.containsEntry(partition, reportedOffset);
}

@Test
public void testGetEmptyOffsets() throws Exception {
val getOffsetsReply = stub
.getOffsets(Mono.just(GetOffsetsRequest.newBuilder()
.setTopic(subscribeRequest.getTopic())
.setGroup(UUID.randomUUID().toString())
.build())
)
.block(Duration.ofSeconds(10));

assertThat(getOffsetsReply.getOffsetsMap())
.isEmpty();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import reactor.core.publisher.SignalType;

import java.time.Duration;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
Expand Down Expand Up @@ -69,15 +72,12 @@ public void onSuccess(ScanRequest request, ScanResult scanResult) {
.map(item -> new Positions(
item.get("topic").getS(),
item.get("groupId").getS(),
item.get("positions").getM().entrySet().stream().collect(Collectors.toMap(
it -> Integer.parseInt(it.getKey()),
it -> Long.parseLong(it.getValue().getN())
))
toPositions(item)
));
}

@Override
public CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> partitions, Map<Integer, Long> externalPositions) {
public CompletionStage<Map<Integer, Long>> findAll(String topic, String groupId) {
val request = new GetItemRequest()
.withTableName(tableName)
.withConsistentRead(true)
Expand All @@ -94,24 +94,13 @@ public void onError(Exception exception) {
@Override
public void onSuccess(GetItemRequest request, GetItemResult getItemResult) {
try {
Map<String, AttributeValue> item = getItemResult.getItem();

val positions = item != null ? item.get("positions").getM() : null;

sink.success(
partitions.stream().collect(Collectors.toMap(
it -> it,
it -> {
AttributeValue attributeValue = positions != null ? positions.get(it.toString()) : null;

if (attributeValue != null) {
return Long.parseLong(attributeValue.getN());
} else {
return externalPositions.get(it);
}
}
))
);
val positions = toPositions(getItemResult.getItem());

if (positions == null) {
sink.success();
} else {
sink.success(positions);
}
} catch (Exception e) {
sink.error(e);
}
Expand All @@ -125,6 +114,11 @@ public void onSuccess(GetItemRequest request, GetItemResult getItemResult) {
.toFuture();
}

@Override
public CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> __) {
return findAll(topic, groupId);
}

@Override
public CompletionStage<Void> update(String topic, String groupId, int partition, long position) {
Map<String, AttributeValue> key = toKey(topic, groupId);
Expand Down Expand Up @@ -205,4 +199,22 @@ Map<String, AttributeValue> toKey(String topic, String groupId) {
result.put(RANGE_KEY_FIELD, new AttributeValue(groupId));
return result;
}

Map<Integer, Long> toPositions(Map<String, AttributeValue> item) {
if (item == null) {
return null;
}

AttributeValue positions = item.get("positions");

if (positions == null || positions.getM() == null) {
return null;
}

return positions.getM().entrySet().stream()
.collect(Collectors.toMap(
it -> Integer.parseInt(it.getKey()),
it -> Long.parseLong(it.getValue().getN())
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -36,18 +35,13 @@ public Publisher<Positions> findAll() {
}

@Override
public CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> partitions, Map<Integer, Long> externalPositions) {
ConcurrentMap<Integer, Long> positions = storage.get(Key.of(topic, groupId));

if (positions == null) {
return CompletableFuture.completedFuture(externalPositions);
}

Map<Integer, Long> result = new HashMap<>();
result.putAll(externalPositions);
result.putAll(positions);
public CompletionStage<Map<Integer, Long>> findAll(String topic, String groupId) {
return CompletableFuture.completedFuture(storage.get(Key.of(topic, groupId)));
}

return CompletableFuture.completedFuture(result);
@Override
public CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> __) {
return findAll(topic, groupId);
}

@Override
Expand Down
Loading

0 comments on commit 9c3742e

Please sign in to comment.