Skip to content

Commit

Permalink
Make it possible to pass the raw value to the next envelope (to be us…
Browse files Browse the repository at this point in the history
…ed in processors) (#12)
  • Loading branch information
bsideup committed Jul 31, 2018
1 parent 7a14b88 commit c33fde2
Showing 1 changed file with 74 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.github.bsideup.liiklus.records;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.experimental.Wither;
import org.reactivestreams.Publisher;
Expand All @@ -8,6 +10,7 @@
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Stream;

public interface RecordsStorage {
Expand All @@ -32,14 +35,81 @@ class OffsetInfo {
}

@Value
@Wither
@RequiredArgsConstructor
class Envelope {

String topic;

ByteBuffer key;

ByteBuffer value;
Object rawKey;

Function<Object, ByteBuffer> keyEncoder;

@Getter(lazy = true)
ByteBuffer key = keyEncoder.apply(rawKey);

Object rawValue;

Function<Object, ByteBuffer> valueEncoder;

@Getter(lazy = true)
ByteBuffer value = valueEncoder.apply(rawValue);

public Envelope(String topic, ByteBuffer key, ByteBuffer value) {
this.topic = topic;
this.rawKey = key;
this.rawValue = value;
this.keyEncoder = this.valueEncoder = it -> (ByteBuffer) it;
}

public Envelope withTopic(String topic) {
return new Envelope(
topic,
rawKey,
keyEncoder,
rawValue,
valueEncoder
);
}

public Envelope withKey(ByteBuffer key) {
return new Envelope(
topic,
key,
it -> (ByteBuffer) it,
rawValue,
valueEncoder
);
}

public Envelope withValue(ByteBuffer value) {
return new Envelope(
topic,
rawKey,
keyEncoder,
value,
it -> (ByteBuffer) it
);
}

public <T> Envelope withKey(T rawKey, Function<T, ByteBuffer> keyEncoder) {
return new Envelope(
topic,
rawKey,
(Function<Object, ByteBuffer>) keyEncoder,
rawValue,
valueEncoder
);
}

public <T> Envelope withValue(T rawValue, Function<T, ByteBuffer> valueEncoder) {
return new Envelope(
topic,
rawKey,
keyEncoder,
rawValue,
(Function<Object, ByteBuffer>) valueEncoder
);
}
}

@Value
Expand Down

0 comments on commit c33fde2

Please sign in to comment.