diff --git a/api/build.gradle b/api/build.gradle index c884fcab..05471939 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -4,6 +4,7 @@ plugins { dependencies { compileOnly 'org.projectlombok:lombok' + compileOnly 'org.springframework.boot:spring-boot-starter' compile 'org.reactivestreams:reactive-streams' diff --git a/api/src/main/java/com/github/bsideup/liiklus/config/ExporterProfile.java b/api/src/main/java/com/github/bsideup/liiklus/config/ExporterProfile.java new file mode 100644 index 00000000..984e2d2a --- /dev/null +++ b/api/src/main/java/com/github/bsideup/liiklus/config/ExporterProfile.java @@ -0,0 +1,16 @@ +package com.github.bsideup.liiklus.config; + +import org.springframework.context.annotation.Profile; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Profile(ExporterProfile.PROFILE_NAME) +public @interface ExporterProfile { + + String PROFILE_NAME = "exporter"; +} diff --git a/api/src/main/java/com/github/bsideup/liiklus/config/GatewayProfile.java b/api/src/main/java/com/github/bsideup/liiklus/config/GatewayProfile.java new file mode 100644 index 00000000..2316ef99 --- /dev/null +++ b/api/src/main/java/com/github/bsideup/liiklus/config/GatewayProfile.java @@ -0,0 +1,16 @@ +package com.github.bsideup.liiklus.config; + +import org.springframework.context.annotation.Profile; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Profile(GatewayProfile.PROFILE_NAME) +public @interface GatewayProfile { + + String PROFILE_NAME = "gateway"; +} diff --git a/app/src/main/java/com/github/bsideup/liiklus/Application.java b/app/src/main/java/com/github/bsideup/liiklus/Application.java index 52c06217..cc9ba30b 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/Application.java +++ b/app/src/main/java/com/github/bsideup/liiklus/Application.java @@ -1,14 +1,25 @@ package com.github.bsideup.liiklus; import lombok.extern.slf4j.Slf4j; +import lombok.val; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; + +import java.util.Collections; @Slf4j @SpringBootApplication public class Application { public static void main(String[] args) throws Exception { - SpringApplication.run(Application.class, args).registerShutdownHook(); + start(args); + } + + public static ConfigurableApplicationContext start(String[] args) { + val application = new SpringApplication(Application.class); + application.setDefaultProperties(Collections.singletonMap("spring.profiles.active", "exporter,gateway")); + + return application.run(args); } } diff --git a/app/src/main/java/com/github/bsideup/liiklus/config/GRPCConfiguration.java b/app/src/main/java/com/github/bsideup/liiklus/config/GRPCConfiguration.java index f15c2622..0f755e23 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/config/GRPCConfiguration.java +++ b/app/src/main/java/com/github/bsideup/liiklus/config/GRPCConfiguration.java @@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit; @Configuration +@GatewayProfile public class GRPCConfiguration extends GRpcServerBuilderConfigurer { @Override public void configure(ServerBuilder serverBuilder) { diff --git a/app/src/main/java/com/github/bsideup/liiklus/monitoring/MetricsCollector.java b/app/src/main/java/com/github/bsideup/liiklus/monitoring/MetricsCollector.java index ca6a6743..39e8cd86 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/monitoring/MetricsCollector.java +++ b/app/src/main/java/com/github/bsideup/liiklus/monitoring/MetricsCollector.java @@ -1,5 +1,6 @@ package com.github.bsideup.liiklus.monitoring; +import com.github.bsideup.liiklus.config.ExporterProfile; import com.github.bsideup.liiklus.positions.PositionsStorage; import io.prometheus.client.Collector; import io.prometheus.client.CollectorRegistry; @@ -19,6 +20,7 @@ @RequiredArgsConstructor @FieldDefaults(makeFinal = true) @Slf4j +@ExporterProfile public class MetricsCollector extends Collector { CollectorRegistry collectorRegistry; diff --git a/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java b/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java index b3c9394a..7030238c 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java +++ b/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java @@ -1,5 +1,6 @@ package com.github.bsideup.liiklus.service; +import com.github.bsideup.liiklus.config.GatewayProfile; import com.github.bsideup.liiklus.positions.PositionsStorage; import com.github.bsideup.liiklus.protocol.*; import com.github.bsideup.liiklus.records.RecordsStorage; @@ -27,6 +28,7 @@ @FieldDefaults(makeFinal = true) @Slf4j @GRpcService +@GatewayProfile public class ReactorLiiklusServiceImpl extends ReactorLiiklusServiceGrpc.LiiklusServiceImplBase { ConcurrentMap subscriptions = new ConcurrentHashMap<>(); diff --git a/app/src/test/java/com/github/bsideup/liiklus/ProfilesTest.java b/app/src/test/java/com/github/bsideup/liiklus/ProfilesTest.java new file mode 100644 index 00000000..be489968 --- /dev/null +++ b/app/src/test/java/com/github/bsideup/liiklus/ProfilesTest.java @@ -0,0 +1,94 @@ +package com.github.bsideup.liiklus; + +import com.github.bsideup.liiklus.test.AbstractIntegrationTest; +import com.google.common.collect.Sets; +import lombok.val; +import org.assertj.core.api.AbstractThrowableAssert; +import org.junit.After; +import org.junit.Test; +import org.springframework.boot.context.properties.bind.validation.BindValidationException; +import org.springframework.context.ConfigurableApplicationContext; + +import java.util.Collection; +import java.util.Set; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThatCode; + +public class ProfilesTest extends AbstractIntegrationTest { + + static Set KAFKA_PROPERTIES = AbstractIntegrationTest.getKafkaProperties(); + + static Set DYNAMODB_PROPERTIES = AbstractIntegrationTest.getDynamoDBProperties(); + + Set commonArgs = Sets.newHashSet("server.port=0", "grpc.inProcessServerName=liiklus-profile-test"); + + ConfigurableApplicationContext lastApplicationContext; + + @After + public void tearDown() throws Exception { + if (lastApplicationContext != null) { + lastApplicationContext.close(); + } + } + + @Test + public void testRequired() throws Exception { + assertThatAppWithProps(commonArgs) + .hasRootCauseInstanceOf(BindValidationException.class); + + assertThatAppWithProps(commonArgs, KAFKA_PROPERTIES) + .hasRootCauseInstanceOf(BindValidationException.class); + + assertThatAppWithProps(commonArgs, DYNAMODB_PROPERTIES) + .hasRootCauseInstanceOf(BindValidationException.class); + + assertThatAppWithProps(commonArgs, KAFKA_PROPERTIES, DYNAMODB_PROPERTIES) + .doesNotThrowAnyException(); + } + + @Test + public void testExporterProfile() throws Exception { + commonArgs.add("spring.profiles.active=exporter"); + + assertThatAppWithProps(commonArgs) + .hasRootCauseInstanceOf(BindValidationException.class); + + assertThatAppWithProps(commonArgs, DYNAMODB_PROPERTIES) + .doesNotThrowAnyException(); + } + + @Test + public void testGatewayProfile() throws Exception { + commonArgs.add("spring.profiles.active=gateway"); + + assertThatAppWithProps(commonArgs) + .hasRootCauseInstanceOf(BindValidationException.class); + + assertThatAppWithProps(commonArgs, KAFKA_PROPERTIES) + .hasRootCauseInstanceOf(BindValidationException.class); + + assertThatAppWithProps(commonArgs, DYNAMODB_PROPERTIES) + .hasRootCauseInstanceOf(BindValidationException.class); + + assertThatAppWithProps(commonArgs, KAFKA_PROPERTIES, DYNAMODB_PROPERTIES) + .doesNotThrowAnyException(); + } + + @SafeVarargs + protected final AbstractThrowableAssert assertThatAppWithProps(Set... props) { + if (lastApplicationContext != null) { + lastApplicationContext.close(); + } + + return assertThatCode(() -> { + val args = Stream.of(props) + .flatMap(Collection::stream) + .map(it -> "--" + it) + .toArray(String[]::new); + + lastApplicationContext = Application.start(args); + }); + } + +} diff --git a/app/src/test/java/com/github/bsideup/liiklus/test/AbstractIntegrationTest.java b/app/src/test/java/com/github/bsideup/liiklus/test/AbstractIntegrationTest.java index e9c4b4ea..730a23bf 100644 --- a/app/src/test/java/com/github/bsideup/liiklus/test/AbstractIntegrationTest.java +++ b/app/src/test/java/com/github/bsideup/liiklus/test/AbstractIntegrationTest.java @@ -4,14 +4,18 @@ import com.github.bsideup.liiklus.protocol.ReactorLiiklusServiceGrpc; import com.github.bsideup.liiklus.protocol.ReactorLiiklusServiceGrpc.ReactorLiiklusServiceStub; import com.github.bsideup.liiklus.test.support.LocalStackContainer; +import com.google.common.collect.Sets; import io.grpc.inprocess.InProcessChannelBuilder; -import lombok.val; import org.apache.kafka.common.utils.Utils; import org.junit.Rule; import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.util.TestPropertyValues; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; @@ -24,15 +28,15 @@ import java.util.stream.Stream; @RunWith(SpringRunner.class) -@ActiveProfiles("test") +@ActiveProfiles(profiles = {"test", "exporter", "gateway"}) @SpringBootTest( classes = {Application.class, TestConfiguration.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { - "grpc.enabled=false", "grpc.inProcessServerName=liiklus", } ) +@ContextConfiguration(initializers = AbstractIntegrationTest.Initializer.class) public abstract class AbstractIntegrationTest { public static final int NUM_PARTITIONS = 32; @@ -49,18 +53,37 @@ public static int getPartitionByKey(String key) { return Utils.toPositive(Utils.murmur2(key.getBytes())) % NUM_PARTITIONS; } - static { - val localstack = new LocalStackContainer(); + private static LocalStackContainer localstack = new LocalStackContainer(); - val kafka = new KafkaContainer() - .withEnv("KAFKA_NUM_PARTITIONS", NUM_PARTITIONS + ""); + private static KafkaContainer kafka = new KafkaContainer() + .withEnv("KAFKA_NUM_PARTITIONS", NUM_PARTITIONS + ""); + static { Stream.of(kafka, localstack).parallel().forEach(GenericContainer::start); - System.setProperty("kafka.bootstrapServers", kafka.getBootstrapServers()); + System.setProperty("grpc.enabled", "false"); + } + + public static Set getKafkaProperties() { + return Sets.newHashSet( + "kafka.bootstrapServers=" + kafka.getBootstrapServers() + ); + } - System.setProperty("dynamodb.positionsTable", "positions-" + UUID.randomUUID()); - System.getProperties().putAll(localstack.getProperties()); + public static Set getDynamoDBProperties() { + return Sets.union( + localstack.getProperties().entrySet().stream().map(it -> it.getKey() + "=" + it.getValue()).collect(Collectors.toSet()), + Sets.newHashSet("dynamodb.positionsTable=positions") + ); + } + + public static class Initializer implements ApplicationContextInitializer { + + @Override + public void initialize(ConfigurableApplicationContext applicationContext) { + TestPropertyValues.of(getKafkaProperties()).applyTo(applicationContext); + TestPropertyValues.of(getDynamoDBProperties()).applyTo(applicationContext); + } } @Rule diff --git a/dynamodb-positions-storage/src/main/java/com/github/bsideup/liiklus/dynamodb/config/DynamoDBConfiguration.java b/dynamodb-positions-storage/src/main/java/com/github/bsideup/liiklus/dynamodb/config/DynamoDBConfiguration.java index 90d5128e..e7116d61 100644 --- a/dynamodb-positions-storage/src/main/java/com/github/bsideup/liiklus/dynamodb/config/DynamoDBConfiguration.java +++ b/dynamodb-positions-storage/src/main/java/com/github/bsideup/liiklus/dynamodb/config/DynamoDBConfiguration.java @@ -5,19 +5,24 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder; +import com.github.bsideup.liiklus.config.ExporterProfile; +import com.github.bsideup.liiklus.config.GatewayProfile; import com.github.bsideup.liiklus.dynamodb.DynamoDBPositionsStorage; import lombok.Data; -import org.hibernate.validator.constraints.NotEmpty; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.validation.annotation.Validated; +import javax.validation.constraints.NotEmpty; import java.util.Optional; @Configuration +@ExporterProfile +@GatewayProfile @ConditionalOnProperty(value = "storage.positions.type", havingValue = "DYNAMODB") @EnableConfigurationProperties(DynamoDBConfiguration.DynamoDBProperties.class) public class DynamoDBConfiguration { @@ -49,6 +54,7 @@ DynamoDBPositionsStorage dynamoDBPositionsStorage(AmazonDynamoDBAsync dynamoDB) @Data @ConfigurationProperties("dynamodb") + @Validated public static class DynamoDBProperties { Optional endpoint = Optional.empty(); diff --git a/kafka-records-storage/src/main/java/com/github/bsideup/liiklus/kafka/config/KafkaRecordsStorageConfiguration.java b/kafka-records-storage/src/main/java/com/github/bsideup/liiklus/kafka/config/KafkaRecordsStorageConfiguration.java index e4f0d1d3..c3b8dde5 100644 --- a/kafka-records-storage/src/main/java/com/github/bsideup/liiklus/kafka/config/KafkaRecordsStorageConfiguration.java +++ b/kafka-records-storage/src/main/java/com/github/bsideup/liiklus/kafka/config/KafkaRecordsStorageConfiguration.java @@ -1,26 +1,29 @@ package com.github.bsideup.liiklus.kafka.config; +import com.github.bsideup.liiklus.config.GatewayProfile; import com.github.bsideup.liiklus.kafka.KafkaRecordsStorage; import com.github.bsideup.liiklus.positions.PositionsStorage; import lombok.Data; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteBufferSerializer; -import org.hibernate.validator.constraints.NotEmpty; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.validation.annotation.Validated; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; +import javax.validation.constraints.NotEmpty; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.UUID; @Configuration +@GatewayProfile @EnableConfigurationProperties(KafkaRecordsStorageConfiguration.KafkaProperties.class) @ConditionalOnProperty(value = "storage.records.type", havingValue = "KAFKA") public class KafkaRecordsStorageConfiguration { @@ -54,6 +57,7 @@ KafkaRecordsStorage reactorKafkaSource() { @Data @ConfigurationProperties("kafka") + @Validated public static class KafkaProperties { @NotEmpty