Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

- Add the required system implementation. Next up tests to confirm th… #99

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 50 additions & 3 deletions core/src/main/java/forklift/consumer/Consumer.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package forklift.consumer;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import forklift.classloader.RunAsClassLoader;
import forklift.concurrent.Callback;
import forklift.connectors.ConnectorException;
Expand All @@ -12,11 +12,12 @@
import forklift.decorators.Config;
import forklift.decorators.Headers;
import forklift.decorators.MultiThreaded;
import forklift.decorators.On;
import forklift.decorators.OnMessage;
import forklift.decorators.OnValidate;
import forklift.decorators.On;
import forklift.decorators.Ons;
import forklift.decorators.Queue;
import forklift.decorators.RequireSystem;
import forklift.decorators.Topic;
import forklift.message.Header;
import forklift.producers.ForkliftProducerI;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class Consumer {
private Queue queue;
private Topic topic;
private List<ConsumerService> services;
private List<System> requiredSystems;

// If a queue can process multiple messages at a time we
// use a thread pool to manage how much cpu load the queue can
Expand Down Expand Up @@ -153,6 +155,14 @@ else if (m.isAnnotationPresent(On.class) || m.isAnnotationPresent(Ons.class))
Arrays.stream(m.getAnnotationsByType(On.class)).map(on -> on.value()).distinct().forEach(x -> onProcessStep.get(x).add(m));
}

requiredSystems = new ArrayList<>();
Arrays.stream(msgHandler.getAnnotationsByType(RequireSystem.class)).forEach(rs -> {
try {
requiredSystems.add(new System(rs.value().newInstance(), rs.msTimeout()));
} catch (Exception ignored) {
}
});

injectFields = new HashMap<>();
injectFields.put(Config.class, new HashMap<>());
injectFields.put(javax.inject.Inject.class, new HashMap<>());
Expand Down Expand Up @@ -180,7 +190,7 @@ else if (m.isAnnotationPresent(On.class) || m.isAnnotationPresent(Ons.class))
* get a new JMS consumer.
*/
public void listen() {
final ForkliftConsumerI consumer;
ForkliftConsumerI consumer = null;
try {
if (topic != null)
consumer = connector.getTopic(topic.value());
Expand All @@ -193,6 +203,17 @@ else if (queue != null)
} catch (ConnectorException e) {
log.debug("", e);
}

// Ensure that the consumer if still connected is disconnected to return unack'd messages back to the queue.
try {
if (consumer != null)
consumer.close();
} catch (JMSException e) {
log.debug("", e);
}

// Clear the thread queue so that unack'd messages are reloaded to the processing queue later.
blockQueue.clear();
}

public String getName() {
Expand All @@ -206,6 +227,11 @@ public void messageLoop(ForkliftConsumerI consumer) {
while (running.get()) {
Message jmsMsg;
while ((jmsMsg = consumer.receive(2500)) != null) {
// Verify that all systems are ready to go. If they aren't, punt and the
// wrapper thread can deal.
if (sysReqCheck().size() > 0)
return;

final ForkliftMessage msg = connector.jmsToForklift(jmsMsg);
try {
final Object handler = msgHandler.newInstance();
Expand Down Expand Up @@ -396,6 +422,27 @@ else if (producer.topic().length() > 0)
return closeMe;
}

/**
* Test all required systems to ensure availability.
* @return List of all failed systems.
*/
List<FailedSystem> sysReqCheck() {
final List<FailedSystem> failed = new ArrayList<>();

requiredSystems.forEach(s -> {
try {
if (!s.system().checkUp()) {
failed.add(new FailedSystem(s.system(), s.getRecoveryTime()));
log.info("Failed system check {}", s.system().desc());
}
} catch (Throwable e) {
log.warn("", e);
}
});

return failed;
}

public Class<?> getMsgHandler() {
return msgHandler;
}
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/forklift/consumer/ConsumerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -41,6 +42,21 @@ public void run() {
do {
lastConnectAttemptTime = LocalDateTime.now();

// Wait for all required systems to become available before starting the listener.
log.info("verifying available systems");
synchronized(lock) {
List<FailedSystem> failed;
do {
failed = consumer.sysReqCheck();

try {
if (failed.size() > 0)
lock.wait(failed.stream().map(f -> f.getRecoveryTime()).max(Long::compare).get());
} catch (InterruptedException ignored) {
}
} while (running.get() && failed.size() > 0);
}

log.info("starting consumer");
try {
consumer.listen();
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/forklift/consumer/FailedSystem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package forklift.consumer;

class FailedSystem extends System {
FailedSystem(RequiredSystem system, long recoveryTime) {
super(system, recoveryTime);
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/forklift/consumer/RequiredSystem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package forklift.consumer;

public interface RequiredSystem {
boolean checkUp();
String desc();
}
19 changes: 19 additions & 0 deletions core/src/main/java/forklift/consumer/System.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package forklift.consumer;

class System {
private RequiredSystem system;
private long recoveryTime;

System(RequiredSystem system, long recoveryTime) {
this.system = system;
this.recoveryTime = recoveryTime;
}

RequiredSystem system() {
return this.system;
}

long getRecoveryTime() {
return this.recoveryTime;
}
}
8 changes: 6 additions & 2 deletions core/src/main/java/forklift/decorators/RequireSystem.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package forklift.decorators;

import forklift.consumer.RequiredSystem;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
Expand All @@ -13,9 +16,10 @@
*/
@Documented
@Inherited
@Repeatable(RequireSystems.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RequireSystem {
/** The system classes that define a required system service */
String[] value() default "";
Class<? extends RequiredSystem> value();
long msTimeout() default 10000L;
}
16 changes: 16 additions & 0 deletions core/src/main/java/forklift/decorators/RequireSystems.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package forklift.decorators;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RequireSystems {
RequireSystem[] value();
}