Skip to content

Commit

Permalink
Merge pull request usdot-jpo-ode#212 from usdot-jpo-ode/tim_query_old
Browse files Browse the repository at this point in the history
Adding old tim query back
  • Loading branch information
tonychen091 committed Dec 14, 2017
2 parents 514db56 + 43fea0e commit 7f1c157
Showing 1 changed file with 88 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,6 +87,8 @@ public TimControllerException(String errMsg, Exception e) {
private MessageProducer<String, String> stringMsgProducer;
private MessageProducer<String, OdeObject> timProducer;

private ExecutorService threadPool;

@Autowired
public TimController(OdeProperties odeProperties) {
super();
Expand All @@ -90,7 +98,85 @@ public TimController(OdeProperties odeProperties) {
odeProperties.getKafkaProducerType());
this.timProducer = new MessageProducer<>(odeProperties.getKafkaBrokers(), odeProperties.getKafkaProducerType(),
null, OdeTimSerializer.class.getName());

this.threadPool = Executors.newFixedThreadPool(odeProperties.getRsuSrmSlots() * 3);
}






/**
* Checks given RSU for all TIMs set
*
* @param jsonString
* Request body containing RSU info
* @return list of occupied TIM slots on RSU
*/
@ResponseBody
@CrossOrigin
@RequestMapping(value = "/tim/queryold", method = RequestMethod.POST)
public synchronized ResponseEntity<String> asyncQueryForTims(@RequestBody String jsonString) { // NOSONAR

if (null == jsonString || jsonString.isEmpty()) {
logger.error("Empty request.");
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(jsonKeyValue(ERRSTR, "Empty request."));
}

ConcurrentSkipListMap<Integer, Integer> resultTable = new ConcurrentSkipListMap<>();
RSU queryTarget = (RSU) JsonUtils.fromJson(jsonString, RSU.class);

SnmpSession snmpSession = null;
try {
snmpSession = new SnmpSession(queryTarget);
snmpSession.startListen();
} catch (IOException e) {
logger.error("Error creating SNMP session.", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(jsonKeyValue(ERRSTR, "Failed to create SNMP session."));
}

// Repeatedly query the RSU to establish set rows
List<Callable<Object>> queryThreadList = new ArrayList<>();

for (int i = 0; i < odeProperties.getRsuSrmSlots(); i++) {
ScopedPDU pdu = new ScopedPDU();
pdu.add(new VariableBinding(new OID("1.0.15628.4.1.4.1.11.".concat(Integer.toString(i)))));
pdu.setType(PDU.GET);
queryThreadList.add(Executors
.callable(new TimQueryThread(snmpSession.getSnmp(), pdu, snmpSession.getTarget(), resultTable, i)));
}

try {
threadPool.invokeAll(queryThreadList);
} catch (InterruptedException e) { // NOSONAR
logger.error("Error submitting query threads for execution.", e);
threadPool.shutdownNow();
}

try {
snmpSession.endSession();
} catch (IOException e) {
logger.error("Error closing SNMP session.", e);
}

if (resultTable.containsValue(TimQueryThread.TIMEOUT_FLAG)) {
logger.error("TIM query timed out.");
return ResponseEntity.status(HttpStatus.BAD_REQUEST)
.body(jsonKeyValue(ERRSTR, "Query timeout, increase retries."));
} else {
logger.info("TIM query successful: {}", resultTable.keySet());
return ResponseEntity.status(HttpStatus.OK).body("{\"indicies_set\":" + resultTable.keySet() + "}");
}
}








/**
* Checks given RSU for all TIMs set
Expand Down Expand Up @@ -127,6 +213,8 @@ public synchronized ResponseEntity<String> bulkQuery(@RequestBody String jsonStr
for (int i = 0; i < odeProperties.getRsuSrmSlots(); i++) {
pdu.add(new VariableBinding(new OID("1.0.15628.4.1.4.1.11.".concat(Integer.toString(i)))));
}

logger.debug("VARIABLE BINDINGS?!?!?!: {}", pdu.getVariableBindings());

ResponseEvent response = null;
try {
Expand Down

0 comments on commit 7f1c157

Please sign in to comment.