Skip to content

Commit

Permalink
added link base classes
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykang920 committed Aug 27, 2016
1 parent 4e9815c commit ae18fd3
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 5 deletions.
2 changes: 1 addition & 1 deletion x2/src/main/java/x2/EventSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/** Cleanup helper base class for any event-consuming classes. */
public abstract class EventSink {
private volatile boolean closed;
protected volatile boolean closed;

private ArrayList<Binder.Token> bindings;
private WeakReference<Flow> flow;
Expand Down
4 changes: 4 additions & 0 deletions x2/src/main/java/x2/Link.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ protected Link(String name) {
}

/** Closes this link and releases all the associated resources. */
@Override
public void close() {
if (closed) { return; }

synchronized (names) {
names.remove(name);
}

super.close();
}

Expand Down
62 changes: 62 additions & 0 deletions x2/src/main/java/x2/links/ClientLink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2016 Jae-jun Kang
// See the file LICENSE for details.

package x2.links;

import java.util.concurrent.locks.*;

import x2.*;

/** Common base class for single-session client links. */
public abstract class ClientLink extends SessionBasedLink {
protected LinkSession session; // current link session

protected ClientLink(String name) {
super(name);
}

@Override
public void close() {
if (closed) { return; }

LinkSession session = null;
Lock wlock = rwlock.writeLock();
wlock.lock();
try {
if (this.session != null) {
session = this.session;
this.session = null;
}
}
finally {
wlock.unlock();
}

if (session != null) {
session.close();
}

super.close();
}

@Override
public void send(Event e) {
LinkSession currentSession = session();
if (currentSession == null) {
// drop event
}
currentSession.send(e);
}

public LinkSession session() {
Lock rlock = rwlock.readLock();
rlock.lock();
try {
return session;
}
finally {
rlock.unlock();
}
}
}

18 changes: 18 additions & 0 deletions x2/src/main/java/x2/links/LinkSession.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2016 Jae-jun Kang
// See the file LICENSE for details.

package x2.links;

import x2.*;

public abstract class LinkSession {
protected int handle;
protected SessionBasedLink link;

public void close() {
}

/** Sends out the specified event through this link session. */
public void send(Event e) {
}
}
80 changes: 80 additions & 0 deletions x2/src/main/java/x2/links/ServerLink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) 2016 Jae-jun Kang
// See the file LICENSE for details.

package x2.links;

import java.util.*;
import java.util.concurrent.locks.*;

import x2.*;

/** Common base class for multi-session server links. */
public abstract class ServerLink extends SessionBasedLink {
protected HashMap<Integer, LinkSession> sessions;

protected ServerLink(String name) {
super(name);
sessions = new HashMap<Integer, LinkSession>();
}

/** Broadcasts the specified event to all the connected clients. */
public void broadcast(Event e) {
Lock rlock = rwlock.readLock();
rlock.lock();
ArrayList<LinkSession> snapshot;
try {
snapshot = new ArrayList<LinkSession>(sessions.values());
}
finally {
rlock.unlock();
}
for (int i = 0, count = snapshot.size(); i < count; ++i) {
snapshot.get(i).send(e);
}
}

@Override
public void close() {
if (closed) { return; }

// Close all the active sessions
Lock rlock = rwlock.readLock();
rlock.lock();
ArrayList<LinkSession> snapshot;
try {
snapshot = new ArrayList<LinkSession>(sessions.values());
}
finally {
rlock.unlock();
}
for (int i = 0, count = snapshot.size(); i < count; ++i) {
snapshot.get(i).close();
}

Lock wlock = rwlock.writeLock();
wlock.lock();
try {
sessions.clear();
}
finally {
wlock.unlock();
}

super.close();
}

/** Sends out the specified event through this link channel. */
public void send(Event e) {
Lock rlock = rwlock.readLock();
rlock.lock();
try {
LinkSession session = sessions.get(e._getHandle());
if (session != null) {
session.send(e);
}
}
finally {
rlock.unlock();
}
}
}
4 changes: 2 additions & 2 deletions x2/src/main/java/x2/links/SessionBasedLink.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

import x2.*;

/** Abstract base class for session-based links. */
/** Abstract base class for session-based links. */
public abstract class SessionBasedLink extends Link {
private ReadWriteLock rwlock;
protected ReadWriteLock rwlock;

static {
// event factory registration here
Expand Down
6 changes: 5 additions & 1 deletion x2/src/main/java/x2/links/socket/TcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

package x2.links.socket;

public class TcpClient {
import x2.links.*;

public class TcpClient extends ClientLink {
public TcpClient(String name) {
super(name);
}
}
29 changes: 28 additions & 1 deletion x2/src/main/java/x2/links/socket/TcpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,32 @@

package x2.links.socket;

public class TcpServer {
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;

import x2.links.*;

public class TcpServer extends ServerLink {
private ServerSocketChannel ssc;

public TcpServer(String name) {
super(name);
}

@Override
public void close() {

}

public void listen(int port) throws IOException {
listen(new InetSocketAddress(port));
}

public void listen(InetSocketAddress socketAddress) throws IOException {
ssc = ServerSocketChannel.open();
ssc.socket().bind(socketAddress);
ssc.configureBlocking(false);
}
}

0 comments on commit ae18fd3

Please sign in to comment.