Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/upgrade native packager #8

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connectors/activemq/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ libraryDependencies ++= Seq(
"org.apache.geronimo.specs" % "geronimo-jms_1.1_spec" % "1.1.1",
"org.apache.activemq" % "activemq-all" % "5.8.0",
"commons-io" % "commons-io" % "2.4" % "test",
"com.novocode" % "junit-interface" % "0.10-M4" % "test"
"com.novocode" % "junit-interface" % "0.10" % "test"
)

crossPaths := false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,39 @@

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQTextMessage;

public class ActiveMQConnector implements ForkliftConnectorI {
private ActiveMQConnectionFactory factory;
private ActiveMQConnection conn;
private ActiveMQConnection conn;
private String brokerUrl;

public ActiveMQConnector() {

}

public ActiveMQConnector(String brokerUrl) {
this.brokerUrl = brokerUrl;
}

@Override
public synchronized void start()
public synchronized void start()
throws ConnectorException {
if (brokerUrl == null)
throw new ConnectorException("brokerUrl wasn't set");

factory = new ActiveMQConnectionFactory("", "", brokerUrl);
}

@Override
public synchronized void stop()
public synchronized void stop()
throws ConnectorException {
if (conn != null)
try {
Expand All @@ -43,7 +45,7 @@ public synchronized void stop()
}

@Override
public synchronized Connection getConnection()
public synchronized Connection getConnection()
throws ConnectorException {
if (conn == null || !conn.isStarted())
try {
Expand All @@ -55,11 +57,11 @@ public synchronized Connection getConnection()

if (conn == null)
throw new ConnectorException("Could not create connection to activemq.");

return conn;
}
public synchronized Session getSession()

public synchronized Session getSession()
throws ConnectorException {
try {
return getConnection().createSession(
Expand All @@ -68,9 +70,9 @@ public synchronized Session getSession()
throw new ConnectorException(e.getMessage());
}
}

@Override
public MessageConsumer getQueue(String name)
public MessageConsumer getQueue(String name)
throws ConnectorException {
final Session s = getSession();
try {
Expand All @@ -79,9 +81,9 @@ public MessageConsumer getQueue(String name)
throw new ConnectorException(e.getMessage());
}
}

@Override
public MessageConsumer getTopic(String name)
public MessageConsumer getTopic(String name)
throws ConnectorException {
final Session s = getSession();
try {
Expand All @@ -90,4 +92,15 @@ public MessageConsumer getTopic(String name)
throw new ConnectorException(e.getMessage());
}
}

@Override
public ForkliftMessage jmsToForklift(Message m) {
try {
final ForkliftMessage msg = new ForkliftMessage();
msg.setMsg(((ActiveMQTextMessage)m).getText());
return msg;
} catch (JMSException e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package forklift.consumer;

import forklift.decorators.OnMessage;
import forklift.decorators.Queue;

@Queue("test")
public class TestQueueConsumer {

@OnMessage
public void handle() {
}
}
1 change: 1 addition & 0 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.0.13",
"org.apache.geronimo.specs" % "geronimo-jms_1.1_spec" % "1.1.1",
"org.reflections" % "reflections" % "0.9.9-RC1",
"org.mockito" % "mockito-all" % "1.9.5" % "test",
"com.novocode" % "junit-interface" % "0.10-M4" % "test"
)

Expand Down
1 change: 1 addition & 0 deletions core/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "0.7.1")
84 changes: 22 additions & 62 deletions core/src/main/java/forklift/Forklift.java
Original file line number Diff line number Diff line change
@@ -1,108 +1,68 @@
package forklift;

import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

import forklift.exception.StartupException;
import forklift.spring.ContextManager;

/**
* Main ForkLift application instance. ForkLift is started here
* and stopped here.
*/
public class Forklift {
private static Logger log = LoggerFactory.getLogger("ForkLift");

public ApplicationContext context;

private boolean classpath;

private AtomicBoolean running = new AtomicBoolean(false);

public Forklift() {
log.debug("Creating ForkLift");

Runtime.getRuntime().addShutdownHook(new ForkliftShutdown(this));
}
public synchronized void start()

public synchronized void start()
throws StartupException {
start("services.xml");
start(ForkliftConfig.class);
}

public synchronized void start(String resource)
throws StartupException {
log.debug("Initializing Spring Context from Classpath");
try {
context = new ClassPathXmlApplicationContext(resource);
} catch (Exception e) {
throw new StartupException(e.getMessage());
}
((ClassPathXmlApplicationContext)context).registerShutdownHook();

classpath = true;

running.set(true);
}

public synchronized void start(File configFile)

public synchronized void start(Class<?> config)
throws StartupException {
log.debug("Initializing Spring Context from File {}", configFile.getAbsolutePath());
log.debug("Initializing Spring Context");

try {
context = new FileSystemXmlApplicationContext("file:https://" + configFile.getAbsolutePath());
ContextManager.start(config);
log.debug("Init complete!");
} catch (Exception e) {
log.debug("An error occurred starting the spring context");
throw new StartupException(e.getMessage());
}
((FileSystemXmlApplicationContext)context).registerShutdownHook();

classpath = false;


running.set(true);
}

public void shutdown() {
if (!running.getAndSet(false))
return;

if (classpath)
((ClassPathXmlApplicationContext)context).close();
else
((FileSystemXmlApplicationContext)context).close();

ContextManager.stop();
}

public boolean isRunning() {
return running.get();
}

public ApplicationContext getContext() {
return context;
}

public static void main(String args[])

public static void main(String args[])
throws StartupException {
final Forklift forklift = mainWithTestHook(args);
if (!forklift.isRunning())
throw new RuntimeException("Unable to start Forklift.");
}

public static Forklift mainWithTestHook(String args[])
throws StartupException {
if (args.length != 1) {
System.err.println("Config file not specified.");
return null;
}

public static Forklift mainWithTestHook(String args[])
throws StartupException {
final Forklift forklift = new Forklift();
final File f = new File(args[0]);
if (f.exists())
forklift.start(f);
else
forklift.start(args[0]);

forklift.start();
return forklift;
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/forklift/ForkliftConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package forklift;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan(value={"forklift"})
public class ForkliftConfig {

}
18 changes: 0 additions & 18 deletions core/src/main/java/forklift/ForkliftShutdown.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package forklift.connectors;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;

public interface ForkliftConnectorI {
Expand All @@ -10,4 +11,11 @@ public interface ForkliftConnectorI {
// Session getSession() throws ConnectorException;
MessageConsumer getQueue(String name) throws ConnectorException;
MessageConsumer getTopic(String name) throws ConnectorException;

/**
* Convert a jms message to a forklift message.
* @param m - the message to process
* @return - a new ForkliftMessage.
*/
ForkliftMessage jmsToForklift(Message m);
}
13 changes: 13 additions & 0 deletions core/src/main/java/forklift/connectors/ForkliftMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package forklift.connectors;

public class ForkliftMessage {
private String msg;

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}
}
20 changes: 9 additions & 11 deletions core/src/main/java/forklift/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,33 @@
import java.util.Map;
import java.util.Set;

import forklift.connectors.ForkliftConnectorI;
import forklift.decorators.Queue;
import forklift.decorators.Topic;
import forklift.spring.ContextManager;

/**
* Consumer is responsible for orchestration of moving the message from the broker
* to the appropriate msgHandler.
* to the appropriate msgHandler.
* @author mattconroy
*
*/
public class Consumer {
private Integer id = null;
private Map<String, Listener> listeners = new HashMap<String, Listener>();

public Consumer(Set<Class<?>> msgHandlers) {
for (Class<?> c : msgHandlers) {
Queue q = c.getAnnotation(Queue.class);
Topic t = c.getAnnotation(Topic.class);

final Listener l = new Listener();
if (q != null) {

} else if (t != null) {

}


final Listener l = new Listener(
q, t, c, ContextManager.getContext().getBean(ForkliftConnectorI.class));
listeners.put(l.getName(), l);
l.listen();
}
}

void setId(Integer id) {
this.id = id;
}
Expand Down
Loading