Skip to content
This repository has been archived by the owner on Dec 17, 2018. It is now read-only.

Commit

Permalink
Prevent RaftNetworkClient.{start,stop} from being interleaved
Browse files Browse the repository at this point in the history
Resolves #22.
  • Loading branch information
allengeorge committed Jan 23, 2014
1 parent 674a3f2 commit bdcfcb7
Showing 1 changed file with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -105,12 +104,13 @@
* </ol>
* This means that {@code reconnectInterval} has the range
* {@code [minReconnectInterval, minReconnectInterval + additionalReconnectIntervalRange]}.
* <p/>
* This component is thread-safe.
*/
public final class RaftNetworkClient implements RPCSender {

private static final Logger LOGGER = LoggerFactory.getLogger(RaftNetworkClient.class);

private final AtomicBoolean running = new AtomicBoolean(false);
private final ConcurrentMap<String, RaftMember> cluster = Maps.newConcurrentMap();
private final ChannelGroup clusterChannelGroup = new DefaultChannelGroup("cluster clients");
private final Random random;
Expand All @@ -122,6 +122,8 @@ public final class RaftNetworkClient implements RPCSender {
private final int additionalReconnectIntervalRange;
private final TimeUnit timeUnit;

private volatile boolean running; // set during start/stop and accessed by netty-I/O and RaftNetworkClient caller threads

private ServerBootstrap server;
private ClientBootstrap client;
private Channel serverChannel;
Expand Down Expand Up @@ -187,13 +189,12 @@ public RaftNetworkClient(
* @param receiver instance of {@code RPCReceiver} that will be notified of incoming {@link RaftRPC} messages
* @throws IllegalStateException if this method is called multiple times
*/
public void initialize(
public synchronized void initialize(
ListeningExecutorService nonIoExecutorService,
ServerChannelFactory serverChannelFactory,
ChannelFactory clientChannelFactory,
RPCReceiver receiver) {
checkState(!running.get());

checkState(!running);
checkState(server == null);
checkState(client == null);

Expand Down Expand Up @@ -244,8 +245,8 @@ public ChannelPipeline getPipeline() throws Exception {
* <p/>
* Once a successful call to {@code start()} is made subsequent calls are noops.
*/
public void start() {
if (!running.compareAndSet(false, true)) {
public synchronized void start() {
if (running) {
return;
}

Expand All @@ -254,6 +255,12 @@ public void start() {
checkNotNull(server);
checkNotNull(client);

// IMPORTANT: set running _early, up here_
// this is because the bind() and connect()
// calls below will not succeed if their
// callbacks see "running" as false
running = true;

SocketAddress bindAddress = getResolvedBindAddress();
serverChannel = server.bind(bindAddress);

Expand All @@ -276,7 +283,7 @@ private SocketAddress getResolvedBindAddress() { // FIXME (AG): find a way to re
}

private void connect(final RaftMember server) {
if (!running.get()) {
if (!running) {
logNotRunning();
return;
}
Expand All @@ -299,7 +306,7 @@ public void operationComplete(ChannelFuture future) throws Exception {

server.setChannel(null);

if (!running.get()) {
if (!running) {
logNotRunning();
return;
}
Expand Down Expand Up @@ -352,7 +359,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
// basically, to get here we would have to schedule the connect
// just before stop() is called, after clusterChannelGroup.disconnect()
// is called, and just before clusterChannelGroup.add() is called...
if (!running.get()) {
if (!running) {
logNotRunning();
channel.close();
return;
Expand Down Expand Up @@ -393,10 +400,13 @@ private void logNotRunning() {
* <p/>
* Following a successful call to {@code stop()} subsequent calls are noops.
*/
public void stop() {
if (!running.compareAndSet(true, false)) {
return;
}
public synchronized void stop() {
// set running to 'false' early, so that new
// I/O operations and timeouts are rejected
// not checking the value of 'running'
// allows 'stop' to be called multiple times,
// AFAIK, this is not an issue and are noops
running = false;

LOGGER.info("{}: stopping network client", self.getId());

Expand All @@ -415,11 +425,11 @@ public void stop() {
}
}

private void write(String server, RaftRPC rpc) throws RPCException {
private void write(String server, RaftRPC rpc) throws RPCException { // safe to access simultaneously by multiple threads
RaftMember raftMember = cluster.get(server);
checkNotNull(raftMember);

if (!running.get()) {
if (!running) {
logNotRunning();
throw new RPCException("network client stopped");
}
Expand Down

0 comments on commit bdcfcb7

Please sign in to comment.