Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Making CEP fault handling window processors resilient against issues …
Browse files Browse the repository at this point in the history
…in CEP. Wait for a given timeout or the first event is received before processing health stats. This is to avoid false positive faulty members. Make CEP fault handling window processor wait until complete topology event is received at the startup.
  • Loading branch information
ravihansa3000 committed Jun 17, 2016
1 parent b063eb4 commit a3ab4ae
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,16 @@
* CEP Topology Receiver for Fault Handling Window Processor.
*/
public class CEPTopologyEventReceiver {

private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);

private FaultHandlingWindowProcessor faultHandler;
private TopologyEventReceiver topologyEventReceiver;

public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
this.faultHandler = faultHandler;
this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}

// @Override
// public void execute() {
// super.execute();
// log.info("CEP topology event receiver thread started");
// }

private void addEventListeners() {
// Load member time stamp map from the topology as a one time task
topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
Expand All @@ -63,7 +55,7 @@ protected void onEvent(Event event) {
if (!initialized) {
try {
TopologyManager.acquireReadLock();
log.debug("Complete topology event received to fault handling window processor.");
log.info("Complete topology event received to fault handling window processor.");
CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
} catch (Exception e) {
Expand All @@ -81,7 +73,11 @@ protected void onEvent(Event event) {
protected void onEvent(Event event) {
MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId());
if (log.isDebugEnabled()) {
log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent
.getMemberId());

}
}
});

Expand All @@ -94,8 +90,14 @@ protected void onEvent(Event event) {
// do not put this member if we have already received a health event
faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
System.currentTimeMillis());
log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
if (log.isDebugEnabled()) {
log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
}
}
});
}

void destroy() {
topologyEventReceiver.terminate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.topology.*;
Expand All @@ -45,26 +44,26 @@
import org.wso2.siddhi.query.api.expression.constant.LongConstant;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* CEP window processor to handle faulty member instances. This window processor is responsible for
* publishing MemberFault event if health stats are not received within a given time window.
*/
@SiddhiExtension(namespace = "stratos",
function = "faultHandling")
@SiddhiExtension(namespace = "stratos", function = "faultHandling")
public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {

private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);

private static final String ACTIVATE_TIMEOUT_KEY = "cep.fault.handler.extension.activate.timeout";
private static final int ACTIVATE_TIMEOUT =
Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15);
private static final int TIME_OUT = 60 * 1000;
public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
public static final int CEP_EXTENSION_THREAD_POOL_SIZE = 10;

private ExecutorService executorService;
private ScheduledExecutorService faultHandleScheduler;
private ScheduledFuture<?> lastSchedule;
private ThreadBarrier threadBarrier;
Expand All @@ -77,6 +76,9 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run

// Map of member id's to their last received health event time stamp
private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
private volatile boolean isActive;
private volatile boolean hasMemberTimeStampMapInitialized;
private long startTime = System.currentTimeMillis();

// Event receiver to receive topology events published by cloud-controller
private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
Expand All @@ -101,7 +103,11 @@ protected void processEvent(InListEvent listEvent) {
*
* @param event Event received by Siddhi.
*/
protected void addDataToMap(InEvent event) {
private void addDataToMap(InEvent event) {
if (!isActive) {
log.info("Received first event. Marking fault handling window processor as active");
isActive = true;
}
String id = (String) event.getData()[memberIdAttrIndex];
//checking whether this member is the topology.
//sometimes there can be a delay between publishing member terminated events
Expand Down Expand Up @@ -143,7 +149,6 @@ public Iterator<StreamEvent> iterator(String predicate) {
* @param topology Topology model object
*/
boolean loadTimeStampMapFromTopology(Topology topology) {

long currentTimeStamp = System.currentTimeMillis();
if (topology == null || topology.getServices() == null) {
return false;
Expand All @@ -164,10 +169,10 @@ boolean loadTimeStampMapFromTopology(Topology topology) {
}
}
}

if (log.isDebugEnabled()) {
log.debug(
"Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap);
hasMemberTimeStampMapInitialized = true;
if (log.isInfoEnabled()) {
log.info("Member timestamps were successfully loaded from the topology: [timestamps] " +
Arrays.toString(memberTimeStampMap.entrySet().toArray()));
}
return true;
}
Expand Down Expand Up @@ -222,7 +227,19 @@ private void publishMemberFault(Member member) {
@Override
public void run() {
try {
// wait until the first event OR given timeout to expire in order to activate this window processor
// this is to prevent false positives at the CEP startup
if (!isActive && System.currentTimeMillis() - startTime > ACTIVATE_TIMEOUT) {
log.info("Activation wait timeout has expired. Marking fault handling window processor as active");
isActive = true;
}
// do not process events until memberTimeStampMap is initialized and window processor is activated
// memberTimeStampMap will be initialized only after receiving the complete topology event
if (!(isActive && hasMemberTimeStampMapInitialized)) {
return;
}
threadBarrier.pass();

for (Object o : memberTimeStampMap.entrySet()) {
Map.Entry pair = (Map.Entry) o;
long currentTime = System.currentTimeMillis();
Expand Down Expand Up @@ -255,7 +272,7 @@ public void run() {

@Override
protected Object[] currentState() {
return new Object[] { window.currentState() };
return new Object[]{window.currentState()};
}

@Override
Expand All @@ -267,8 +284,8 @@ protected void restoreState(Object[] data) {

@Override
protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {

AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext
siddhiContext) {
if (parameters[0] instanceof IntConstant) {
timeToKeep = ((IntConstant) parameters[0]).getValue();
} else {
Expand All @@ -286,17 +303,13 @@ protected void init(Expression[] parameters, QueryPostProcessingElement nextProc
MemberFaultEventMap
.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);

// executorService = StratosThreadPool
// .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
// cepTopologyEventReceiver.setExecutorService(executorService);
// cepTopologyEventReceiver.execute();

//Ordinary scheduling
window.schedule();
if (log.isDebugEnabled()) {
log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
if (log.isInfoEnabled()) {
log.info(String.format("Fault handling window processor initialized with [timeToKeep] %s, " +
"[memberIdAttrName] %s, [memberIdAttrIndex] %s, [distributed-enabled] %s, " +
"[activate-timeout] %d", timeToKeep, memberIdAttrName, memberIdAttrIndex,
siddhiContext.isDistributedProcessingEnabled(), ACTIVATE_TIMEOUT));
}
}

Expand Down Expand Up @@ -329,20 +342,11 @@ public void setThreadBarrier(ThreadBarrier threadBarrier) {
@Override
public void destroy() {
// terminate topology listener thread
// cepTopologyEventReceiver.terminate();
cepTopologyEventReceiver.destroy();
window = null;

// Shutdown executor service
if (executorService != null) {
try {
executorService.shutdownNow();
} catch (Exception e) {
log.warn("An error occurred while shutting down cep extension executor service", e);
}
}
}

public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
return memberTimeStampMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,16 @@
* CEP Topology Receiver for Fault Handling Window Processor.
*/
public class CEPTopologyEventReceiver {

private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);

private FaultHandlingWindowProcessor faultHandler;
private TopologyEventReceiver topologyEventReceiver;

public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
this.faultHandler = faultHandler;
this.topologyEventReceiver = TopologyEventReceiver.getInstance();
addEventListeners();
}

// @Override
// public void execute() {
// super.execute();
// log.info("CEP topology event receiver thread started");
// }

private void addEventListeners() {
// Load member time stamp map from the topology as a one time task
topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
Expand All @@ -63,7 +55,7 @@ protected void onEvent(Event event) {
if (!initialized) {
try {
TopologyManager.acquireReadLock();
log.debug("Complete topology event received to fault handling window processor.");
log.info("Complete topology event received to fault handling window processor.");
CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
} catch (Exception e) {
Expand All @@ -81,7 +73,11 @@ protected void onEvent(Event event) {
protected void onEvent(Event event) {
MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent.getMemberId());
if (log.isDebugEnabled()) {
log.debug("Member was removed from the timestamp map: [member] " + memberTerminatedEvent
.getMemberId());

}
}
});

Expand All @@ -94,8 +90,14 @@ protected void onEvent(Event event) {
// do not put this member if we have already received a health event
faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(),
System.currentTimeMillis());
log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
if (log.isDebugEnabled()) {
log.debug("Member was added to the timestamp map: [member] " + memberActivatedEvent.getMemberId());
}
}
});
}

void destroy() {
topologyEventReceiver.terminate();
}
}
Loading

0 comments on commit a3ab4ae

Please sign in to comment.