[FLINK-8797] Port AbstractOperatorRestoreTestBase to MiniClusterResource
aljoscha committed Mar 11, 2018
1 parent ccb78b0 commit 6732669
Showing 1 changed file with 81 additions and 167 deletions.
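
The commit replaces the hand-rolled JobManager/TaskManager actor setup in the test base with the shared MiniClusterResource JUnit rule. As orientation before the diff, the pattern being adopted looks roughly like the sketch below; the rule construction and the getClusterClient() call mirror the added lines in the diff, while the test class itself and its method body are illustrative placeholders, not part of the commit.

    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.test.util.MiniClusterResource;

    import org.junit.ClassRule;
    import org.junit.Test;

    public class ExampleMiniClusterTest {

        // Shared mini cluster for all tests in the class; started before the first
        // test and shut down after the last one by the JUnit ClassRule mechanism.
        @ClassRule
        public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
            new MiniClusterResource.MiniClusterResourceConfiguration(
                new Configuration(), // cluster configuration
                1,                   // number of TaskManagers
                4),                  // number of slots per TaskManager
            true);                   // boolean constructor flag, passed exactly as in the diff below

        @Test
        public void submitSomething() throws Exception {
            // Obtain a client for the running mini cluster and submit jobs in
            // detached mode, as the ported test does below.
            ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
            clusterClient.setDetached(true);
            // ... build a JobGraph and call clusterClient.submitJob(jobGraph, classLoader)
        }
    }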
@@ -19,55 +19,40 @@
package org.apache.flink.test.state.operator.restore;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.net.URL;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

/**
* Abstract class to verify that it is possible to migrate a savepoint across upgraded Flink versions and that the
@@ -79,16 +64,21 @@
*/
public abstract class AbstractOperatorRestoreTestBase extends TestLogger {

private static final int NUM_TMS = 1;
private static final int NUM_SLOTS_PER_TM = 4;
private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);

@Rule
public final TemporaryFolder tmpFolder = new TemporaryFolder();

private static ActorSystem actorSystem = null;
private static HighAvailabilityServices highAvailabilityServices = null;
private static ActorGateway jobManager = null;
private static ActorGateway archiver = null;
private static ActorGateway taskManager = null;
@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
new Configuration(),
NUM_TMS,
NUM_SLOTS_PER_TM),
true);

private static final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
private final boolean allowNonRestoredState;

protected AbstractOperatorRestoreTestBase() {
Expand All @@ -104,178 +94,102 @@ public static void beforeClass() {
SavepointSerializers.setFailWhenLegacyStateDetected(false);
}

@BeforeClass
public static void setupCluster() throws Exception {
final Configuration configuration = new Configuration();

FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);

actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());

highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
configuration,
TestingUtils.defaultExecutor());

Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
configuration,
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
TestingJobManager.class,
TestingMemoryArchivist.class);

jobManager = LeaderRetrievalUtils.retrieveLeaderGateway(
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
actorSystem,
timeout);

archiver = new AkkaActorGateway(master._2(), jobManager.leaderSessionID());

Configuration tmConfig = new Configuration();
tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);

ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
tmConfig,
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
TestingTaskManager.class);

taskManager = new AkkaActorGateway(taskManagerRef, jobManager.leaderSessionID());

// Wait until connected
Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
Await.ready(taskManager.ask(msg, timeout), timeout);
}

@AfterClass
public static void tearDownCluster() throws Exception {
if (highAvailabilityServices != null) {
highAvailabilityServices.closeAndCleanupAllData();
}

if (actorSystem != null) {
actorSystem.shutdown();
}

if (archiver != null) {
archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
}

if (jobManager != null) {
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
}

if (taskManager != null) {
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
}
}

@Test
public void testMigrationAndRestore() throws Throwable {
ClassLoader classLoader = this.getClass().getClassLoader();
ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
clusterClient.setDetached(true);
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);

// submit job with old version savepoint and create a migrated savepoint in the new version
String savepointPath = migrateJob();
String savepointPath = migrateJob(classLoader, clusterClient, deadline);
// restore from migrated new version savepoint
restoreJob(savepointPath);
restoreJob(classLoader, clusterClient, deadline, savepointPath);
}

private String migrateJob() throws Throwable {
private String migrateJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline) throws Throwable {

URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
if (savepointResource == null) {
throw new IllegalArgumentException("Savepoint file does not exist.");
}
JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));

Object msg;
Object result;

// Submit job graph
msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
result = Await.result(jobManager.ask(msg, timeout), timeout);
assertNotNull(jobToMigrate.getJobID());

if (result instanceof JobManagerMessages.JobResultFailure) {
JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
throw new Exception(failure.cause());
}
Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
clusterClient.submitJob(jobToMigrate, classLoader);

// Wait for all tasks to be running
msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
Await.result(jobManager.ask(msg, timeout), timeout);
CompletableFuture<JobStatus> jobRunningFuture = FutureUtils.retrySuccesfulWithDelay(
() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
Time.milliseconds(50),
deadline,
(jobStatus) -> jobStatus == JobStatus.RUNNING,
TestingUtils.defaultScheduledExecutor());
assertEquals(
JobStatus.RUNNING,
jobRunningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

// Trigger savepoint
File targetDirectory = tmpFolder.newFolder();
msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
String savepointPath = null;

// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
boolean retry = true;
for (int i = 0; retry && i < 10; i++) {
Future<Object> future = jobManager.ask(msg, timeout);
result = Await.result(future, timeout);

if (result instanceof JobManagerMessages.CancellationFailure) {
Thread.sleep(50L);
} else {
retry = false;
while (deadline.hasTimeLeft() && savepointPath == null) {
try {
savepointPath = clusterClient.cancelWithSavepoint(
jobToMigrate.getJobID(),
targetDirectory.getAbsolutePath());
} catch (Exception e) {
String exceptionString = ExceptionUtils.stringifyException(e);
if (!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") // legacy
|| exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // flip6
|| exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // flip6
throw e;
}
}
}

if (result instanceof JobManagerMessages.CancellationFailure) {
JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
throw new Exception(failure.cause());
}

String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
assertNotNull("Could not take savepoint.", savepointPath);

// Wait until canceled
msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
Await.ready(jobManager.ask(msg, timeout), timeout);
CompletableFuture<JobStatus> jobCanceledFuture = FutureUtils.retrySuccesfulWithDelay(
() -> clusterClient.getJobStatus(jobToMigrate.getJobID()),
Time.milliseconds(50),
deadline,
(jobStatus) -> jobStatus == JobStatus.CANCELED,
TestingUtils.defaultScheduledExecutor());
assertEquals(
JobStatus.CANCELED,
jobCanceledFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

return savepointPath;
}

private void restoreJob(String savepointPath) throws Exception {
private void restoreJob(ClassLoader classLoader, ClusterClient<?> clusterClient, Deadline deadline, String savepointPath) throws Exception {
JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState));

Object msg;
Object result;

// Submit job graph
msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
result = Await.result(jobManager.ask(msg, timeout), timeout);
assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());

if (result instanceof JobManagerMessages.JobResultFailure) {
JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
throw new Exception(failure.cause());
}
Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());

msg = new JobManagerMessages.RequestJobStatus(jobToRestore.getJobID());
JobStatus status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
while (!status.isTerminalState()) {
status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
}
clusterClient.submitJob(jobToRestore, classLoader);

Assert.assertEquals(JobStatus.FINISHED, status);
CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
() -> clusterClient.getJobStatus(jobToRestore.getJobID()),
Time.milliseconds(50),
deadline,
(jobStatus) -> jobStatus == JobStatus.FINISHED,
TestingUtils.defaultScheduledExecutor());
assertEquals(
JobStatus.FINISHED,
jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
}

private JobGraph createJobGraph(ExecutionMode mode) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend((StateBackend) new MemoryStateBackend());

switch (mode) {
case MIGRATE:
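
Note that the new test code waits for job-status transitions by polling through FutureUtils.retrySuccesfulWithDelay until a predicate holds or a deadline expires. A minimal, self-contained sketch of that polling pattern, built only from the calls that appear in the diff above (the helper class, method, and parameters are hypothetical and for illustration only), could look like:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.time.Deadline;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.runtime.concurrent.FutureUtils;
    import org.apache.flink.runtime.jobgraph.JobStatus;
    import org.apache.flink.runtime.testingUtils.TestingUtils;

    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    class JobStatusPolling {

        /**
         * Polls the status of the given job every 50 ms until it matches the
         * expected status or the deadline runs out, then returns the observed status.
         */
        static JobStatus waitForStatus(
                ClusterClient<?> client,
                JobID jobId,
                JobStatus expected,
                Duration timeout) throws Exception {

            Deadline deadline = Deadline.now().plus(timeout);

            CompletableFuture<JobStatus> statusFuture = FutureUtils.retrySuccesfulWithDelay(
                () -> client.getJobStatus(jobId),
                Time.milliseconds(50),
                deadline,
                status -> status == expected,
                TestingUtils.defaultScheduledExecutor());

            return statusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        }
    }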