Skip to content

Commit

Permalink
[refactor] [tests] Generalize test handler generation
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Oct 11, 2017
1 parent c4430e6 commit 90eb902
Showing 1 changed file with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
Expand Down Expand Up @@ -128,15 +132,11 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
}
}

private static class TestBlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
private static class TestBlobServerPortHandler extends TestHandler<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
private volatile boolean portRetrieved = false;

private TestBlobServerPortHandler() {
super(
CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
BlobServerPortHeaders.getInstance());
super(BlobServerPortHeaders.getInstance());
}

@Override
Expand All @@ -146,15 +146,11 @@ protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull H
}
}

private static class TestJobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
private static class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
private volatile boolean jobSubmitted = false;

private TestJobSubmitHandler() {
super(
CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
JobSubmitHeaders.getInstance());
super(JobSubmitHeaders.getInstance());
}

@Override
Expand All @@ -164,16 +160,12 @@ protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull Handle
}
}

private static class TestJobTerminationHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> {
private static class TestJobTerminationHandler extends TestHandler<EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> {
private volatile boolean jobCanceled = false;
private volatile boolean jobStopped = false;

private TestJobTerminationHandler() {
super(
CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
JobTerminationHeaders.getInstance());
super(JobTerminationHeaders.getInstance());
}

@Override
Expand All @@ -189,4 +181,15 @@ protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerReq
return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
}
}

private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {

private TestHandler(MessageHeaders<R, P, M> headers) {
super(
CompletableFuture.completedFuture(restAddress),
mockGatewayRetriever,
RpcUtils.INF_TIMEOUT,
headers);
}
}
}

0 comments on commit 90eb902

Please sign in to comment.