Skip to content

Commit

Permalink
Merge pull request usdot-jpo-ode#414 from usdot-jpo-ode/feature/encod…
Browse files Browse the repository at this point in the history
…ed_spat_json

Update Cloud to accept log messages from V2X Hub (SPaT) - JSON
  • Loading branch information
snallamothu authored Feb 17, 2021
2 parents ad951c8 + 8162dc2 commit e60ffbd
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class OdeLogMetadata extends OdeMsgMetadata {
private static final long serialVersionUID = -8601265839394150140L;

public enum RecordType {
bsmLogDuringEvent, rxMsg, dnMsg, bsmTx, driverAlert, unsupported
bsmLogDuringEvent, rxMsg, dnMsg, bsmTx, driverAlert, spatTx, unsupported
}

public enum SecurityResultCode {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package us.dot.its.jpo.ode.model;

public class OdeSpatMetadata extends OdeLogMetadata {
/**
*
*/
private static final long serialVersionUID = -5361008186032548625L;

public enum SpatSource {
RSU, V2X, MMITSS, unknown
}

private SpatSource spatSource;

public OdeSpatMetadata() {
super();
}

public OdeSpatMetadata(OdeMsgPayload payload) {
super(payload);
}

public OdeSpatMetadata(OdeMsgPayload payload, SerialId serialId, String receivedAt) {

}

public SpatSource getSpatSource() {
return spatSource;
}

public void setSpatSource(SpatSource spatSource) {
this.spatSource = spatSource;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode;
import us.dot.its.jpo.ode.model.OdeLogMsgMetadataLocation;
import us.dot.its.jpo.ode.model.OdeMsgPayload;
import us.dot.its.jpo.ode.model.OdeSpatMetadata;
import us.dot.its.jpo.ode.model.OdeSpatMetadata.SpatSource;
import us.dot.its.jpo.ode.model.ReceivedMessageDetails;
import us.dot.its.jpo.ode.model.RxSource;
import us.dot.its.jpo.ode.util.XmlUtils;
Expand All @@ -30,9 +32,10 @@

/***
* Encoded message Processor
* */
*/
public class Asn1DecodeMessageJSON extends AbstractSubscriberProcessor<String, String> {
private static final String BSMContentType = "BsmMessageContent";
private static final String SPATContentType = "SpatMessageContent";

private Logger logger = LoggerFactory.getLogger(this.getClass());

Expand All @@ -47,74 +50,118 @@ public Asn1DecodeMessageJSON(OdeProperties odeProps) {
protected Object process(String consumedData) {
OdeData odeData = null;
OdeMsgPayload payload = null;
OdeBsmMetadata metadata = null;
try {
JSONObject rawJSONObject = new JSONObject(consumedData);
Set<?> keys = rawJSONObject.keySet();
for (Object key : keys) {

//Send encoded BSM content to Codec service to decode BSM
if (key != null && key.toString().equals(BSMContentType)) {
/**process consumed data
* { "BsmMessageContent": [{ "metadata": { "utctimestamp: "2020-11-30T23:45:24.913657Z" } "payload":"001480CF4B950C400022D2666E923D1EA6D4E28957BD55FFFFF001C758FD7E67D07F7FFF8000000002020218E1C1004A40196FBC042210115C030EF1408801021D4074CE7E1848101C5C0806E8E1A50101A84056EE8A1AB4102B840A9ADA21B9010259C08DEE1C1C560FFDDBFC070C0222210018BFCE309623120FFE9BFBB10C8238A0FFDC3F987114241610009BFB7113024780FFAC3F95F13A26800FED93FDD51202C5E0FE17BF9B31202FBAFFFEC87FC011650090019C70808440C83207873800000000001095084081C903447E31C12FC0"}]}
OdeBsmMetadata metadata = null;
/**process consumed data { "BsmMessageContent": [{ "metadata": { "utctimestamp:"2020-11-30T23:45:24.913657Z" }, "payload":"001480CF4B950C400022D2666E923D1EA6D4E28957BD55FFFFF001C758FD7E67D07F7FFF8000000002020218E1C1004A40196FBC042210115C030EF1408801021D4074CE7E1848101C5C0806E8E1A50101A84056EE8A1AB4102B840A9ADA21B9010259C08DEE1C1C560FFDDBFC070C0222210018BFCE309623120FFE9BFBB10C8238A0FFDC3F987114241610009BFB7113024780FFAC3F95F13A26800FED93FDD51202C5E0FE17BF9B31202FBAFFFEC87FC011650090019C70808440C83207873800000000001095084081C903447E31C12FC0"}]}
*/
JSONArray rawBSMJsonContentArray = rawJSONObject.getJSONArray(BSMContentType);
for(int i=0;i<rawBSMJsonContentArray.length();i++)
{
for (int i = 0; i < rawBSMJsonContentArray.length(); i++) {
JSONObject rawBSMJsonContent = (JSONObject) rawBSMJsonContentArray.get(i);
String encodedPayload = rawBSMJsonContent.get("payload").toString();
JSONObject rawmetadata = (JSONObject) rawBSMJsonContent.get("metadata");
//construct payload

// construct payload
payload = new OdeAsn1Payload(new OdeHexByteArray(encodedPayload));
//construct metadata

// construct metadata
metadata = new OdeBsmMetadata(payload);
metadata.setOdeReceivedAt(rawmetadata.getString("utctimestamp"));
metadata.setRecordType(RecordType.bsmTx);
metadata.setSecurityResultCode(SecurityResultCode.success);
//construct metadata: receivedMessageDetails

// construct metadata: receivedMessageDetails
ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails();
receivedMessageDetails.setRxSource(RxSource.RV);

//construct metadata: locationData
// construct metadata: locationData
OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation();
receivedMessageDetails.setLocationData(locationData);

metadata.setReceivedMessageDetails(receivedMessageDetails);
metadata.setBsmSource(BsmSource.RV);

Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",EncodingRule.UPER);

Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",
EncodingRule.UPER);
metadata.addEncoding(unsecuredDataEncoding);
//construct odeData

// construct odeData
odeData = new OdeAsn1Data(metadata, payload);


publishEncodedMessageToAsn1Decoder(odeData);
}

} else if (key != null && key.toString().equals(SPATContentType)) {
/**
* process consumed data { "SpatMessageContent": [{ "metadata": { "utctimestamp:
* "2020-11-30T23:45:24.913657Z" }
* "payload":"00131A604A380583702005837800080008100000040583705043002580"}]}
*/

OdeSpatMetadata metadata = null;
JSONArray rawSPATJsonContentArray = rawJSONObject.getJSONArray(SPATContentType);
for (int i = 0; i < rawSPATJsonContentArray.length(); i++) {
JSONObject rawSPATJsonContent = (JSONObject) rawSPATJsonContentArray.get(i);
String encodedPayload = rawSPATJsonContent.get("payload").toString();
JSONObject rawmetadata = (JSONObject) rawSPATJsonContent.get("metadata");

// construct payload
payload = new OdeAsn1Payload(new OdeHexByteArray(encodedPayload));

// construct metadata
metadata = new OdeSpatMetadata(payload);
metadata.setOdeReceivedAt(rawmetadata.getString("utctimestamp"));
metadata.setRecordType(RecordType.spatTx);
metadata.setSecurityResultCode(SecurityResultCode.success);

// construct metadata: receivedMessageDetails
ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails();
receivedMessageDetails.setRxSource(RxSource.NA);

// construct metadata: locationData
OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation();
receivedMessageDetails.setLocationData(locationData);

metadata.setReceivedMessageDetails(receivedMessageDetails);
metadata.setSpatSource(SpatSource.V2X);

Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",
EncodingRule.UPER);
metadata.addEncoding(unsecuredDataEncoding);

// construct odeData
odeData = new OdeAsn1Data(metadata, payload);

publishEncodedMessageToAsn1Decoder(odeData);
}
} else {
logger.error("Error received null key from consumed message");
}
}

} catch (

} catch (Exception e) {
Exception e) {
logger.error("Error publishing to Asn1DecoderInput: {}", e.getMessage());
}
return null;
}

private void publishEncodedMessageToAsn1Decoder(OdeData odeData){
private void publishEncodedMessageToAsn1Decoder(OdeData odeData) {
XmlUtils xmlUtils = new XmlUtils();
try {
logger.info("Sending encoded payload XML to ASN1 codec {}", xmlUtils.toXml(odeData));
codecPublisher.publish(xmlUtils.toXml(odeData),codecPublisher.getOdeProperties().getKafkaTopicAsn1DecoderInput());
codecPublisher.publish(xmlUtils.toXml(odeData),
codecPublisher.getOdeProperties().getKafkaTopicAsn1DecoderInput());
} catch (JsonProcessingException e) {
logger.info("Error sending encoded payload XML to ASN1 codec {}", e.getMessage());
e.printStackTrace();
}

}
}

0 comments on commit e60ffbd

Please sign in to comment.