Skip to content

Commit

Permalink
Add "gateway" and "exporter" profiles
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed Mar 5, 2018
1 parent 36d2568 commit 7947e9f
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 13 deletions.
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ plugins {

dependencies {
compileOnly 'org.projectlombok:lombok'
compileOnly 'org.springframework.boot:spring-boot-starter'

compile 'org.reactivestreams:reactive-streams'

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.github.bsideup.liiklus.config;

import org.springframework.context.annotation.Profile;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Profile(ExporterProfile.PROFILE_NAME)
public @interface ExporterProfile {

String PROFILE_NAME = "exporter";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.github.bsideup.liiklus.config;

import org.springframework.context.annotation.Profile;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Profile(GatewayProfile.PROFILE_NAME)
public @interface GatewayProfile {

String PROFILE_NAME = "gateway";
}
13 changes: 12 additions & 1 deletion app/src/main/java/com/github/bsideup/liiklus/Application.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
package com.github.bsideup.liiklus;

import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import java.util.Collections;

@Slf4j
@SpringBootApplication
public class Application {

public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args).registerShutdownHook();
start(args);
}

public static ConfigurableApplicationContext start(String[] args) {
val application = new SpringApplication(Application.class);
application.setDefaultProperties(Collections.singletonMap("spring.profiles.active", "exporter,gateway"));

return application.run(args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.TimeUnit;

@Configuration
@GatewayProfile
public class GRPCConfiguration extends GRpcServerBuilderConfigurer {
@Override
public void configure(ServerBuilder<?> serverBuilder) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.bsideup.liiklus.monitoring;

import com.github.bsideup.liiklus.config.ExporterProfile;
import com.github.bsideup.liiklus.positions.PositionsStorage;
import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
Expand All @@ -19,6 +20,7 @@
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true)
@Slf4j
@ExporterProfile
public class MetricsCollector extends Collector {

CollectorRegistry collectorRegistry;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.bsideup.liiklus.service;

import com.github.bsideup.liiklus.config.GatewayProfile;
import com.github.bsideup.liiklus.positions.PositionsStorage;
import com.github.bsideup.liiklus.protocol.*;
import com.github.bsideup.liiklus.records.RecordsStorage;
Expand Down Expand Up @@ -27,6 +28,7 @@
@FieldDefaults(makeFinal = true)
@Slf4j
@GRpcService
@GatewayProfile
public class ReactorLiiklusServiceImpl extends ReactorLiiklusServiceGrpc.LiiklusServiceImplBase {

ConcurrentMap<String, StoredSubscription> subscriptions = new ConcurrentHashMap<>();
Expand Down
94 changes: 94 additions & 0 deletions app/src/test/java/com/github/bsideup/liiklus/ProfilesTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.github.bsideup.liiklus;

import com.github.bsideup.liiklus.test.AbstractIntegrationTest;
import com.google.common.collect.Sets;
import lombok.val;
import org.assertj.core.api.AbstractThrowableAssert;
import org.junit.After;
import org.junit.Test;
import org.springframework.boot.context.properties.bind.validation.BindValidationException;
import org.springframework.context.ConfigurableApplicationContext;

import java.util.Collection;
import java.util.Set;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThatCode;

public class ProfilesTest extends AbstractIntegrationTest {

static Set<String> KAFKA_PROPERTIES = AbstractIntegrationTest.getKafkaProperties();

static Set<String> DYNAMODB_PROPERTIES = AbstractIntegrationTest.getDynamoDBProperties();

Set<String> commonArgs = Sets.newHashSet("server.port=0", "grpc.inProcessServerName=liiklus-profile-test");

ConfigurableApplicationContext lastApplicationContext;

@After
public void tearDown() throws Exception {
if (lastApplicationContext != null) {
lastApplicationContext.close();
}
}

@Test
public void testRequired() throws Exception {
assertThatAppWithProps(commonArgs)
.hasRootCauseInstanceOf(BindValidationException.class);

assertThatAppWithProps(commonArgs, KAFKA_PROPERTIES)
.hasRootCauseInstanceOf(BindValidationException.class);

assertThatAppWithProps(commonArgs, DYNAMODB_PROPERTIES)
.hasRootCauseInstanceOf(BindValidationException.class);

assertThatAppWithProps(commonArgs, KAFKA_PROPERTIES, DYNAMODB_PROPERTIES)
.doesNotThrowAnyException();
}

@Test
public void testExporterProfile() throws Exception {
commonArgs.add("spring.profiles.active=exporter");

assertThatAppWithProps(commonArgs)
.hasRootCauseInstanceOf(BindValidationException.class);

assertThatAppWithProps(commonArgs, DYNAMODB_PROPERTIES)
.doesNotThrowAnyException();
}

@Test
public void testGatewayProfile() throws Exception {
commonArgs.add("spring.profiles.active=gateway");

assertThatAppWithProps(commonArgs)
.hasRootCauseInstanceOf(BindValidationException.class);

assertThatAppWithProps(commonArgs, KAFKA_PROPERTIES)
.hasRootCauseInstanceOf(BindValidationException.class);

assertThatAppWithProps(commonArgs, DYNAMODB_PROPERTIES)
.hasRootCauseInstanceOf(BindValidationException.class);

assertThatAppWithProps(commonArgs, KAFKA_PROPERTIES, DYNAMODB_PROPERTIES)
.doesNotThrowAnyException();
}

@SafeVarargs
protected final AbstractThrowableAssert<?, ? extends Throwable> assertThatAppWithProps(Set<String>... props) {
if (lastApplicationContext != null) {
lastApplicationContext.close();
}

return assertThatCode(() -> {
val args = Stream.of(props)
.flatMap(Collection::stream)
.map(it -> "--" + it)
.toArray(String[]::new);

lastApplicationContext = Application.start(args);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
import com.github.bsideup.liiklus.protocol.ReactorLiiklusServiceGrpc;
import com.github.bsideup.liiklus.protocol.ReactorLiiklusServiceGrpc.ReactorLiiklusServiceStub;
import com.github.bsideup.liiklus.test.support.LocalStackContainer;
import com.google.common.collect.Sets;
import io.grpc.inprocess.InProcessChannelBuilder;
import lombok.val;
import org.apache.kafka.common.utils.Utils;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
Expand All @@ -24,15 +28,15 @@
import java.util.stream.Stream;

@RunWith(SpringRunner.class)
@ActiveProfiles("test")
@ActiveProfiles(profiles = {"test", "exporter", "gateway"})
@SpringBootTest(
classes = {Application.class, TestConfiguration.class},
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"grpc.enabled=false",
"grpc.inProcessServerName=liiklus",
}
)
@ContextConfiguration(initializers = AbstractIntegrationTest.Initializer.class)
public abstract class AbstractIntegrationTest {

public static final int NUM_PARTITIONS = 32;
Expand All @@ -49,18 +53,37 @@ public static int getPartitionByKey(String key) {
return Utils.toPositive(Utils.murmur2(key.getBytes())) % NUM_PARTITIONS;
}

static {
val localstack = new LocalStackContainer();
private static LocalStackContainer localstack = new LocalStackContainer();

val kafka = new KafkaContainer()
.withEnv("KAFKA_NUM_PARTITIONS", NUM_PARTITIONS + "");
private static KafkaContainer kafka = new KafkaContainer()
.withEnv("KAFKA_NUM_PARTITIONS", NUM_PARTITIONS + "");

static {
Stream.of(kafka, localstack).parallel().forEach(GenericContainer::start);

System.setProperty("kafka.bootstrapServers", kafka.getBootstrapServers());
System.setProperty("grpc.enabled", "false");
}

public static Set<String> getKafkaProperties() {
return Sets.newHashSet(
"kafka.bootstrapServers=" + kafka.getBootstrapServers()
);
}

System.setProperty("dynamodb.positionsTable", "positions-" + UUID.randomUUID());
System.getProperties().putAll(localstack.getProperties());
public static Set<String> getDynamoDBProperties() {
return Sets.union(
localstack.getProperties().entrySet().stream().map(it -> it.getKey() + "=" + it.getValue()).collect(Collectors.toSet()),
Sets.newHashSet("dynamodb.positionsTable=positions")
);
}

public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
TestPropertyValues.of(getKafkaProperties()).applyTo(applicationContext);
TestPropertyValues.of(getDynamoDBProperties()).applyTo(applicationContext);
}
}

@Rule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder;
import com.github.bsideup.liiklus.config.ExporterProfile;
import com.github.bsideup.liiklus.config.GatewayProfile;
import com.github.bsideup.liiklus.dynamodb.DynamoDBPositionsStorage;
import lombok.Data;
import org.hibernate.validator.constraints.NotEmpty;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.NotEmpty;
import java.util.Optional;

@Configuration
@ExporterProfile
@GatewayProfile
@ConditionalOnProperty(value = "storage.positions.type", havingValue = "DYNAMODB")
@EnableConfigurationProperties(DynamoDBConfiguration.DynamoDBProperties.class)
public class DynamoDBConfiguration {
Expand Down Expand Up @@ -49,6 +54,7 @@ DynamoDBPositionsStorage dynamoDBPositionsStorage(AmazonDynamoDBAsync dynamoDB)

@Data
@ConfigurationProperties("dynamodb")
@Validated
public static class DynamoDBProperties {
Optional<String> endpoint = Optional.empty();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
package com.github.bsideup.liiklus.kafka.config;

import com.github.bsideup.liiklus.config.GatewayProfile;
import com.github.bsideup.liiklus.kafka.KafkaRecordsStorage;
import com.github.bsideup.liiklus.positions.PositionsStorage;
import lombok.Data;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.hibernate.validator.constraints.NotEmpty;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

import javax.validation.constraints.NotEmpty;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Configuration
@GatewayProfile
@EnableConfigurationProperties(KafkaRecordsStorageConfiguration.KafkaProperties.class)
@ConditionalOnProperty(value = "storage.records.type", havingValue = "KAFKA")
public class KafkaRecordsStorageConfiguration {
Expand Down Expand Up @@ -54,6 +57,7 @@ KafkaRecordsStorage reactorKafkaSource() {

@Data
@ConfigurationProperties("kafka")
@Validated
public static class KafkaProperties {

@NotEmpty
Expand Down

0 comments on commit 7947e9f

Please sign in to comment.