Skip to content

Commit

Permalink
ODE-466 Fixed up SerialId.
Browse files Browse the repository at this point in the history
  • Loading branch information
hmusavi committed Nov 19, 2018
1 parent ef6c40e commit 719810a
Show file tree
Hide file tree
Showing 22 changed files with 226 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ public SerialId() {

public SerialId(String streamId, int bundleSize,
long bundleId, int recordId) {
super();
this();
if (streamId != null)
this.streamId = streamId;
else
this.streamId = "";
this.streamId = this.streamId + "_null";

this.bundleSize = bundleSize;
this.bundleId = bundleId + (recordId / this.bundleSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package us.dot.its.jpo.ode.plugin;

/**
 * Marker interface for ODE plugins. Declares the shared buffer size used when
 * reading plugin input streams.
 */
public interface OdePlugin {
    // Interface fields are implicitly public static final; the explicit
    // modifiers and the stale 4096 duplicate (diff residue) are removed.
    // Value raised from 4096 to 8192 in ODE-466.
    int INPUT_STREAM_BUFFER_SIZE = 8192;
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public OdeData decode(BufferedInputStream bis, String filename, SerialId serialI
if (bsm != null) {
logger.debug("Decoded BSM successfully, creating OdeBsmData object.");
odeBsmData = OdeBsmDataCreatorHelper.createOdeBsmData(
(J2735Bsm) bsm, filename, serialId);
(J2735Bsm) bsm, filename);
} else {
logger.debug("Failed to decode BSM.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,31 @@ public FileAsn1CodecPublisher(OdeProperties odeProperties) {
this.codecPublisher = new LogFileToAsn1CodecPublisher(messagePub);
}

public void publishFile(Path filePath, BufferedInputStream fileInputStream, ImporterFileType fileType)
public void publishFile(Path filePath, BufferedInputStream fileInputStream, ImporterFileType fileType, int numRecords)
throws FileAsn1CodecPublisherException {
String fileName = filePath.toFile().getName();

logger.info("Publishing file {}", fileName);

try {
logger.info("Publishing data from {} to asn1_codec.", filePath);
codecPublisher.publish(fileInputStream, fileName, fileType);
codecPublisher.publish(fileInputStream, fileName, fileType, numRecords);
} catch (Exception e) {
throw new FileAsn1CodecPublisherException("Failed to publish file.", e);
}
}

/**
 * Counts the records in a log file without publishing them. Callers use the
 * result as the SerialId bundle size before publishing the same file.
 *
 * @param filePath        path of the file being counted
 * @param fileInputStream buffered stream positioned at the start of the file's records;
 *                        must be a separate stream from the one used for publishing
 * @param fileType        importer file type (e.g. OBU log file)
 * @return number of parseable records in the file
 * @throws FileAsn1CodecPublisherException if counting fails
 */
public int getNumberOfRecords(Path filePath, BufferedInputStream fileInputStream, ImporterFileType fileType)
        throws FileAsn1CodecPublisherException {
    String fileName = filePath.toFile().getName();

    // Fixed copy-pasted "Publishing" messages: this method only counts records.
    logger.info("Counting records in file {}", fileName);

    try {
        return codecPublisher.getNumberOfRecords(fileInputStream, fileName, fileType);
    } catch (Exception e) {
        throw new FileAsn1CodecPublisherException("Failed to count records in file.", e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import us.dot.its.jpo.ode.model.OdeLogMetadata;
import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode;
import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy;
import us.dot.its.jpo.ode.model.SerialId;
import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm;
import us.dot.its.jpo.ode.plugin.j2735.builders.BsmBuilder;
import us.dot.its.jpo.ode.plugin.j2735.builders.BsmPart2ContentBuilder.BsmPart2ContentBuilderException;
Expand Down Expand Up @@ -62,8 +61,8 @@ public static OdeBsmData createOdeBsmData(
}

public static OdeBsmData createOdeBsmData(
J2735Bsm rawBsm, String filename, SerialId serialId) {
BsmLogFileParser bsmFileParser = new BsmLogFileParser(serialId.getBundleId());
J2735Bsm rawBsm, String filename) {
BsmLogFileParser bsmFileParser = new BsmLogFileParser();
bsmFileParser.setFilename(filename).getTimeParser().setUtcTimeInSec(0).getSecResCodeParser().setSecurityResultCode(SecurityResultCode.unknown);
;
return createOdeBsmData(rawBsm, null, bsmFileParser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public static void updateLogMetadata(OdeLogMetadata metadata, LogFileParser logF
}

metadata.setRecordGeneratedBy(GeneratedBy.OBU);
metadata.getSerialId().addRecordId(1);
}

public static ReceivedMessageDetails buildReceivedMessageDetails(LogFileParser parser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public void decodeAndPublish(BufferedInputStream is, String fileName, ImporterFi

J2735Bsm j2735Bsm = (J2735Bsm) JsonUtils.fromJson(line, J2735Bsm.class);
OdeData odeBsm = OdeBsmDataCreatorHelper.createOdeBsmData(
j2735Bsm, fileName,
this.serialId.setBundleId(bundleId.incrementAndGet()).addRecordId(1));
j2735Bsm, fileName);

publisher.publish(odeBsm, publisher.getOdeProperties().getKafkaTopicOdeBsmJson());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package us.dot.its.jpo.ode.coder.stream;

import java.io.BufferedInputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,6 +27,7 @@
import us.dot.its.jpo.ode.model.OdeLogMetadata;
import us.dot.its.jpo.ode.model.OdeMsgPayload;
import us.dot.its.jpo.ode.model.RxSource;
import us.dot.its.jpo.ode.model.SerialId;
import us.dot.its.jpo.ode.util.JsonUtils;
import us.dot.its.jpo.ode.util.XmlUtils;

Expand All @@ -46,23 +47,25 @@ public LogFileToAsn1CodecPublisherException(String string, Exception e) {

protected StringPublisher publisher;
protected LogFileParser fileParser;

protected static AtomicInteger bundleId = new AtomicInteger(1);
protected SerialId serialId;

public LogFileToAsn1CodecPublisher(StringPublisher dataPub) {
this.publisher = dataPub;
this.serialId = new SerialId();
}

public void publish(BufferedInputStream bis, String fileName, ImporterFileType fileType)
public void publish(BufferedInputStream bis, String fileName, ImporterFileType fileType, int numRecords)
throws LogFileToAsn1CodecPublisherException {
XmlUtils xmlUtils = new XmlUtils();
ParserStatus status = ParserStatus.UNKNOWN;

if (fileType == ImporterFileType.OBU_LOG_FILE) {
fileParser = LogFileParser.factory(fileName, bundleId.incrementAndGet());
fileParser = LogFileParser.factory(fileName);
} else {
status = ParserStatus.NA;
}

serialId.setBundleSize(numRecords);

do {
try {
Expand All @@ -86,60 +89,158 @@ public void publish(BufferedInputStream bis, String fileName, ImporterFileType f
} while (status == ParserStatus.COMPLETE);
}

private void publish(XmlUtils xmlUtils) throws JsonProcessingException {
/**
 * Extracts the payload of the record most recently parsed by {@code fileParser}
 * and appends it to the supplied list. Publishing is handled separately by
 * {@code publish(XmlUtils, List)}.
 *
 * NOTE(review): the scraped diff showed this method still referencing
 * {@code xmlUtils} and publishing inline — those lines are removed-line residue
 * from the old private {@code publish(XmlUtils)} and would not compile here
 * ({@code xmlUtils} is not in scope). This body keeps only the parse/accumulate
 * responsibility the new signature implies.
 *
 * @param payloadList accumulator for payloads parsed so far; one entry is added
 */
public void parsePayload(List<OdeMsgPayload> payloadList) {
    OdeMsgPayload msgPayload;
    if (fileParser instanceof DriverAlertFileParser) {
        // Driver alerts carry a plain-text alert rather than ASN.1 bytes.
        msgPayload = new OdeDriverAlertPayload(((DriverAlertFileParser) fileParser).getAlert());
    } else {
        // All other log types carry raw ASN.1 payload bytes.
        msgPayload = new OdeAsn1Payload(fileParser.getPayloadParser().getPayload());
    }
    payloadList.add(msgPayload);
}

/**
 * Publishes every payload accumulated for one log file. Driver alerts go to the
 * driver-alert JSON topic; everything else is wrapped as ASN.1 data and sent to
 * the asn1_codec decoder-input topic as XML.
 *
 * NOTE(review): every metadata object is handed the SAME shared {@code serialId}
 * instance via setSerialId, which is then incremented after each publish. Whether
 * each published message captures a distinct record id depends on whether
 * setSerialId copies the id — TODO confirm against SerialId/OdeMsgMetadata.
 *
 * @param xmlUtils    serializer for the ASN.1 XML envelope
 * @param payloadList payloads previously collected by parsePayload
 * @throws JsonProcessingException if XML/JSON serialization fails
 */
public void publish(XmlUtils xmlUtils, List<OdeMsgPayload> payloadList) throws JsonProcessingException {
// Bundle size = number of records in this file.
serialId.setBundleSize(payloadList.size());
for (OdeMsgPayload msgPayload : payloadList) {
OdeLogMetadata msgMetadata;
OdeData msgData;

if (fileParser instanceof DriverAlertFileParser){
logger.debug("Publishing a driverAlert.");

msgMetadata = new OdeLogMetadata(msgPayload);
msgMetadata.setSerialId(serialId);

// Fill in generator/time fields from the parser state.
OdeLogMetadataCreatorHelper.updateLogMetadata(msgMetadata, fileParser);

msgData = new OdeDriverAlertData(msgMetadata, msgPayload);
publisher.publish(JsonUtils.toJson(msgData, false),
publisher.getOdeProperties().getKafkaTopicDriverAlertJson());
// Advance the shared serial id after each successful publish.
msgMetadata.getSerialId().increment();
} else {
// BSMs arrive either from a BSM log or from an RX log whose source is
// another vehicle (RV); everything else is treated as a TIM.
if (fileParser instanceof BsmLogFileParser ||
(fileParser instanceof RxMsgFileParser && ((RxMsgFileParser)fileParser).getRxSource() == RxSource.RV)) {
logger.debug("Publishing a BSM");
msgMetadata = new OdeBsmMetadata(msgPayload);
} else {
logger.debug("Publishing a TIM");
msgMetadata = new OdeLogMetadata(msgPayload);
}
msgMetadata.setSerialId(serialId);

// Outer envelope is COER-encoded IEEE 1609.2; inner payload is a
// UPER-encoded MessageFrame.
Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER);
Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",
EncodingRule.UPER);
msgMetadata.addEncoding(msgEncoding).addEncoding(unsecuredDataEncoding);

OdeLogMetadataCreatorHelper.updateLogMetadata(msgMetadata, fileParser);

msgData = new OdeAsn1Data(msgMetadata, msgPayload);
publisher.publish(xmlUtils.toXml(msgData),
publisher.getOdeProperties().getKafkaTopicAsn1DecoderInput());
msgMetadata.getSerialId().increment();
}
}
}

/**
 * Builds metadata for the record currently held by {@code fileParser} and
 * publishes it: driver alerts as JSON to the driver-alert topic, all other
 * records as ASN.1 XML to the decoder-input topic. The shared serial id is
 * attached before publishing and incremented afterwards.
 *
 * @param xmlUtils serializer for the ASN.1 XML envelope
 * @throws JsonProcessingException if serialization fails
 */
private void publish(XmlUtils xmlUtils) throws JsonProcessingException {
    if (fileParser instanceof DriverAlertFileParser) {
        logger.debug("Publishing a driverAlert.");
        OdeDriverAlertPayload alertPayload =
                new OdeDriverAlertPayload(((DriverAlertFileParser) fileParser).getAlert());

        OdeLogMetadata alertMetadata = new OdeLogMetadata(alertPayload);
        alertMetadata.setSerialId(serialId);

        OdeLogMetadataCreatorHelper.updateLogMetadata(alertMetadata, fileParser);

        OdeData alertData = new OdeDriverAlertData(alertMetadata, alertPayload);
        publisher.publish(JsonUtils.toJson(alertData, false),
                publisher.getOdeProperties().getKafkaTopicDriverAlertJson());
        alertMetadata.getSerialId().increment();
        return;
    }

    OdeMsgPayload asn1Payload = new OdeAsn1Payload(fileParser.getPayloadParser().getPayload());

    // A record is a BSM when it came from a BSM log, or from an RX log whose
    // source was a remote vehicle; otherwise it is treated as a TIM.
    boolean fromRemoteVehicle = fileParser instanceof RxMsgFileParser
            && ((RxMsgFileParser) fileParser).getRxSource() == RxSource.RV;
    OdeLogMetadata asn1Metadata;
    if (fileParser instanceof BsmLogFileParser || fromRemoteVehicle) {
        logger.debug("Publishing a BSM");
        asn1Metadata = new OdeBsmMetadata(asn1Payload);
    } else {
        logger.debug("Publishing a TIM");
        asn1Metadata = new OdeLogMetadata(asn1Payload);
    }
    asn1Metadata.setSerialId(serialId);

    Asn1Encoding outerEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER);
    Asn1Encoding innerEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",
            EncodingRule.UPER);
    asn1Metadata.addEncoding(outerEncoding).addEncoding(innerEncoding);

    OdeLogMetadataCreatorHelper.updateLogMetadata(asn1Metadata, fileParser);

    OdeData asn1Data = new OdeAsn1Data(asn1Metadata, asn1Payload);
    publisher.publish(xmlUtils.toXml(asn1Data),
            publisher.getOdeProperties().getKafkaTopicAsn1DecoderInput());
    asn1Metadata.getSerialId().increment();
}

/**
 * Publishes a raw byte payload to the asn1_codec decoder-input topic. The
 * payload is wrapped as an OdeAsn1Data envelope whose encoding metadata
 * declares the root element as a UPER-encoded MessageFrame.
 *
 * @param payloadBytes raw ASN.1 payload bytes
 * @throws Exception if XML serialization or publishing fails
 */
@Override
public void publish(byte[] payloadBytes) throws Exception {
    // Cleaned up diff residue: the old `metadata` lines (which used the removed
    // bundleId counter) are dropped; only the serialId-based path remains.
    OdeAsn1Payload payload = new OdeAsn1Payload(payloadBytes);

    OdeLogMetadata msgMetadata = new OdeLogMetadata(payload);
    msgMetadata.setSerialId(serialId);

    Asn1Encoding msgEncoding = new Asn1Encoding("root", "MessageFrame", EncodingRule.UPER);
    msgMetadata.addEncoding(msgEncoding);
    OdeAsn1Data asn1Data = new OdeAsn1Data(msgMetadata, payload);

    // Removed commented-out publish to the encoded-BSM topic (dead code).
    publisher.publish(XmlUtils.toXmlS(asn1Data), publisher.getOdeProperties().getKafkaTopicAsn1DecoderInput());
}

/**
 * Counts the parseable records in a log file without publishing anything.
 * Used to determine the SerialId bundle size before a second pass publishes
 * the same file from a fresh stream.
 *
 * @param bis      stream positioned at the start of the file; closed on EOF
 * @param fileName name of the file, used to select the parser
 * @param fileType importer file type; only OBU log files are parseable
 * @return number of records parsed to completion; 0 for non-OBU file types
 * @throws LogFileToAsn1CodecPublisherException if parsing fails
 */
public int getNumberOfRecords(BufferedInputStream bis, String fileName, ImporterFileType fileType)
        throws LogFileToAsn1CodecPublisherException {
    if (fileType != ImporterFileType.OBU_LOG_FILE) {
        // Fix: the original fell through to the parse loop with status NA and
        // called fileParser.parseFile anyway (NPE risk / stale parser reuse).
        return 0;
    }

    fileParser = LogFileParser.factory(fileName);

    int numRecords = 0;
    ParserStatus status;
    do {
        try {
            status = fileParser.parseFile(bis, fileName);
            if (status == ParserStatus.COMPLETE) {
                numRecords++;
            } else if (status == ParserStatus.EOF) {
                // Normal end of file: release the throwaway counting stream.
                // Fix: removed the unreachable INIT check and the misleading
                // "Failed to decode ASN.1 data" error that logged on every EOF.
                bis.close();
            }
        } catch (Exception e) {
            // Message corrected: this method counts, it does not publish.
            throw new LogFileToAsn1CodecPublisherException("Error counting records in file.", e);
        }
    } while (status == ParserStatus.COMPLETE);

    return numRecords;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,22 @@ public void processAndBackupFile(Path filePath, Path backupDir, Path failureDir)
inputStream = new FileInputStream(filePath.toFile());
String probeContentType = Files.probeContentType(filePath);
if (probeContentType != null && gZipPattern.matcher(probeContentType).matches() || filePath.endsWith("gz")) {
inputStream = new GZIPInputStream(inputStream);
bis = publishFile(filePath, inputStream);
// Must create a new stream for counting the records. DO NOT use the same stream that parses the file
int numRecords = getNumberOfRecords(filePath, new GZIPInputStream(new FileInputStream(filePath.toFile())));
inputStream = new GZIPInputStream(inputStream);
bis = publishFile(filePath, inputStream, numRecords);
} else if (probeContentType != null && zipPattern.matcher(probeContentType).matches() || filePath.endsWith("zip")) {
// Must create a new stream for counting the records. DO NOT use the same stream that parses the file
int numRecords = getNumberOfRecords(filePath, new ZipInputStream(new FileInputStream(filePath.toFile())));
inputStream = new ZipInputStream(inputStream);
ZipInputStream zis = (ZipInputStream)inputStream;
while (zis.getNextEntry() != null) {
bis = publishFile(filePath, inputStream);
bis = publishFile(filePath, inputStream, numRecords);
}
} else {
bis = publishFile(filePath, inputStream);
// Must create a new stream for counting the records. DO NOT use the same stream that parses the file
int numRecords = getNumberOfRecords(filePath, new FileInputStream(filePath.toFile()));
bis = publishFile(filePath, inputStream, numRecords);
}
} catch (Exception e) {
success = false;
Expand Down Expand Up @@ -131,11 +137,18 @@ public void processAndBackupFile(Path filePath, Path backupDir, Path failureDir)
}
}

private BufferedInputStream publishFile(Path filePath, InputStream inputStream)
private BufferedInputStream publishFile(Path filePath, InputStream inputStream, int numRecords)
throws FileAsn1CodecPublisherException {
BufferedInputStream bis;
bis = new BufferedInputStream(inputStream, odeProperties.getImportProcessorBufferSize());
codecPublisher.publishFile(filePath, bis, fileType);
codecPublisher.publishFile(filePath, bis, fileType, numRecords);
return bis;
}

/**
 * Buffers the given stream and asks the codec publisher how many records it
 * contains. The stream is expected to be a throwaway copy created solely for
 * counting, separate from the one later used for publishing.
 *
 * @param filePath    path of the file being counted
 * @param inputStream raw (possibly decompressing) stream over the file
 * @return number of records in the file
 * @throws FileAsn1CodecPublisherException if counting fails
 */
private int getNumberOfRecords(Path filePath, InputStream inputStream)
        throws FileAsn1CodecPublisherException {
    BufferedInputStream countingStream =
            new BufferedInputStream(inputStream, odeProperties.getImportProcessorBufferSize());
    return codecPublisher.getNumberOfRecords(filePath, countingStream, fileType);
}
}
Loading

0 comments on commit 719810a

Please sign in to comment.