Skip to content

Commit

Permalink
[FLINK-6903] [runtime] Activate checkstyle for runtime/akka
Browse files Browse the repository at this point in the history
This closes #4114
  • Loading branch information
greghogan committed Jul 11, 2017
1 parent a68c15f commit b91df16
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 26 deletions.
1 change: 0 additions & 1 deletion flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ under the License.
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
<excludes>
**/runtime/akka/**,
**/runtime/blob/**,
**/runtime/checkpoint/**,
**/runtime/client/**,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.flink.runtime.akka;

import akka.actor.ActorSystem;
import akka.actor.Address;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;

import akka.actor.ActorSystem;
import akka.actor.Address;
import org.slf4j.Logger;

import scala.concurrent.duration.FiniteDuration;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.apache.flink.runtime.akka;

import akka.actor.UntypedActor;
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;

import akka.actor.UntypedActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,35 +34,37 @@
* of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is
* detected, then an Exception is thrown.
*
* In order to implement the actor behavior, an implementing subclass has to override the method
* <p>In order to implement the actor behavior, an implementing subclass has to override the method
* handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide
* a leader session ID option which is returned by getLeaderSessionID.
*/
public abstract class FlinkUntypedActor extends UntypedActor {

//CHECKSTYLE.OFF: MemberNameCheck - re-enable after JobManager/TaskManager refactoring in FLIP-6?
protected final Logger LOG = LoggerFactory.getLogger(getClass());
//CHECKSTYLE.ON: MemberNameCheck

/**
* This method is called by Akka if a new message has arrived for the actor. It logs the
* processing time of the incoming message if the logging level is set to debug. After logging
* the handleLeaderSessionID method is called.
*
* Important: This method cannot be overriden. The actor specific message handling logic is
* <p>Important: This method cannot be overriden. The actor specific message handling logic is
* implemented by the method handleMessage.
*
* @param message Incoming message
* @throws Exception
*/
@Override
public final void onReceive(Object message) throws Exception {
if(LOG.isTraceEnabled()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender());

long start = System.nanoTime();

handleLeaderSessionID(message);

long duration = (System.nanoTime() - start)/ 1_000_000;
long duration = (System.nanoTime() - start) / 1_000_000;

LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender());
} else {
Expand Down Expand Up @@ -124,14 +127,14 @@ private void handleNoLeaderId(LeaderSessionMessage msg) {
* Returns the current leader session ID associcated with this actor.
* @return
*/
abstract protected UUID getLeaderSessionID();
protected abstract UUID getLeaderSessionID();

/**
* This method should be called for every outgoing message. It wraps messages which require
* a leader session ID (indicated by {@link RequiresLeaderSessionID}) in a
* {@link LeaderSessionMessage} with the actor's leader session ID.
*
* This method can be overriden to implement a different decoration behavior.
* <p>This method can be overriden to implement a different decoration behavior.
*
* @param message Message to be decorated
* @return The deocrated message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface QuarantineHandler {
* actor system
* @param actorSystem which has been quarantined
*/
void wasQuarantinedBy(final String remoteSystem, final ActorSystem actorSystem);
void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem);

/**
* Callback when the given actor system has quarantined the given remote actor system.
Expand All @@ -42,5 +42,5 @@ public interface QuarantineHandler {
* by our actor system
* @param actorSystem which has quarantined the other actor system
*/
void hasQuarantined(final String remoteSystem, final ActorSystem actorSystem);
void hasQuarantined(String remoteSystem, ActorSystem actorSystem);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package org.apache.flink.runtime.akka;

import org.apache.flink.util.Preconditions;

import akka.actor.UntypedActor;
import akka.remote.AssociationErrorEvent;
import akka.remote.transport.Transport;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

import java.util.regex.Matcher;
Expand All @@ -33,7 +34,7 @@
* or quarantine another remote actor system. If the actor detects that the actor system has been
* quarantined or quarantined another system, then the {@link QuarantineHandler} is called.
*
* IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version the
* <p>IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version the
* quarantine state might be detected differently.
*/
public class QuarantineMonitor extends UntypedActor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@

package org.apache.flink.runtime.akka;

import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.runtime.testingUtils.TestingUtils;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -36,6 +37,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

/**
* Tests for {@link FlinkUntypedActor}.
*/
public class FlinkUntypedActorTest {

private static ActorSystem actorSystem;
Expand Down Expand Up @@ -89,7 +93,7 @@ public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDM

TestActorRef<PlainFlinkUntypedActor> actor = null;

try{
try {
final Props props = Props.create(PlainFlinkUntypedActor.class, leaderSessionID);
actor = TestActorRef.create(actorSystem, props);

Expand All @@ -113,7 +117,7 @@ public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDM
}

private static void stopActor(ActorRef actor) {
if(actor != null) {
if (actor != null) {
actor.tell(Kill.getInstance(), ActorRef.noSender());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@

package org.apache.flink.runtime.akka;

import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
Expand All @@ -26,12 +33,6 @@
import akka.dispatch.OnComplete;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -40,12 +41,16 @@
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.FiniteDuration;

/**
* Tests for {@link QuarantineMonitor}.
*/
public class QuarantineMonitorTest extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(QuarantineMonitorTest.class);
Expand Down

0 comments on commit b91df16

Please sign in to comment.