diff --git a/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/wrapper/J2735BsmDeseriizer.java b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/wrapper/J2735BsmDeseriizer.java new file mode 100644 index 000000000..9336bc584 --- /dev/null +++ b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/wrapper/J2735BsmDeseriizer.java @@ -0,0 +1,7 @@ +package us.dot.its.jpo.ode.wrapper; + +import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm; + +public class J2735BsmDeseriizer extends MessagingDeserializer { + +} diff --git a/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/wrapper/MessageConsumer.java b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/wrapper/MessageConsumer.java index 77cde466a..378edd78f 100644 --- a/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/wrapper/MessageConsumer.java +++ b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/wrapper/MessageConsumer.java @@ -69,10 +69,6 @@ public MessageConsumer(String brokers, String groupId, MessageProcessor pr logger.info("Consumer Created for groupId {}", groupId); } - public MessageConsumer(String brokers, String groupId, MessageProcessor processor) { - this(brokers, groupId, processor, MessagingDeserializer.class.getName()); - } - public MessageConsumer(String brokers, String groupId, MessageProcessor processor, Properties props) { this.processor = processor; props.put("bootstrap.servers", brokers); @@ -83,10 +79,6 @@ public MessageConsumer(String brokers, String groupId, MessageProcessor pr } - public MessageConsumer(String brokers, String groupId, Properties props) { - this(brokers, groupId, null, props); - } - public void subscribe(String... topics) { List listTopics = Arrays.asList(topics); diff --git a/jpo-ode-core/src/test/resources/CVMessages/.gitignore b/jpo-ode-core/src/test/resources/CVMessages/.gitignore new file mode 100644 index 000000000..153e27add --- /dev/null +++ b/jpo-ode-core/src/test/resources/CVMessages/.gitignore @@ -0,0 +1 @@ +/VehicleSituationDataServiceResponse.uper diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/json/ToJsonServiceController.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/json/ToJsonServiceController.java index 542dc172e..849b6e282 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/json/ToJsonServiceController.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/json/ToJsonServiceController.java @@ -7,6 +7,7 @@ import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm; +import us.dot.its.jpo.ode.wrapper.J2735BsmDeseriizer; import us.dot.its.jpo.ode.wrapper.MessageConsumer; /** @@ -30,13 +31,14 @@ public ToJsonServiceController(OdeProperties odeProps) { odeProps.getKafkaTopicBsmSerializedPojo(), odeProps.getKafkaTopicBsmRawJson()); - ToJsonConverter converter = new ToJsonConverter<>( + ToJsonConverter converter = new ToJsonConverter( odeProps, false, odeProps.getKafkaTopicBsmRawJson()); - MessageConsumer consumer = new MessageConsumer<>( + MessageConsumer consumer = new MessageConsumer( odeProps.getKafkaBrokers(), this.getClass().getSimpleName(), - converter); + converter, + J2735BsmDeseriizer.class.getName()); consumer.setName(this.getClass().getSimpleName()); converter.start(consumer, odeProps.getKafkaTopicBsmSerializedPojo()); diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/vsd/BsmToVsdPackager.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/vsd/BsmToVsdPackager.java index 6d93996e3..c492045d6 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/vsd/BsmToVsdPackager.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/vsd/BsmToVsdPackager.java @@ -7,7 +7,6 @@ import org.apache.tomcat.util.buf.HexUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.util.SerializationUtils; import com.oss.asn1.EncodeFailedException; import com.oss.asn1.EncodeNotSupportedException; @@ -28,7 +27,6 @@ import us.dot.its.jpo.ode.plugin.j2735.oss.OssVehicleSituationRecord; import us.dot.its.jpo.ode.udp.bsm.BsmComparator; import us.dot.its.jpo.ode.util.CodecUtils; -import us.dot.its.jpo.ode.util.JsonUtils; import us.dot.its.jpo.ode.wrapper.AbstractSubPubTransformer; import us.dot.its.jpo.ode.wrapper.MessageProducer; @@ -48,7 +46,7 @@ * (SIZE (1..10)) OF VehSitRecord, -- sets of situation data records crc * DSRC.MsgCRC } */ -public class BsmToVsdPackager extends AbstractSubPubTransformer { +public class BsmToVsdPackager extends AbstractSubPubTransformer { private static final Logger logger = LoggerFactory.getLogger(BsmToVsdPackager.class); @@ -63,23 +61,19 @@ public BsmToVsdPackager(MessageProducer producer, String outputT } @Override - protected byte[] transform(V consumedData) { + protected byte[] transform(J2735Bsm consumedData) { if (null == consumedData) { return new byte[0]; } - String jsonConsumedData = (String) SerializationUtils.deserialize((byte[]) consumedData); - logger.debug("VsdDepositor received data: {}", consumedData); - J2735Bsm bsmData = (J2735Bsm) JsonUtils.fromJson(jsonConsumedData, J2735Bsm.class); - byte[] encodedVsd = null; try { logger.debug("Consuming BSM."); - VehSitDataMessage vsd = addToVsdBundle(bsmData); + VehSitDataMessage vsd = addToVsdBundle(consumedData); // Only full VSDs (10) will be published // TODO - toggleable mechanism for periodically publishing not-full diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/vsd/BsmToVsdPackagerController.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/vsd/BsmToVsdPackagerController.java index d6a81b571..c172442b1 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/vsd/BsmToVsdPackagerController.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/vsd/BsmToVsdPackagerController.java @@ -31,8 +31,8 @@ private BsmToVsdPackagerController(OdeProperties odeProps) { BsmToVsdPackager converter = new BsmToVsdPackager<>(MessageProducer.defaultByteArrayMessageProducer( odeProps.getKafkaBrokers(), odeProps.getKafkaProducerType()), outputTopic); - MessageConsumer consumer = new MessageConsumer<>(odeProps.getKafkaBrokers(), - this.getClass().getSimpleName(), converter); + MessageConsumer consumer = new MessageConsumer(odeProps.getKafkaBrokers(), + this.getClass().getSimpleName(), converter, MessageConsumer.SERIALIZATION_BYTE_ARRAY_DESERIALIZER); consumer.setName(this.getClass().getSimpleName()); converter.start(consumer, inputTopic);