Skip to content

Commit

Permalink
Fixing serialization issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
hmusavi committed Jul 18, 2017
1 parent 7d89a4d commit 281b5df
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package us.dot.its.jpo.ode.wrapper;

import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm;

public class J2735BsmDeseriizer extends MessagingDeserializer<J2735Bsm> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ public MessageConsumer(String brokers, String groupId, MessageProcessor<K, V> pr
logger.info("Consumer Created for groupId {}", groupId);
}

public MessageConsumer(String brokers, String groupId, MessageProcessor<K, V> processor) {
this(brokers, groupId, processor, MessagingDeserializer.class.getName());
}

public MessageConsumer(String brokers, String groupId, MessageProcessor<K, V> processor, Properties props) {
this.processor = processor;
props.put("bootstrap.servers", brokers);
Expand All @@ -83,10 +79,6 @@ public MessageConsumer(String brokers, String groupId, MessageProcessor<K, V> pr

}

public MessageConsumer(String brokers, String groupId, Properties props) {
this(brokers, groupId, null, props);
}

public void subscribe(String... topics) {

List<String> listTopics = Arrays.asList(topics);
Expand Down
1 change: 1 addition & 0 deletions jpo-ode-core/src/test/resources/CVMessages/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/VehicleSituationDataServiceResponse.uper
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import us.dot.its.jpo.ode.OdeProperties;
import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm;
import us.dot.its.jpo.ode.wrapper.J2735BsmDeseriizer;
import us.dot.its.jpo.ode.wrapper.MessageConsumer;

/**
Expand All @@ -30,13 +31,14 @@ public ToJsonServiceController(OdeProperties odeProps) {
odeProps.getKafkaTopicBsmSerializedPojo(),
odeProps.getKafkaTopicBsmRawJson());

ToJsonConverter<J2735Bsm> converter = new ToJsonConverter<>(
ToJsonConverter<J2735Bsm> converter = new ToJsonConverter<J2735Bsm>(
odeProps, false, odeProps.getKafkaTopicBsmRawJson());

MessageConsumer<String, J2735Bsm> consumer = new MessageConsumer<>(
MessageConsumer<String, J2735Bsm> consumer = new MessageConsumer<String, J2735Bsm>(
odeProps.getKafkaBrokers(),
this.getClass().getSimpleName(),
converter);
converter,
J2735BsmDeseriizer.class.getName());

consumer.setName(this.getClass().getSimpleName());
converter.start(consumer, odeProps.getKafkaTopicBsmSerializedPojo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.apache.tomcat.util.buf.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.SerializationUtils;

import com.oss.asn1.EncodeFailedException;
import com.oss.asn1.EncodeNotSupportedException;
Expand All @@ -28,7 +27,6 @@
import us.dot.its.jpo.ode.plugin.j2735.oss.OssVehicleSituationRecord;
import us.dot.its.jpo.ode.udp.bsm.BsmComparator;
import us.dot.its.jpo.ode.util.CodecUtils;
import us.dot.its.jpo.ode.util.JsonUtils;
import us.dot.its.jpo.ode.wrapper.AbstractSubPubTransformer;
import us.dot.its.jpo.ode.wrapper.MessageProducer;

Expand All @@ -48,7 +46,7 @@
* (SIZE (1..10)) OF VehSitRecord, -- sets of situation data records crc
* DSRC.MsgCRC }
*/
public class BsmToVsdPackager<V> extends AbstractSubPubTransformer<String, V, byte[]> {
public class BsmToVsdPackager extends AbstractSubPubTransformer<String, J2735Bsm, byte[]> {

private static final Logger logger = LoggerFactory.getLogger(BsmToVsdPackager.class);

Expand All @@ -63,23 +61,19 @@ public BsmToVsdPackager(MessageProducer<String, byte[]> producer, String outputT
}

@Override
protected byte[] transform(V consumedData) {
protected byte[] transform(J2735Bsm consumedData) {

if (null == consumedData) {
return new byte[0];
}

String jsonConsumedData = (String) SerializationUtils.deserialize((byte[]) consumedData);

logger.debug("VsdDepositor received data: {}", consumedData);

J2735Bsm bsmData = (J2735Bsm) JsonUtils.fromJson(jsonConsumedData, J2735Bsm.class);

byte[] encodedVsd = null;
try {
logger.debug("Consuming BSM.");

VehSitDataMessage vsd = addToVsdBundle(bsmData);
VehSitDataMessage vsd = addToVsdBundle(consumedData);

// Only full VSDs (10) will be published
// TODO - toggleable mechanism for periodically publishing not-full
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ private BsmToVsdPackagerController(OdeProperties odeProps) {
BsmToVsdPackager<byte[]> converter = new BsmToVsdPackager<>(MessageProducer.defaultByteArrayMessageProducer(
odeProps.getKafkaBrokers(), odeProps.getKafkaProducerType()), outputTopic);

MessageConsumer<String, byte[]> consumer = new MessageConsumer<>(odeProps.getKafkaBrokers(),
this.getClass().getSimpleName(), converter);
MessageConsumer<String, byte[]> consumer = new MessageConsumer<String, byte[]>(odeProps.getKafkaBrokers(),
this.getClass().getSimpleName(), converter, MessageConsumer.SERIALIZATION_BYTE_ARRAY_DESERIALIZER);

consumer.setName(this.getClass().getSimpleName());
converter.start(consumer, inputTopic);
Expand Down

0 comments on commit 281b5df

Please sign in to comment.