Skip to content

Commit

Permalink
[FLINK-11465][tests] Add FlinkResource
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 15, 2019
1 parent 8fb5f1b commit 751184d
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.flink;

import org.apache.flink.util.AutoCloseableAsync;

import java.io.IOException;

/**
* Controller for interacting with a cluster.
*/
public interface ClusterController extends AutoCloseableAsync {

/**
* Submits the given job to the cluster.
*
* @param job job to submit
* @return JobController for the submitted job
* @throws IOException
*/
JobController submitJob(JobSubmission job) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.tests.util.util.FactoryUtils;
import org.apache.flink.util.ExternalResource;

import java.io.IOException;

/**
* Generic interface for interacting with Flink.
*/
public interface FlinkResource extends ExternalResource {

/**
* Adds the given configuration to the existing configuration of this resource. Entries in the existing configuration
* will be overwritten.
*
* @param config config to add
* @throws IOException
*/
void addConfiguration(Configuration config) throws IOException;

/**
* Starts a cluster.
*
* <p>The exact constellation of the cluster is undefined.
*
* <p>In the case of per-job clusters this method may not start any Flink processes, deferring this to
* {@link ClusterController#submitJob(JobSubmission)}.
*
* @return controller for interacting with the cluster
* @throws IOException
* @param numTaskManagers number of task managers
*/
ClusterController startCluster(int numTaskManagers) throws IOException;

/**
* Returns the configured FlinkResource implementation, or a {@link LocalStandaloneFlinkResource} if none is configured.
*
* @return configured FlinkResource, or {@link LocalStandaloneFlinkResource} is none is configured
*/
static FlinkResource get() {
return FactoryUtils.loadAndInvokeFactory(
FlinkResourceFactory.class,
FlinkResourceFactory::create,
LocalStandaloneFlinkResourceFactory::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.flink;

import java.util.Optional;

/**
* A factory for {@link FlinkResource} implementations.
*/
@FunctionalInterface
public interface FlinkResourceFactory {

/**
* Returns a {@link FlinkResource} instance. If the instance could not be instantiated (for example, because a
* mandatory parameter was missing), then an empty {@link Optional} should be returned.
*
* @return FlinkResource instance, or an empty Optional if the instance could not be instantiated
*/
Optional<FlinkResource> create();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.flink;

/**
* Controller for interacting with a job.
*/
public interface JobController {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.flink;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersInfo;
import org.apache.flink.tests.util.FlinkDistribution;
import org.apache.flink.util.ConfigurationException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Flink resource that start local standalone clusters.
*/
public class LocalStandaloneFlinkResource implements FlinkResource {

private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneFlinkResource.class);

private final FlinkDistribution distribution = new FlinkDistribution();

@Override
public void before() throws Exception {
distribution.before();
}

@Override
public void afterTestSuccess() {
distribution.afterTestSuccess();
}

@Override
public void afterTestFailure() {
distribution.afterTestFailure();
}

@Override
public void addConfiguration(final Configuration config) throws IOException {
distribution.appendConfiguration(config);
}

@Override
public ClusterController startCluster(int numTaskManagers) throws IOException {
distribution.startJobManager();
for (int x = 0; x < numTaskManagers; x++) {
distribution.startTaskManager();
}

try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), Executors.directExecutor())) {
for (int retryAttempt = 0; retryAttempt < 30; retryAttempt++) {
final CompletableFuture<TaskManagersInfo> localhost = restClient.sendRequest(
"localhost",
8081,
TaskManagersHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance());

try {
final TaskManagersInfo taskManagersInfo = localhost.get(1, TimeUnit.SECONDS);

final int numRunningTaskManagers = taskManagersInfo.getTaskManagerInfos().size();
if (numRunningTaskManagers == numTaskManagers) {
return new StandaloneClusterController(distribution);
} else {
LOG.info("Waiting for task managers to come up. {}/{} are currently running.", numRunningTaskManagers, numTaskManagers);
}
} catch (InterruptedException e) {
LOG.info("Waiting for dispatcher REST endpoint to come up...");
Thread.currentThread().interrupt();
} catch (TimeoutException | ExecutionException e) {
// ExecutionExceptions may occur if leader election is still going on
LOG.info("Waiting for dispatcher REST endpoint to come up...");
}

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (ConfigurationException e) {
throw new RuntimeException("Could not create RestClient.", e);
} catch (Exception e) {
throw new RuntimeException(e);
}

throw new RuntimeException("Cluster did not start in expected time-frame.");
}

private static class StandaloneClusterController implements ClusterController {

private final FlinkDistribution distribution;

StandaloneClusterController(FlinkDistribution distribution) {
this.distribution = distribution;
}

@Override
public JobController submitJob(JobSubmission job) throws IOException {
final JobID run = distribution.submitJob(job);

return new StandaloneJobController(run);
}

@Override
public CompletableFuture<Void> closeAsync() {
try {
distribution.stopFlinkCluster();
return CompletableFuture.completedFuture(null);
} catch (IOException e) {
return FutureUtils.getFailedFuture(e);
}
}
}

private static class StandaloneJobController implements JobController {
private final JobID jobId;

StandaloneJobController(JobID jobId) {
this.jobId = jobId;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.flink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

/**
* A {@link FlinkResourceFactory} for the {@link LocalStandaloneFlinkResource}.
*/
public final class LocalStandaloneFlinkResourceFactory implements FlinkResourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneFlinkResourceFactory.class);

@Override
public Optional<FlinkResource> create() {
LOG.info("Created {}.", LocalStandaloneFlinkResource.class.getSimpleName());
return Optional.of(new LocalStandaloneFlinkResource());
}
}

0 comments on commit 751184d

Please sign in to comment.