Skip to content

Commit

Permalink
[hotfix] Use simulated self-type idiom in TestingRestful/DispatcherGa…
Browse files Browse the repository at this point in the history
…teway.Builder
  • Loading branch information
tillrohrmann committed Oct 11, 2019
1 parent 394d930 commit c2f6866
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void testLeaderRetrievalGateway() throws Exception {
final Time timeout = Time.seconds(10L);
final CompletableFuture<RestfulGateway> gatewayFuture = new CompletableFuture<>();
final GatewayRetriever<RestfulGateway> gatewayRetriever = () -> gatewayFuture;
final RestfulGateway gateway = TestingRestfulGateway.newBuilder().build();
final RestfulGateway gateway = new TestingRestfulGateway.Builder().build();

final TestingHandler testingHandler = new TestingHandler(
gatewayRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class JarDeleteHandlerTest extends TestLogger {
@Before
public void setUp() throws Exception {
jarDir = temporaryFolder.newFolder().toPath();
restfulGateway = TestingRestfulGateway.newBuilder().build();
restfulGateway = new TestingRestfulGateway.Builder().build();
jarDeleteHandler = new JarDeleteHandler(
() -> CompletableFuture.completedFuture(restfulGateway),
Time.seconds(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testConnectFailure() throws Exception {
RestServerEndpointConfiguration restServerConfig = RestServerEndpointConfiguration.fromConfiguration(serverConfig);
RestClientConfiguration restClientConfig = RestClientConfiguration.fromConfiguration(clientConfig);

RestfulGateway restfulGateway = TestingRestfulGateway.newBuilder().build();
RestfulGateway restfulGateway = new TestingRestfulGateway.Builder().build();
RestServerEndpointITCase.TestVersionHandler testVersionHandler = new RestServerEndpointITCase.TestVersionHandler(
() -> CompletableFuture.completedFuture(restfulGateway),
RpcUtils.INF_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testFileCleanup() throws Exception {
final Path file = dir.resolve("file");
Files.createFile(file);

RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder()
RestfulGateway mockRestfulGateway = new TestingRestfulGateway.Builder()
.build();

final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class JobExecutionResultHandlerTest extends TestLogger {

@Before
public void setUp() throws Exception {
final TestingRestfulGateway testingRestfulGateway = TestingRestfulGateway.newBuilder().build();
final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder().build();

jobExecutionResultHandler = new JobExecutionResultHandler(
() -> CompletableFuture.completedFuture(testingRestfulGateway),
Expand All @@ -81,7 +81,7 @@ public void setUp() throws Exception {

@Test
public void testResultInProgress() throws Exception {
final TestingRestfulGateway testingRestfulGateway = TestingRestfulGateway.newBuilder()
final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
.setRequestJobStatusFunction(
jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING))
.build();
Expand All @@ -103,7 +103,7 @@ public void testCompletedResult() throws Exception {
.setState(jobStatus)
.build();

final TestingRestfulGateway testingRestfulGateway = TestingRestfulGateway.newBuilder()
final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
.setRequestJobStatusFunction(
jobId -> {
assertThat(jobId, equalTo(TEST_JOB_ID));
Expand All @@ -129,7 +129,7 @@ public void testCompletedResult() throws Exception {

@Test
public void testPropagateFlinkJobNotFoundExceptionAsRestHandlerException() throws Exception {
final TestingRestfulGateway testingRestfulGateway = TestingRestfulGateway.newBuilder()
final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
.setRequestJobStatusFunction(
jobId -> FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class JobVertexBackPressureHandlerTest {

@Before
public void setUp() {
restfulGateway = TestingRestfulGateway.newBuilder().setRequestOperatorBackPressureStatsFunction(
restfulGateway = new TestingRestfulGateway.Builder().setRequestOperatorBackPressureStatsFunction(
(jobId, jobVertexId) -> {
if (jobId.equals(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE)) {
return CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(new OperatorBackPressureStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void testCacheEntryCleanup() throws Exception {
final ArchivedExecutionGraph expectedExecutionGraph2 = new ArchivedExecutionGraphBuilder().build();

final AtomicInteger requestJobCalls = new AtomicInteger(0);
final TestingRestfulGateway restfulGateway = TestingRestfulGateway.newBuilder()
final TestingRestfulGateway restfulGateway = new TestingRestfulGateway.Builder()
.setRequestJobFunction(
jobId -> {
requestJobCalls.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTim
/**
* Builder for the {@link TestingDispatcherGateway}.
*/
public static final class Builder extends TestingRestfulGateway.Builder {
public static final class Builder extends TestingRestfulGateway.AbstractBuilder<Builder> {

private Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction;
private Supplier<CompletableFuture<Collection<JobID>>> listFunction;
Expand All @@ -159,11 +159,16 @@ public Builder setRequestArchivedJobFunction(Function<JobID, CompletableFuture<A
}

@Override
public TestingRestfulGateway.Builder setRequestJobFunction(Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
public Builder setRequestJobFunction(Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
// signature clash
throw new UnsupportedOperationException("Use setRequestArchivedJobFunction() instead.");
}

@Override
protected Builder self() {
return this;
}

public Builder setBlobServerPort(int blobServerPort) {
this.blobServerPort = blobServerPort;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,12 @@ public String getHostname() {
return hostname;
}

public static Builder newBuilder() {
return new Builder();
}

/**
* Builder for the {@link TestingRestfulGateway}.
* Abstract builder class for {@link TestingRestfulGateway} and its subclasses.
*
* @param <T> type of sub class
*/
public static class Builder {
protected abstract static class AbstractBuilder<T extends AbstractBuilder> {
protected String address = LOCALHOST;
protected String hostname = LOCALHOST;
protected Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
Expand All @@ -220,7 +218,7 @@ public static class Builder {
protected BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction;
protected BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction;

public Builder() {
protected AbstractBuilder() {
cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
requestJobFunction = DEFAULT_REQUEST_JOB_FUNCTION;
requestJobResultFunction = DEFAULT_REQUEST_JOB_RESULT_FUNCTION;
Expand All @@ -234,72 +232,88 @@ public Builder() {
stopWithSavepointFunction = DEFAULT_STOP_WITH_SAVEPOINT_FUNCTION;
}

public Builder setAddress(String address) {
public T setAddress(String address) {
this.address = address;
return this;
return self();
}

public Builder setHostname(String hostname) {
public T setHostname(String hostname) {
this.hostname = hostname;
return this;
return self();
}

public Builder setRequestJobFunction(Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
public T setRequestJobFunction(Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
this.requestJobFunction = requestJobFunction;
return this;
return self();
}

public Builder setRequestJobResultFunction(Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction) {
public T setRequestJobResultFunction(Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction) {
this.requestJobResultFunction = requestJobResultFunction;
return this;
return self();
}

public Builder setRequestJobStatusFunction(Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction) {
public T setRequestJobStatusFunction(Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction) {
this.requestJobStatusFunction = requestJobStatusFunction;
return this;
return self();
}

public Builder setRequestMultipleJobDetailsSupplier(Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier) {
public T setRequestMultipleJobDetailsSupplier(Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier) {
this.requestMultipleJobDetailsSupplier = requestMultipleJobDetailsSupplier;
return this;
return self();
}

public Builder setRequestClusterOverviewSupplier(Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier) {
public T setRequestClusterOverviewSupplier(Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier) {
this.requestClusterOverviewSupplier = requestClusterOverviewSupplier;
return this;
return self();
}

public Builder setRequestMetricQueryServiceGatewaysSupplier(Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServiceGatewaysSupplier) {
public T setRequestMetricQueryServiceGatewaysSupplier(Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServiceGatewaysSupplier) {
this.requestMetricQueryServiceGatewaysSupplier = requestMetricQueryServiceGatewaysSupplier;
return this;
return self();
}

public Builder setRequestTaskManagerMetricQueryServiceGatewaysSupplier(Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServiceGatewaysSupplier) {
public T setRequestTaskManagerMetricQueryServiceGatewaysSupplier(Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServiceGatewaysSupplier) {
this.requestTaskManagerMetricQueryServiceGatewaysSupplier = requestTaskManagerMetricQueryServiceGatewaysSupplier;
return this;
return self();
}

public Builder setRequestOperatorBackPressureStatsFunction(BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOeratorBackPressureStatsFunction) {
public T setRequestOperatorBackPressureStatsFunction(BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOeratorBackPressureStatsFunction) {
this.requestOperatorBackPressureStatsFunction = requestOeratorBackPressureStatsFunction;
return this;
return self();
}

public Builder setCancelJobFunction(Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction) {
public T setCancelJobFunction(Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction) {
this.cancelJobFunction = cancelJobFunction;
return this;
return self();
}

public Builder setTriggerSavepointFunction(BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction) {
public T setTriggerSavepointFunction(BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction) {
this.triggerSavepointFunction = triggerSavepointFunction;
return this;
return self();
}

public Builder setStopWithSavepointFunction(BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction) {
public T setStopWithSavepointFunction(BiFunction<JobID, String, CompletableFuture<String>> stopWithSavepointFunction) {
this.stopWithSavepointFunction = stopWithSavepointFunction;
return self();
}

protected abstract T self();

public abstract TestingRestfulGateway build();
}

/**
* Builder for the {@link TestingRestfulGateway}.
*/
public static class Builder extends AbstractBuilder<Builder> {

@Override
protected Builder self() {
return this;
}

public TestingRestfulGateway build() {
@Override
public TestingRestfulGateway build() {
return new TestingRestfulGateway(
address,
hostname,
Expand Down

0 comments on commit c2f6866

Please sign in to comment.