From fef612158f6294395f501fba2ead402a9958b381 Mon Sep 17 00:00:00 2001 From: Matt Conroy Date: Mon, 28 Mar 2016 17:30:55 -0600 Subject: [PATCH] - Add the required system implementation. Next up tests to confirm this works as expected. --- .../main/java/forklift/consumer/Consumer.java | 53 +++++++++++++++++-- .../forklift/consumer/ConsumerThread.java | 16 ++++++ .../java/forklift/consumer/FailedSystem.java | 7 +++ .../forklift/consumer/RequiredSystem.java | 6 +++ .../main/java/forklift/consumer/System.java | 19 +++++++ .../forklift/decorators/RequireSystem.java | 8 ++- .../forklift/decorators/RequireSystems.java | 16 ++++++ 7 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 core/src/main/java/forklift/consumer/FailedSystem.java create mode 100644 core/src/main/java/forklift/consumer/RequiredSystem.java create mode 100644 core/src/main/java/forklift/consumer/System.java create mode 100644 core/src/main/java/forklift/decorators/RequireSystems.java diff --git a/core/src/main/java/forklift/consumer/Consumer.java b/core/src/main/java/forklift/consumer/Consumer.java index db3cbfa..8d86121 100644 --- a/core/src/main/java/forklift/consumer/Consumer.java +++ b/core/src/main/java/forklift/consumer/Consumer.java @@ -1,8 +1,8 @@ package forklift.consumer; import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; import forklift.classloader.RunAsClassLoader; import forklift.concurrent.Callback; import forklift.connectors.ConnectorException; @@ -12,11 +12,12 @@ import forklift.decorators.Config; import forklift.decorators.Headers; import forklift.decorators.MultiThreaded; +import forklift.decorators.On; import forklift.decorators.OnMessage; import forklift.decorators.OnValidate; -import forklift.decorators.On; import forklift.decorators.Ons; import forklift.decorators.Queue; +import forklift.decorators.RequireSystem; import forklift.decorators.Topic; import forklift.message.Header; import forklift.producers.ForkliftProducerI; @@ -60,6 +61,7 @@ public class Consumer { private Queue queue; private Topic topic; private List services; + private List requiredSystems; // If a queue can process multiple messages at a time we // use a thread pool to manage how much cpu load the queue can @@ -153,6 +155,14 @@ else if (m.isAnnotationPresent(On.class) || m.isAnnotationPresent(Ons.class)) Arrays.stream(m.getAnnotationsByType(On.class)).map(on -> on.value()).distinct().forEach(x -> onProcessStep.get(x).add(m)); } + requiredSystems = new ArrayList<>(); + Arrays.stream(msgHandler.getAnnotationsByType(RequireSystem.class)).forEach(rs -> { + try { + requiredSystems.add(new System(rs.value().newInstance(), rs.msTimeout())); + } catch (Exception ignored) { + } + }); + injectFields = new HashMap<>(); injectFields.put(Config.class, new HashMap<>()); injectFields.put(javax.inject.Inject.class, new HashMap<>()); @@ -180,7 +190,7 @@ else if (m.isAnnotationPresent(On.class) || m.isAnnotationPresent(Ons.class)) * get a new JMS consumer. */ public void listen() { - final ForkliftConsumerI consumer; + ForkliftConsumerI consumer = null; try { if (topic != null) consumer = connector.getTopic(topic.value()); @@ -193,6 +203,17 @@ else if (queue != null) } catch (ConnectorException e) { log.debug("", e); } + + // Ensure that the consumer if still connected is disconnected to return unack'd messages back to the queue. + try { + if (consumer != null) + consumer.close(); + } catch (JMSException e) { + log.debug("", e); + } + + // Clear the thread queue so that unack'd messages are reloaded to the processing queue later. + blockQueue.clear(); } public String getName() { @@ -206,6 +227,11 @@ public void messageLoop(ForkliftConsumerI consumer) { while (running.get()) { Message jmsMsg; while ((jmsMsg = consumer.receive(2500)) != null) { + // Verify that all systems are ready to go. If they aren't, punt and the + // wrapper thread can deal. + if (sysReqCheck().size() > 0) + return; + final ForkliftMessage msg = connector.jmsToForklift(jmsMsg); try { final Object handler = msgHandler.newInstance(); @@ -396,6 +422,27 @@ else if (producer.topic().length() > 0) return closeMe; } + /** + * Test all required systems to ensure availability. + * @return List of all failed systems. + */ + List sysReqCheck() { + final List failed = new ArrayList<>(); + + requiredSystems.forEach(s -> { + try { + if (!s.system().checkUp()) { + failed.add(new FailedSystem(s.system(), s.getRecoveryTime())); + log.info("Failed system check {}", s.system().desc()); + } + } catch (Throwable e) { + log.warn("", e); + } + }); + + return failed; + } + public Class getMsgHandler() { return msgHandler; } diff --git a/core/src/main/java/forklift/consumer/ConsumerThread.java b/core/src/main/java/forklift/consumer/ConsumerThread.java index 182c01f..38b9cb5 100644 --- a/core/src/main/java/forklift/consumer/ConsumerThread.java +++ b/core/src/main/java/forklift/consumer/ConsumerThread.java @@ -5,6 +5,7 @@ import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -41,6 +42,21 @@ public void run() { do { lastConnectAttemptTime = LocalDateTime.now(); + // Wait for all required systems to become available before starting the listener. + log.info("verifying available systems"); + synchronized(lock) { + List failed; + do { + failed = consumer.sysReqCheck(); + + try { + if (failed.size() > 0) + lock.wait(failed.stream().map(f -> f.getRecoveryTime()).max(Long::compare).get()); + } catch (InterruptedException ignored) { + } + } while (running.get() && failed.size() > 0); + } + log.info("starting consumer"); try { consumer.listen(); diff --git a/core/src/main/java/forklift/consumer/FailedSystem.java b/core/src/main/java/forklift/consumer/FailedSystem.java new file mode 100644 index 0000000..3e104ee --- /dev/null +++ b/core/src/main/java/forklift/consumer/FailedSystem.java @@ -0,0 +1,7 @@ +package forklift.consumer; + +class FailedSystem extends System { + FailedSystem(RequiredSystem system, long recoveryTime) { + super(system, recoveryTime); + } +} \ No newline at end of file diff --git a/core/src/main/java/forklift/consumer/RequiredSystem.java b/core/src/main/java/forklift/consumer/RequiredSystem.java new file mode 100644 index 0000000..92dba61 --- /dev/null +++ b/core/src/main/java/forklift/consumer/RequiredSystem.java @@ -0,0 +1,6 @@ +package forklift.consumer; + +public interface RequiredSystem { + boolean checkUp(); + String desc(); +} \ No newline at end of file diff --git a/core/src/main/java/forklift/consumer/System.java b/core/src/main/java/forklift/consumer/System.java new file mode 100644 index 0000000..8b2e536 --- /dev/null +++ b/core/src/main/java/forklift/consumer/System.java @@ -0,0 +1,19 @@ +package forklift.consumer; + +class System { + private RequiredSystem system; + private long recoveryTime; + + System(RequiredSystem system, long recoveryTime) { + this.system = system; + this.recoveryTime = recoveryTime; + } + + RequiredSystem system() { + return this.system; + } + + long getRecoveryTime() { + return this.recoveryTime; + } +} diff --git a/core/src/main/java/forklift/decorators/RequireSystem.java b/core/src/main/java/forklift/decorators/RequireSystem.java index 9c4fbbc..e203885 100644 --- a/core/src/main/java/forklift/decorators/RequireSystem.java +++ b/core/src/main/java/forklift/decorators/RequireSystem.java @@ -1,8 +1,11 @@ package forklift.decorators; +import forklift.consumer.RequiredSystem; + import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; +import java.lang.annotation.Repeatable; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @@ -13,9 +16,10 @@ */ @Documented @Inherited +@Repeatable(RequireSystems.class) @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface RequireSystem { - /** The system classes that define a required system service */ - String[] value() default ""; + Class value(); + long msTimeout() default 10000L; } diff --git a/core/src/main/java/forklift/decorators/RequireSystems.java b/core/src/main/java/forklift/decorators/RequireSystems.java new file mode 100644 index 0000000..b2bdff8 --- /dev/null +++ b/core/src/main/java/forklift/decorators/RequireSystems.java @@ -0,0 +1,16 @@ +package forklift.decorators; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface RequireSystems { + RequireSystem[] value(); +} \ No newline at end of file