implement external offsets storage, modular structure
bsideup committed Feb 23, 2018
1 parent 26c21e0 commit 69324c4
Showing 23 changed files with 656 additions and 227 deletions.
11 changes: 11 additions & 0 deletions api/build.gradle
@@ -0,0 +1,11 @@
+plugins {
+    id "java"
+}
+
+dependencies {
+    compileOnly 'org.projectlombok:lombok'
+
+    compile 'org.reactivestreams:reactive-streams'
+
+    testCompileOnly 'org.projectlombok:lombok'
+}
12 changes: 12 additions & 0 deletions …/PositionsStorage.java
@@ -0,0 +1,12 @@
+package com.github.bsideup.liiklus.positions;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+
+public interface PositionsStorage {
+
+    CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> partitions, Map<Integer, Long> externalPositions);
+
+    CompletionStage<Void> update(String topic, String groupId, int partition, long position);
+}
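
The new PositionsStorage SPI decouples offset tracking from the record source, so acknowledged positions can live in an external store (this commit wires in DynamoDB via the dynamodb-positions-storage module). For orientation, a minimal in-memory sketch of the contract — not part of this commit; the class name and the treatment of externalPositions as fallback values are assumptions:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical illustration only. Positions are kept in memory,
    // keyed by "topic:groupId" and then by partition.
    public class InMemoryPositionsStorage implements PositionsStorage {

        private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> positions = new ConcurrentHashMap<>();

        @Override
        public CompletionStage<Map<Integer, Long>> fetch(String topic, String groupId, Set<Integer> partitions, Map<Integer, Long> externalPositions) {
            // Assumption: externally provided positions act as defaults,
            // overridden by anything this storage has recorded.
            Map<Integer, Long> result = new HashMap<>(externalPositions);
            positions.getOrDefault(topic + ":" + groupId, new ConcurrentHashMap<>())
                    .forEach((partition, position) -> {
                        if (partitions.contains(partition)) {
                            result.put(partition, position);
                        }
                    });
            return CompletableFuture.completedFuture(result);
        }

        @Override
        public CompletionStage<Void> update(String topic, String groupId, int partition, long position) {
            positions
                    .computeIfAbsent(topic + ":" + groupId, key -> new ConcurrentHashMap<>())
                    .put(partition, position);
            return CompletableFuture.completedFuture(null);
        }
    }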
…/source/KafkaSource.java → …/records/RecordsStorage.java (renamed)
@@ -1,29 +1,27 @@
-package com.github.bsideup.liiklus.source;
+package com.github.bsideup.liiklus.records;
 
 import lombok.Value;
 import lombok.experimental.Delegate;
 import org.reactivestreams.Publisher;
-import reactor.core.publisher.Mono;
 
 import java.nio.ByteBuffer;
 import java.time.Instant;
-import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
 
-public interface KafkaSource {
+public interface RecordsStorage {
 
-    Mono<Void> publish(String topic, ByteBuffer key, ByteBuffer value);
+    CompletionStage<Void> publish(String topic, ByteBuffer key, ByteBuffer value);
 
-    Subscription subscribe(Map<String, Object> props, String topic);
+    Subscription subscribe(String topic, String groupId, Optional<String> autoOffsetReset);
 
     interface Subscription {
 
-        Publisher<? extends GroupedPublisher<Integer, KafkaRecord>> getPublisher();
-
-        Mono<Void> acknowledge(int partition, long offset);
+        Publisher<? extends GroupedPublisher<Integer, Record>> getPublisher();
     }
 
     @Value
-    class KafkaRecord {
+    class Record {
 
         ByteBuffer key;
 
…
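
Note that Subscription lost its acknowledge method: consuming records and committing positions are now separate concerns, with acks routed to PositionsStorage (see the service changes below). A hedged sketch of consuming through the reworked interface, assuming some RecordsStorage instance named storage:

    // Illustration only; "storage" and the topic/group names are placeholders.
    RecordsStorage.Subscription subscription = storage.subscribe("events", "my-group", Optional.of("earliest"));

    Flux.from(subscription.getPublisher())
            .flatMap(group -> Flux.from(group)
                    // group.getGroup() is the partition number, as used by the service below
                    .doOnNext(record -> System.out.println(group.getGroup() + " @ " + record.getOffset())))
            .subscribe();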
9 changes: 7 additions & 2 deletions app/build.gradle
@@ -14,20 +14,25 @@ test.testLogging {
 dependencies {
     compileOnly 'org.projectlombok:lombok'
 
+    compile project(":api")
     compile project(":protocol")
 
+    runtime project(":kafka-records-storage")
+    runtime project(":dynamodb-positions-storage")
+
     compile 'org.lognet:grpc-spring-boot-starter'
     compile 'io.grpc:grpc-netty'
     compile 'io.grpc:grpc-core'
 
     compile 'org.springframework.boot:spring-boot-starter-actuator'
     compile 'org.springframework.boot:spring-boot-starter-web'
 
-    compile 'io.projectreactor.kafka:reactor-kafka'
-
     testCompileOnly 'org.projectlombok:lombok'
     testCompile 'org.springframework.boot:spring-boot-starter-test'
     testCompile 'org.testcontainers:kafka'
     testCompile 'org.assertj:assertj-core'
     testCompile 'org.awaitility:awaitility'
+
+    testCompile project(":kafka-records-storage")
+    testCompile project(":dynamodb-positions-storage")
 }
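
With the storage backends split into runtime-only modules, the app no longer compiles against Kafka directly: reactor-kafka leaves app/build.gradle and moves into the kafka-records-storage module. The layout implies a root settings.gradle along these lines (a sketch, not from the commit; the module set is inferred from the project references above):

    include 'api'
    include 'protocol'
    include 'app'
    include 'kafka-records-storage'
    include 'dynamodb-positions-storage'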
…/service/ReactorLiiklusServiceImpl.java
@@ -1,24 +1,23 @@
 package com.github.bsideup.liiklus.service;
 
+import com.github.bsideup.liiklus.positions.PositionsStorage;
 import com.github.bsideup.liiklus.protocol.*;
-import com.github.bsideup.liiklus.protocol.ReceiveReply.Record;
-import com.github.bsideup.liiklus.source.KafkaSource;
-import com.github.bsideup.liiklus.source.KafkaSource.KafkaRecord;
-import com.github.bsideup.liiklus.source.KafkaSource.Subscription;
+import com.github.bsideup.liiklus.records.RecordsStorage;
+import com.github.bsideup.liiklus.records.RecordsStorage.Record;
+import com.github.bsideup.liiklus.records.RecordsStorage.Subscription;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Empty;
 import com.google.protobuf.Timestamp;
 import lombok.RequiredArgsConstructor;
+import lombok.Value;
 import lombok.experimental.FieldDefaults;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.lognet.springboot.grpc.GRpcService;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.SignalType;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -30,77 +29,76 @@
 @GRpcService
 public class ReactorLiiklusServiceImpl extends ReactorLiiklusServiceGrpc.LiiklusServiceImplBase {
 
-    ConcurrentMap<String, Subscription> subscriptions = new ConcurrentHashMap<>();
+    ConcurrentMap<String, StoredSubscription> subscriptions = new ConcurrentHashMap<>();
 
-    ConcurrentMap<String, ConcurrentMap<Integer, Flux<KafkaRecord>>> sources = new ConcurrentHashMap<>();
+    ConcurrentMap<String, ConcurrentMap<Integer, Flux<Record>>> sources = new ConcurrentHashMap<>();
 
-    KafkaSource kafkaSource;
+    RecordsStorage recordsStorage;
+
+    PositionsStorage positionsStorage;
 
     @Override
     public Mono<PublishReply> publish(Mono<PublishRequest> requestMono) {
         return requestMono
-                .flatMap(request -> kafkaSource.publish(
+                .flatMap(request -> Mono.fromCompletionStage(recordsStorage.publish(
                         request.getTopic(),
                         request.getKey().asReadOnlyByteBuffer(),
                         request.getValue().asReadOnlyByteBuffer()
-                ))
+                )))
                 .then(Mono.just(PublishReply.getDefaultInstance()));
     }
 
     @Override
     public Flux<SubscribeReply> subscribe(Mono<SubscribeRequest> requestFlux) {
         return requestFlux
                 .flatMapMany(subscribe -> {
-                    Map<String, Object> props = new HashMap<>();
                     String groupId = subscribe.getGroup();
                     String topic = subscribe.getTopic();
-                    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 
+                    Optional<String> autoOffsetReset;
                     switch (subscribe.getAutoOffsetReset()) {
                         case EARLIEST:
-                            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+                            autoOffsetReset = Optional.of("earliest");
                             break;
                         case LATEST:
-                            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+                            autoOffsetReset = Optional.of("latest");
                             break;
-                        case UNRECOGNIZED:
-                            // ignore
+                        default:
+                            autoOffsetReset = Optional.empty();
                     }
 
-                    Subscription subscription = kafkaSource.subscribe(props, topic);
+                    Subscription subscription = recordsStorage.subscribe(topic, groupId, autoOffsetReset);
 
                     String sessionId = UUID.randomUUID().toString();
 
-                    subscriptions.put(sessionId, subscription);
+                    subscriptions.put(sessionId, new StoredSubscription(subscription, topic, groupId));
 
+                    ConcurrentMap<Integer, Flux<Record>> sourcesByPartition = sources
+                            .computeIfAbsent(sessionId, __ -> new ConcurrentHashMap<>());
+
                     return Flux.from(subscription.getPublisher())
-                            .<SubscribeReply>handle((group, sink) -> {
+                            .map(group -> {
                                 int partition = group.getGroup();
 
-                                ConcurrentMap<Integer, Flux<KafkaRecord>> sourcesByPartition = sources
-                                        .computeIfAbsent(sessionId, __ -> new ConcurrentHashMap<>());
-
                                 sourcesByPartition.put(
                                         partition,
                                         Flux.from(group)
                                                 .log("partition-" + partition, Level.WARNING, SignalType.ON_ERROR)
                                                 .retry()
-                                                .doFinally(__ -> {
-                                                    sourcesByPartition.remove(partition);
-                                                    sink.complete();
-                                                })
+                                                .doFinally(__ -> sourcesByPartition.remove(partition))
                                 );
 
-                                sink.next(
-                                        SubscribeReply.newBuilder()
-                                                .setAssignment(Assignment.newBuilder()
-                                                        .setPartition(partition)
-                                                        .setSessionId(sessionId)
-                                                )
-                                                .build()
-                                );
+                                return SubscribeReply.newBuilder()
+                                        .setAssignment(Assignment.newBuilder()
+                                                .setPartition(partition)
+                                                .setSessionId(sessionId)
+                                        )
+                                        .build();
                             })
-                            .doFinally(__ -> subscriptions.remove(sessionId, subscription));
+                            .doFinally(__ -> {
+                                sources.remove(sessionId, sourcesByPartition);
+                                subscriptions.remove(sessionId, subscription);
+                            });
                 })
                 .log("subscribe", Level.WARNING, SignalType.ON_ERROR);
     }
@@ -114,17 +112,17 @@ public Flux<ReceiveReply> receive(Mono<ReceiveRequest> requestMono) {
                     // TODO auto ack to the last known offset
                     long lastKnownOffset = request.getLastKnownOffset();
 
-                    Flux<KafkaRecord> source = sources.get(sessionId).get(partition);
+                    Flux<Record> source = sources.get(sessionId).get(partition);
 
                     if (source == null) {
-                        log.warn("Source is null, returning empty Publisher. Request: {}", request);
+                        log.warn("Source is null, returning empty Publisher. Request: {}", request.toString().replace("\n", "\\n"));
                         return Mono.empty();
                     }
 
                     return source
                             .map(consumerRecord -> ReceiveReply.newBuilder()
                                     .setRecord(
-                                            Record.newBuilder()
+                                            ReceiveReply.Record.newBuilder()
                                                     .setOffset(consumerRecord.getOffset())
                                                     .setKey(ByteString.copyFrom(consumerRecord.getKey()))
                                                     .setValue(ByteString.copyFrom(consumerRecord.getValue()))
@@ -143,16 +141,31 @@ public Flux<ReceiveReply> receive(Mono<ReceiveRequest> requestMono) {
     public Mono<Empty> ack(Mono<AckRequest> request) {
         return request
                 .flatMap(ack -> {
-                    Subscription subscription = subscriptions.get(ack.getAssignment().getSessionId());
+                    StoredSubscription subscription = subscriptions.get(ack.getAssignment().getSessionId());
 
                     if (subscription == null) {
-                        log.warn("Subscription is null, returning empty Publisher. Request: {}", ack);
+                        log.warn("Subscription is null, returning empty Publisher. Request: {}", ack.toString().replace("\n", "\\n"));
                         return Mono.empty();
                     }
 
-                    return subscription.acknowledge(ack.getAssignment().getPartition(), ack.getOffset());
+                    return Mono.fromCompletionStage(positionsStorage.update(
+                            subscription.getTopic(),
+                            subscription.getGroupId(),
+                            ack.getAssignment().getPartition(),
+                            ack.getOffset()
+                    ));
                 })
                 .then(Mono.just(Empty.getDefaultInstance()))
                 .log("ack", Level.WARNING, SignalType.ON_ERROR);
     }
 
+    @Value
+    private static class StoredSubscription {
+
+        Subscription subscription;
+
+        String topic;
+
+        String groupId;
+    }
 }
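
Acks are now written to PositionsStorage, and the SPI's CompletionStage results are bridged into the service's Reactor pipeline with Mono.fromCompletionStage. A minimal standalone illustration of that bridge (class and variable names are hypothetical):

    import reactor.core.publisher.Mono;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;

    public class CompletionStageBridge {

        public static void main(String[] args) {
            // Stands in for positionsStorage.update(topic, groupId, partition, offset)
            CompletionStage<Void> update = CompletableFuture.completedFuture(null);

            String reply = Mono.fromCompletionStage(update)
                    .then(Mono.just("acked")) // mirrors .then(Mono.just(Empty.getDefaultInstance()))
                    .block();

            System.out.println(reply); // prints "acked"
        }
    }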

This file was deleted.

1 change: 1 addition & 0 deletions app/src/main/resources/logback.xml
@@ -4,5 +4,6 @@
 
     <logger name="org.apache.kafka.clients.consumer" level="WARN"/>
     <logger name="org.apache.kafka.clients.producer" level="WARN"/>
+    <!--logger name="reactor.kafka.receiver.internals.DefaultKafkaReceiver" level="TRACE"/-->
 
 </configuration>
