Skip to content

Commit

Permalink
Fixes for pull request review
Browse files Browse the repository at this point in the history
  • Loading branch information
Schwartz-Matthew-bah committed Feb 1, 2019
1 parent af3ddca commit 10bd454
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 70 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# jpo-sdw-depositor [![Build Status](https://travis-ci.org/usdot-jpo-ode/jpo-sdw-depositor.svg?branch=dev)](https://travis-ci.org/usdot-jpo-ode/jpo-sdw-depositor) [![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=usdot.jpo.ode%3Ajpo-sdw-depositor%3Adev&metric=alert_status)](https://sonarcloud.io/dashboard?id=usdot.jpo.ode%3Ajpo-sdw-depositor%3Adev)

Subscribes to a Kafka topic and deposits messages to the SDW.
Subscribes to a Kafka topic and deposits messages to the Situation Data Warehouse (SDW).

# Overview

This is a submodule of the [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) repository. It subscribes to a Kafka topic and listens for incoming messages. Upon message arrival, this application deposits it over REST to the Situation Data Warehouse.
This is a submodule of the [jpo-ode](https://github.com/usdot-jpo-ode/jpo-ode) repository. It subscribes to a Kafka topic and listens for incoming messages. Upon message arrival, this application deposits it over REST to the SDW.

# Installation and Operation

Expand Down
44 changes: 27 additions & 17 deletions src/main/java/jpo/sdw/depositor/DepositorProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,26 @@ public class DepositorProperties implements EnvironmentAware {
private static final Logger logger = LoggerFactory.getLogger(DepositorProperties.class);

private static final String DEFAULT_GROUP_ID = "usdot.jpo.sdw";

private static final String DEFAULT_KAFKA_PORT = "9092";
private static final String DEFAULT_KAFKA_SUBSCRIPTION_TOPIC = "topic.J2735TimBroadcastJson";

private static final String DEFAULT_DESTINATION_URL = "https://localhost:8082/sdw";

This comment has been minimized.

Copy link
@hmusavi

hmusavi Feb 1, 2019

Contributor

We know the SDW URL. Let's use the real value for the default: https://webapp-integration.cvmvp.com/whtools/rest/v2/

private static final String DEFAULT_ENCODE_TYPE = "hex";

@Autowired
private Environment environment;

private String groupId;
private String encodeType;

private String kafkaBrokers;
private String subscriptionTopic;
private String[] subscriptionTopics;

private String username;
private String password;

private String destinationUrl;

@PostConstruct
void initialize() {

logger.info("Values: {} {} {} {}", groupId, kafkaBrokers, subscriptionTopic, destinationUrl);
if (getGroupId() == null)
setGroupId(DEFAULT_GROUP_ID);

Expand All @@ -54,9 +51,18 @@ void initialize() {
}
setKafkaBrokers(dockerIp + ":" + DEFAULT_KAFKA_PORT);
}
if (getSubscriptionTopic() == null)
setSubscriptionTopic(DEFAULT_KAFKA_SUBSCRIPTION_TOPIC);


if (getEncodeType() == null)
setEncodeType(DEFAULT_ENCODE_TYPE);

if (getDestinationUrl() == null)
setDestinationUrl(DEFAULT_DESTINATION_URL);

if (getSubscriptionTopics() == null || getSubscriptionTopics().length == 0) {
logger.error("No Kafka subscription topics specified in configuration");
throw new IllegalArgumentException("No Kafka subscription topics specified in configuration");
}

if (getUsername() == null) {
logger.error("No username specified in configuration");
throw new IllegalArgumentException("No username specified in configuration");
Expand All @@ -66,10 +72,6 @@ void initialize() {
logger.error("No password specified in configuration");
throw new IllegalArgumentException("No password specified in configuration");
}

if (getDestinationUrl() == null)
setDestinationUrl(DEFAULT_DESTINATION_URL);

}

@Override
Expand Down Expand Up @@ -97,12 +99,12 @@ public void setKafkaBrokers(String kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
}

public String getSubscriptionTopic() {
return subscriptionTopic;
public String[] getSubscriptionTopics() {
return subscriptionTopics;
}

public void setSubscriptionTopic(String subscriptionTopic) {
this.subscriptionTopic = subscriptionTopic;
public void setSubscriptionTopics(String[] subscriptionTopics) {
this.subscriptionTopics = subscriptionTopics;
}

public String getGroupId() {
Expand All @@ -128,4 +130,12 @@ public String getPassword() {
public void setPassword(String sdwPassword) {
this.password = sdwPassword;
}

public String getEncodeType() {
return encodeType;
}

public void setEncodeType(String encodeType) {
this.encodeType = encodeType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

public interface ConsumerDepositor<T> {

public void run(T t);
public void run(T... t);

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,11 @@
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import jpo.sdw.depositor.DepositorProperties;
import jpo.sdw.depositor.depositors.RestDepositor;

public class KafkaConsumerRestDepositor extends KafkaConsumerDepositor<String> {

@Autowired
DepositorProperties depositorProperties;

public static class LoopController {
private LoopController() {
throw new UnsupportedOperationException();
Expand All @@ -32,31 +27,25 @@ public static boolean loop() {

private RestDepositor<String> restDepositor;
private KafkaConsumer<String, String> kafkaConsumer;
private JSONObject jsonMsg;

public KafkaConsumerRestDepositor(KafkaConsumer<String, String> kafkaConsumer, RestDepositor<String> restDepositor) {
public KafkaConsumerRestDepositor(KafkaConsumer<String, String> kafkaConsumer, RestDepositor<String> restDepositor,
String encodeType) {
this.setKafkaConsumer(kafkaConsumer);
this.setRestDepositor(restDepositor);
this.jsonMsg = new JSONObject();
this.jsonMsg.put("systemDepositName", "string");

This comment has been minimized.

Copy link
@hmusavi

hmusavi Feb 1, 2019

Contributor

systemDepositName should be "SDW 2.3".

this.jsonMsg.put("systemDepositName", encodeType);

This comment has been minimized.

Copy link
@hmusavi

hmusavi Feb 1, 2019

Contributor

Should be encodeType

}

@Override
public void run(String topic) {
this.getKafkaConsumer().subscribe(Arrays.asList(topic));
public void run(String... topics) {
this.getKafkaConsumer().subscribe(Arrays.asList(topics));
while (LoopController.loop()) { // NOSONAR (used for unit testing)
ConsumerRecords<String, String> records = this.getKafkaConsumer().poll(100);
for (ConsumerRecord<String, String> record : records) {

long offset = record.offset();
String key = record.key();
String value = record.value();
String destination = this.getRestDepositor().getDestination().toString();

JSONObject jsonMsg = new JSONObject();
jsonMsg.put("systemDepositName", "string");
jsonMsg.put("encodeType", "hex");
jsonMsg.put("encodedMsg", value);

logger.info("Depositing message to {} KafkaOffset = {}, KafkaKey = {}, MessageValue = {}", destination,
offset, key, value);
logger.debug("Publishing message {}", record);
this.jsonMsg.put("encodedMsg", record.value());
this.getRestDepositor().deposit(jsonMsg.toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public class DepositController {
public DepositController(DepositorProperties depositorProperties) throws URISyntaxException {

List<ClientHttpRequestInterceptor> authHeaders = new ArrayList<ClientHttpRequestInterceptor>();
authHeaders.add(new BasicAuthorizationInterceptor(depositorProperties.getUsername(),
depositorProperties.getPassword()));
authHeaders.add(
new BasicAuthorizationInterceptor(depositorProperties.getUsername(), depositorProperties.getPassword()));

RestTemplate basicAuthRestTemplate = new RestTemplate();
basicAuthRestTemplate.setInterceptors(authHeaders);
Expand All @@ -32,9 +32,10 @@ public DepositController(DepositorProperties depositorProperties) throws URISynt
new URI(depositorProperties.getDestinationUrl()));

KafkaConsumerRestDepositor kcrd = new KafkaConsumerRestDepositor(
KafkaConsumerFactory.createConsumer(depositorProperties), sdwDepositor);
KafkaConsumerFactory.createConsumer(depositorProperties), sdwDepositor,
depositorProperties.getEncodeType());

kcrd.run(depositorProperties.getSubscriptionTopic());
kcrd.run(depositorProperties.getSubscriptionTopics());
}

}
3 changes: 1 addition & 2 deletions src/main/java/jpo/sdw/depositor/depositors/SDWDepositor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ public SDWDepositor(RestTemplate restTemplate, URI destination) {

@Override
public void deposit(String message) {

try {
ResponseEntity<String> result = this.getRestTemplate().postForEntity(this.getDestination(), message,
String.class);
logger.info("Response received. Status: {}, Body: {}", result.getStatusCode(), result.getBody());
logger.debug("Response received. Status: {}, Body: {}", result.getStatusCode(), result.getBody());
} catch (ResourceAccessException e) {
logger.error("Failed to send message to destination", e);
}
Expand Down
30 changes: 15 additions & 15 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#General Properties
#==================
#groupId=${project.groupId}
groupId=usdot.jpo.ode

#Input Properties
#================
#sdw.kafkaBrokers=localhost
#sdw.kafkaPort=9092
#sdw.subscriptionTopic=topic.J2735TimBroadcastJson

#Output Properties
#=================
#sdw.username=uuuuuuuu
#sdw.password=pppppppp
#General Properties
#==================
#groupId=${project.groupId}
#groupId=usdot.jpo.ode

#Input Properties
#================
#sdw.kafkaBrokers=localhost
#sdw.kafkaPort=9092
#sdw.subscriptionTopics = {"topic.example1", "topic.example2"};

#Output Properties
#=================
#sdw.username=uuuuuuuu
#sdw.password=pppppppp
#sdw.destinationUrl=https://localhost:8082/sdw
41 changes: 36 additions & 5 deletions src/test/java/jpo/sdw/depositor/DepositorPropertiesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class DepositorPropertiesTest {
public void testAllValuesAlreadySet() {

This comment has been minimized.

Copy link
@hmusavi

hmusavi Feb 1, 2019

Contributor

Please add a test case for multiple topics.


String expectedKafkaBrokers = "testKafkaBrokers";
String expectedSubscriptionTopic = "testSubscriptionTopic";
String[] expectedSubscriptionTopics = { "testSubscriptionTopic" };
String expectedDestinationUrl = "testDestinationUrl";
String expectedGroupId = "testGroupId";
String expectedUsername = "testUsername";
Expand All @@ -28,7 +28,7 @@ public void testAllValuesAlreadySet() {
DepositorProperties testDepositorProperties = new DepositorProperties();

testDepositorProperties.setKafkaBrokers(expectedKafkaBrokers);
testDepositorProperties.setSubscriptionTopic(expectedSubscriptionTopic);
testDepositorProperties.setSubscriptionTopics(expectedSubscriptionTopics);
testDepositorProperties.setDestinationUrl(expectedDestinationUrl);
testDepositorProperties.setGroupId(expectedGroupId);
testDepositorProperties.setEnvironment(mockEnvironment);
Expand All @@ -38,8 +38,8 @@ public void testAllValuesAlreadySet() {
testDepositorProperties.initialize();

assertEquals("Incorrect kafkaBrokers", expectedKafkaBrokers, testDepositorProperties.getKafkaBrokers());
assertEquals("Incorrect subscriptionTopic", expectedSubscriptionTopic,
testDepositorProperties.getSubscriptionTopic());
assertEquals("Incorrect subscriptionTopic", expectedSubscriptionTopics[0],
testDepositorProperties.getSubscriptionTopics()[0]);
assertEquals("Incorrect destinationUrl", expectedDestinationUrl, testDepositorProperties.getDestinationUrl());
assertEquals("Incorrect groupId", expectedGroupId, testDepositorProperties.getGroupId());
assertNotNull("No environment", testDepositorProperties.getEnvironment());
Expand All @@ -54,11 +54,11 @@ public void testDefaults() {

testDepositorProperties.setUsername("uuuuuuuu");
testDepositorProperties.setPassword("pppppppp");
testDepositorProperties.setSubscriptionTopics(new String[] { "topic.Topic" });

testDepositorProperties.initialize();

assertNotNull(testDepositorProperties.getKafkaBrokers());
assertNotNull(testDepositorProperties.getSubscriptionTopic());
assertNotNull(testDepositorProperties.getDestinationUrl());
assertNotNull(testDepositorProperties.getGroupId());
}
Expand All @@ -67,6 +67,7 @@ public void testDefaults() {
public void missingUsernameThrowsException() {
DepositorProperties testDepositorProperties = new DepositorProperties();
testDepositorProperties.setPassword("pppppppp");
testDepositorProperties.setSubscriptionTopics(new String[] { "topic.Topic" });
try {
testDepositorProperties.initialize();
fail("Expected IllegalArgumentException");
Expand All @@ -80,6 +81,7 @@ public void missingUsernameThrowsException() {
public void missingPasswordThrowsException() {
DepositorProperties testDepositorProperties = new DepositorProperties();
testDepositorProperties.setUsername("uuuuuuuu");
testDepositorProperties.setSubscriptionTopics(new String[] { "topic.Topic" });
try {
testDepositorProperties.initialize();
fail("Expected IllegalArgumentException");
Expand All @@ -89,4 +91,33 @@ public void missingPasswordThrowsException() {
}
}

@Test
public void nullSubscriptionTopicsThrowsException() {
DepositorProperties testDepositorProperties = new DepositorProperties();
testDepositorProperties.setUsername("uuuuuuuu");
testDepositorProperties.setPassword("pppppppp");
try {
testDepositorProperties.initialize();
fail("Expected IllegalArgumentException");
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
assertEquals("No Kafka subscription topics specified in configuration", e.getMessage());
}
}

@Test
public void emptySubscriptionTopicsThrowsException() {
DepositorProperties testDepositorProperties = new DepositorProperties();
testDepositorProperties.setUsername("uuuuuuuu");
testDepositorProperties.setPassword("pppppppp");
testDepositorProperties.setSubscriptionTopics(new String[] {});
try {
testDepositorProperties.initialize();
fail("Expected IllegalArgumentException");
} catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
assertEquals("No Kafka subscription topics specified in configuration", e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import jpo.sdw.depositor.DepositorProperties;
import jpo.sdw.depositor.consumerdepositors.KafkaConsumerRestDepositor.LoopController;
import jpo.sdw.depositor.depositors.RestDepositor;
import mockit.Capturing;
Expand All @@ -37,7 +36,7 @@ public class KafkaConsumerRestDepositorTest {
@Injectable
RestDepositor<String> injectableRestDepositor;
@Injectable
DepositorProperties injectableDepositorProperties;
String encodeType;

@Mocked
ConsumerRecord<String, String> mockConsumerRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class DepositControllerTest {
public void shouldRun() throws URISyntaxException {
new Expectations() {
{
capturingKafkaConsumerRestDepositor.run(anyString);
capturingKafkaConsumerRestDepositor.run((String[]) any);
times = 1;
}
};
Expand Down

0 comments on commit 10bd454

Please sign in to comment.