Skip to content

Commit

Permalink
[FLINK-6882] [runtime] Activate checkstyle for runtime/registration
Browse files Browse the repository at this point in the history
This closes #4099
  • Loading branch information
greghogan committed Jul 11, 2017
1 parent 4aa2ffc commit a68c15f
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 48 deletions.
1 change: 0 additions & 1 deletion flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,6 @@ under the License.
**/runtime/messages/**,
**/runtime/minicluster/**,
**/runtime/operators/**,
**/runtime/registration/**,
**/runtime/resourcemanager/**,
**/runtime/rpc/**,
**/runtime/state/**,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.flink.runtime.registration;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcGateway;

import org.slf4j.Logger;

Expand All @@ -46,35 +46,30 @@
*/
public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> {

/** the logger for all log messages of this class */
/** The logger for all log messages of this class. */
protected final Logger log;

/** the target component leaderID, for example the ResourceManager leaderID */
/** The target component leaderID, for example the ResourceManager leaderID. */
private final UUID targetLeaderId;

/** the target component Address, for example the ResourceManager Address */
/** The target component Address, for example the ResourceManager Address. */
private final String targetAddress;

/** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */
/** Execution context to be used to execute the on complete action of the ResourceManagerRegistration. */
private final Executor executor;

/** the Registration of this RPC connection */
/** The Registration of this RPC connection. */
private RetryingRegistration<Gateway, Success> pendingRegistration;

/** the gateway to register, it's null until the registration is completed */
/** The gateway to register, it's null until the registration is completed. */
private volatile Gateway targetGateway;

/** flag indicating that the RPC connection is closed */
/** Flag indicating that the RPC connection is closed. */
private volatile boolean closed;

// ------------------------------------------------------------------------

public RegisteredRpcConnection(
Logger log,
String targetAddress,
UUID targetLeaderId,
Executor executor)
{
public RegisteredRpcConnection(Logger log, String targetAddress, UUID targetLeaderId, Executor executor) {
this.log = checkNotNull(log);
this.targetAddress = checkNotNull(targetAddress);
this.targetLeaderId = checkNotNull(targetLeaderId);
Expand Down Expand Up @@ -114,22 +109,22 @@ public Void apply(Throwable failure) {
}

/**
* This method generate a specific Registration, for example TaskExecutor Registration at the ResourceManager
* This method generate a specific Registration, for example TaskExecutor Registration at the ResourceManager.
*/
protected abstract RetryingRegistration<Gateway, Success> generateRegistration();

/**
* This method handle the Registration Response
* This method handle the Registration Response.
*/
protected abstract void onRegistrationSuccess(Success success);

/**
* This method handle the Registration failure
* This method handle the Registration failure.
*/
protected abstract void onRegistrationFailure(Throwable failure);

/**
* close connection
* Close connection.
*/
public void close() {
closed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ public abstract class RegistrationResponse implements Serializable {
private static final long serialVersionUID = 1L;

// ----------------------------------------------------------------------------

/**
* Base class for a successful registration. Concrete registration implementations
* will typically extend this class to attach more information.
*/
public static class Success extends RegistrationResponse {
private static final long serialVersionUID = 1L;

@Override
public String toString() {
return "Registration Successful";
Expand All @@ -50,12 +50,12 @@ public String toString() {
public static final class Decline extends RegistrationResponse {
private static final long serialVersionUID = 1L;

/** the rejection reason */
/** The rejection reason. */
private final String reason;

/**
* Creates a new rejection message.
*
*
* @param reason The reason for the rejection.
*/
public Decline(String reason) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
* for example registering the TaskExecutor at the ResourceManager.
* This {@code RetryingRegistration} implements both the initial address resolution
* and the retries-with-backoff strategy.
*
*
* <p>The registration gives access to a future that is completed upon successful registration.
* The registration can be canceled, for example when the target where it tries to register
* at looses leader status.
*
*
* @param <Gateway> The type of the gateway to connect to.
* @param <Success> The type of the successful registration responses.
*/
Expand All @@ -56,16 +56,16 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
// default configuration values
// ------------------------------------------------------------------------

/** default value for the initial registration timeout (milliseconds) */
/** Default value for the initial registration timeout (milliseconds). */
private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;

/** default value for the maximum registration timeout, after exponential back-off (milliseconds) */
/** Default value for the maximum registration timeout, after exponential back-off (milliseconds). */
private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;

/** The pause (milliseconds) made after an registration attempt caused an exception (other than timeout) */
/** The pause (milliseconds) made after an registration attempt caused an exception (other than timeout). */
private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;

/** The pause (milliseconds) made after the registration attempt was refused */
/** The pause (milliseconds) made after the registration attempt was refused. */
private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -113,7 +113,7 @@ public RetryingRegistration(
public RetryingRegistration(
Logger log,
RpcService rpcService,
String targetName,
String targetName,
Class<Gateway> targetType,
String targetAddress,
UUID leaderId,
Expand Down Expand Up @@ -180,7 +180,7 @@ public void startRegistration() {
try {
// trigger resolution of the resource manager address to a callable gateway
Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);

// upon success, start the registration attempts
Future<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() {
@Override
Expand Down Expand Up @@ -223,7 +223,7 @@ private void register(final Gateway gateway, final int attempt, final long timeo
try {
log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);

// if the registration was successful, let the TaskExecutor know
Future<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() {
@Override
Expand All @@ -249,7 +249,7 @@ public void accept(RegistrationResponse result) {
}
}
}, rpcService.getExecutor());

// upon failure, retry
registrationAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.concurrent.Executor;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void testRpcConnectionClose() throws Exception {
TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
TestingRpcService rpcService = new TestingRpcService();

try{
try {
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);

TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
Expand Down Expand Up @@ -148,11 +149,7 @@ private static class TestRpcConnection extends RegisteredRpcConnection<TestRegis

private String failureMessage;

public TestRpcConnection(String targetAddress,
UUID targetLeaderId,
Executor executor,
RpcService rpcService)
{
public TestRpcConnection(String targetAddress, UUID targetLeaderId, Executor executor, RpcService rpcService) {
super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor);
this.rpcService = rpcService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import org.slf4j.LoggerFactory;

import java.util.UUID;
Expand All @@ -36,8 +35,17 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests for the generic retrying registration class, validating the failure, retry, and back-off behavior.
Expand Down Expand Up @@ -298,7 +306,7 @@ public void testCancellation() throws Exception {
// test registration
// ------------------------------------------------------------------------

protected static class TestRegistrationSuccess extends RegistrationResponse.Success {
static class TestRegistrationSuccess extends RegistrationResponse.Success {
private static final long serialVersionUID = 5542698790917150604L;

private final String correlationId;
Expand All @@ -312,7 +320,7 @@ public String getCorrelationId() {
}
}

protected static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {

// we use shorter timeouts here to speed up the tests
static final long INITIAL_TIMEOUT = 20;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.runtime.registration;

import akka.dispatch.Futures;

import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.TestingGatewayBase;
Expand All @@ -29,6 +27,9 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Mock gateway for {@link RegistrationResponse}.
*/
public class TestRegistrationGateway extends TestingGatewayBase {

private final BlockingQueue<RegistrationCall> invocations;
Expand All @@ -42,7 +43,6 @@ public TestRegistrationGateway(RegistrationResponse... responses) {

this.invocations = new LinkedBlockingQueue<>();
this.responses = responses;

}

// ------------------------------------------------------------------------
Expand All @@ -65,6 +65,9 @@ public BlockingQueue<RegistrationCall> getInvocations() {

// ------------------------------------------------------------------------

/**
* Invocation parameters.
*/
public static class RegistrationCall {
private final UUID leaderId;
private final long timeout;
Expand Down

0 comments on commit a68c15f

Please sign in to comment.