Skip to content

Commit

Permalink
fixed queue close
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykang920 committed Jun 10, 2016
1 parent 4e06100 commit c2facd4
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 42 deletions.
20 changes: 11 additions & 9 deletions x2/src/main/java/x2/BlockingQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,25 @@
public class BlockingQueue<T> {
private Queue<T> queue;
private boolean closing;

public BlockingQueue() {
queue = new LinkedList<T>();
}

public void close() {
close(null);
}

public void close(T finalItem) {
synchronized (queue) {
queue.offer(finalItem);
if (finalItem != null) {
queue.offer(finalItem);
}
closing = true;
queue.notifyAll();
}
}

public T dequeue() {
synchronized (queue) {
while (queue.size() == 0) {
Expand All @@ -43,7 +45,7 @@ public T dequeue() {
return queue.poll();
}
}

public int dequeue(List<T> values) {
synchronized (queue) {
while (queue.size() == 0) {
Expand All @@ -65,7 +67,7 @@ public int dequeue(List<T> values) {
return n;
}
}

public void enqueue(T item) {
synchronized (queue) {
if (!closing) {
Expand All @@ -76,7 +78,7 @@ public void enqueue(T item) {
}
}
}

public T tryDequeue() {
synchronized (queue) {
if (queue.size() == 0) {
Expand All @@ -85,7 +87,7 @@ public T tryDequeue() {
return queue.poll();
}
}

public int tryDequeue(List<T> values) {
synchronized (queue) {
if (queue.size() == 0) {
Expand Down
42 changes: 21 additions & 21 deletions x2/src/main/java/x2/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,90 +11,90 @@ public abstract class Flow {
protected static ThreadLocal<Event.Equivalent> equivalent;
protected static ThreadLocal<List<Event>> events;
protected static ThreadLocal<List<Handler>> handlerChain;

protected Binder binder;
protected CaseStack caseStack;
protected String name;

static {
current = new ThreadLocal<Flow>();
equivalent = new ThreadLocal<Event.Equivalent>();
events = new ThreadLocal<List<Event>>();
handlerChain = new ThreadLocal<List<Handler>>();
}

protected Flow() {
binder = new Binder();
caseStack = new CaseStack();
name = getClass().getName();
}

/** Adds the specified case to this flow. */
public Flow add(Setupable o) {
caseStack.add(o);
return this;
}

public static Binder.Token bind(Event e, Handler handler) {
return current().subscribe(e, handler);
}

/** Gets the current flow in which the current thread is running. */
public static Flow current() {
return current.get();
}

/** Sets the current flow in which the current thread is running. */
public static void current(Flow value) {
current.set(value);
}

public abstract void feed(Event e);

/** Gets the name of this flow. */
public String name() {
return name;
}

/** Removes the specified case from this flow. */
public Flow remove(Setupable o) {
caseStack.remove(o);
return this;
}

public abstract void shutdown();

public abstract Flow startup();

public Binder.Token subscribe(Event e, Handler handler) {
return binder.bind(e, handler);
}

public static void unbind(Binder.Token token) {
current().unbind(token);
current().unsubscribe(token);
}

public void unsubscribe(Binder.Token token) {
binder.unbind(token);
}

protected void setup() {
}

protected void teardown() {
}

protected void dispatch(Event e) {
List<Handler> handlers = handlerChain.get();
if (handlers.size() != 0) {
handlers.clear();
}

int chainLength = binder.buildHandlerChain(e, equivalent.get(), handlers);
if (chainLength == 0) {
return;
}

for (int i = 0, count = handlers.size(); i < count; ++i) {
Handler handler = handlers.get(i);
try {
Expand All @@ -104,7 +104,7 @@ protected void dispatch(Event e) {
// TODO: handle exception
}
}

handlers.clear();
}
}
27 changes: 15 additions & 12 deletions x2/src/main/java/x2/flows/SingleThreadFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
/** Represents a finite set of application logic. */
public class SingleThreadFlow extends EventBasedFlow implements Runnable {
protected Thread thread;

public SingleThreadFlow() {
}

public SingleThreadFlow(String name) {
this.name = name;
}


@Override
public Flow startup() {
synchronized (syncRoot) {
if (thread == null) {
Expand All @@ -30,33 +31,35 @@ public Flow startup() {
}
return this;
}


@Override
public void shutdown() {
synchronized (syncRoot) {
if (thread == null) {
return;
}
// enquque FlowStop and quit queue
// enquque FlowStop
queue.close();
try {
thread.join();
}
catch (InterruptedException ie) { }
thread = null;

caseStack.teardown(this);
teardown();
}
}

public void run() {
current.set(this);

equivalent.set(new Event.Equivalent());
events.set(new ArrayList<Event>());
handlerChain.set(new ArrayList<Handler>());

List<Event> dequeued = events.get();

while (true) {
if (queue.dequeue(dequeued) == 0) {
break;
Expand All @@ -66,11 +69,11 @@ public void run() {
}
dequeued.clear();
}

handlerChain.set(null);
events.set(null);
equivalent.set(null);

current.set(null);
}
}

0 comments on commit c2facd4

Please sign in to comment.