Skip to content

Commit

Permalink
Add example plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed May 12, 2018
1 parent c928310 commit 106b302
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Envelope {
}

@Value
@Wither
class Record {

Envelope envelope;
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
id "com.google.protobuf" version "0.8.4" apply false
}

configure(subprojects.findAll { !it.name.startsWith("examples") }) {
configure(subprojects.findAll { !it.name.startsWith("examples/") }) {
apply plugin: 'io.spring.dependency-management'
apply plugin: 'maven'

Expand Down
63 changes: 63 additions & 0 deletions examples/plugin/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
plugins {
id "java"
id "io.spring.dependency-management"
}

jar {
baseName = "example-plugin"
manifest {
attributes(
'Plugin-Id': "example-plugin",
'Plugin-Version': "1.0.0",
)
}

into('lib') {
from configurations.compile
}
}

sourceSets.test.resources.srcDir jar.outputs.files.singleFile.parentFile
test {
dependsOn jar
}

repositories {
jcenter()
maven { url 'https://jitpack.io' }
}

dependencyManagement {
overriddenByDependencies = false

imports {
mavenBom 'org.springframework.boot:spring-boot-dependencies:2.0.0.RELEASE'
mavenBom 'org.testcontainers:testcontainers-bom:1.7.2'
}

dependencies {
dependency 'com.github.bsideup.liiklus:api:0.3.0'
dependency 'com.github.bsideup.liiklus:protocol:0.3.0'
dependency 'io.grpc:grpc-netty:1.11.0'

dependency 'org.projectlombok:lombok:1.16.8'
dependency 'com.google.auto.service:auto-service:1.0-rc4'
}
}

dependencies {
compileOnly 'org.projectlombok:lombok'
compileOnly "com.google.auto.service:auto-service"

compileOnly "com.github.bsideup.liiklus:api"
compileOnly 'org.springframework.boot:spring-boot-starter'
compileOnly 'io.projectreactor:reactor-core'

compile 'org.apache.commons:commons-lang3:3.7'

testCompile 'org.testcontainers:kafka'
testCompile 'ch.qos.logback:logback-classic:'
testCompile "com.github.bsideup.liiklus:protocol"
testCompile "io.grpc:grpc-netty"
testCompile 'org.assertj:assertj-core'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.github.bsideup.liiklus.plugins.example;

import com.github.bsideup.liiklus.records.RecordPostProcessor;
import com.github.bsideup.liiklus.records.RecordsStorage.Record;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.nio.ByteBuffer;

public class ExampleRecordPostProcessor implements RecordPostProcessor {
@Override
public Publisher<Record> postProcess(Publisher<Record> publisher) {
return Flux.from(publisher)
.map(record -> {
String key = new String(record.getEnvelope().getKey().array());
if ("maskMe".equals(key)) {
return new Record(
record.getEnvelope().withValue(ByteBuffer.wrap("**masked**".getBytes())),
record.getTimestamp(),
record.getPartition(),
record.getOffset()
);
} else {
return record;
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.github.bsideup.liiklus.plugins.example;

import com.github.bsideup.liiklus.records.RecordPreProcessor;
import com.github.bsideup.liiklus.records.RecordsStorage.Envelope;
import org.apache.commons.lang3.StringUtils;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ExampleRecordPreProcessor implements RecordPreProcessor {

@Override
public CompletionStage<Envelope> preProcess(Envelope envelope) {
String value = StandardCharsets.UTF_8.decode(envelope.getValue()).toString();

String reversed = StringUtils.reverse(value);
return CompletableFuture.completedFuture(
envelope.withValue(ByteBuffer.wrap(reversed.getBytes()))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.github.bsideup.liiklus.plugins.example.config;

import com.github.bsideup.liiklus.config.LiiklusConfiguration;
import com.github.bsideup.liiklus.plugins.example.ExampleRecordPostProcessor;
import com.github.bsideup.liiklus.plugins.example.ExampleRecordPreProcessor;
import com.google.auto.service.AutoService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@AutoService(LiiklusConfiguration.class)
@Configuration
public class ExamplePluginConfiguration implements LiiklusConfiguration {

@Bean
ExampleRecordPreProcessor encryptionRecordPreProcessor() {
return new ExampleRecordPreProcessor();
}

@Bean
ExampleRecordPostProcessor exampleRecordPostProcessor() {
return new ExampleRecordPostProcessor();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.github.bsideup.liiklus.plugins.example;

import com.github.bsideup.liiklus.protocol.ReceiveReply.Record;
import com.github.bsideup.liiklus.plugins.example.support.AbstractIntegrationTest;
import org.junit.Test;

import java.time.Duration;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

public class SmokeTest extends AbstractIntegrationTest {

@Test
public void testPreProcessor() {
String key = UUID.randomUUID().toString();

publishRecord(key, "Hello!");

Record record = receiveRecords(key).blockFirst(Duration.ofSeconds(10));

assertThat(record).isNotNull().satisfies(it -> {
assertThat(it.getValue().toStringUtf8()).isEqualTo("!olleH");
});
}

@Test
public void testPostProcessor() {
String key = "maskMe";

publishRecord(key, "Hello!");

Record record = receiveRecords(key).blockFirst(Duration.ofSeconds(10));

assertThat(record).isNotNull().satisfies(it -> {
assertThat(it.getValue().toStringUtf8()).isEqualTo("**masked**");
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.github.bsideup.liiklus.plugins.example.support;

import com.github.bsideup.liiklus.protocol.*;
import com.github.bsideup.liiklus.protocol.ReactorLiiklusServiceGrpc.ReactorLiiklusServiceStub;
import com.github.bsideup.liiklus.protocol.ReceiveReply.Record;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.ToStringConsumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.UUID;
import java.util.stream.Stream;

public abstract class AbstractIntegrationTest {

protected static final ReactorLiiklusServiceStub stub;

static {
KafkaContainer kafka = new KafkaContainer();

GenericContainer liiklus = new GenericContainer("bsideup/liiklus:0.3.0")
.withNetwork(kafka.getNetwork())
.withEnv("storage_positions_type", "MEMORY")
.withEnv("kafka_bootstrapServers", kafka.getNetworkAliases().get(0) + ":9093")
.withExposedPorts(6565)
.withClasspathResourceMapping("/example-plugin.jar", "/app/plugins/example-plugin.jar", BindMode.READ_ONLY)
.withLogConsumer(new ToStringConsumer() {
@Override
public void accept(OutputFrame outputFrame) {
System.out.print("\uD83D\uDEA6 " + outputFrame.getUtf8String());
}
});

Stream.of(kafka, liiklus).parallel().forEach(GenericContainer::start);

ManagedChannel channel = NettyChannelBuilder.forAddress(liiklus.getContainerIpAddress(), liiklus.getMappedPort(6565))
.usePlaintext()
.build();

stub = ReactorLiiklusServiceGrpc.newReactorStub(channel);
}

protected String topic = "test-topic-" + UUID.randomUUID();

protected PublishReply publishRecord(String key, String value) {
return stub.publish(Mono.just(PublishRequest.newBuilder()
.setTopic(topic)
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8(value))
.build()
)).block(Duration.ofSeconds(10));
}

protected Flux<Record> receiveRecords(String key) {
return stub
.subscribe(Mono.just(SubscribeRequest.newBuilder()
.setTopic(topic)
.setGroup(UUID.randomUUID().toString())
.setAutoOffsetReset(SubscribeRequest.AutoOffsetReset.EARLIEST)
.build()
))
.flatMap(it -> stub.receive(Mono.just(ReceiveRequest.newBuilder().setAssignment(it.getAssignment()).build())))
.map(ReceiveReply::getRecord)
.filter(it -> key.equals(it.getKey().toStringUtf8()));
}
}
11 changes: 11 additions & 0 deletions examples/plugin/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ include 'app'
include 'protocol'

include 'examples/java'
include 'examples/plugin'

file('plugins').eachDir { dir ->
def projectName = dir.name
Expand Down

0 comments on commit 106b302

Please sign in to comment.