Skip to content

Commit

Permalink
[FLINK-17760][runtime, tests] Rework ArchivedExecutionGraphTest and u…
Browse files Browse the repository at this point in the history
…pdates TaskManagerLocation to enable comparing LocalTaskManagerLocation
  • Loading branch information
zhuzhurk committed Dec 17, 2020
1 parent b41296e commit 5aa80d5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public boolean equals(Object obj) {
if (obj == this) {
return true;
}
else if (obj != null && obj.getClass() == TaskManagerLocation.class) {
else if (obj != null && obj.getClass() == getClass()) {
TaskManagerLocation that = (TaskManagerLocation) obj;
return this.resourceID.equals(that.resourceID) &&
this.inetAddress.equals(that.inetAddress) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,19 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
Expand All @@ -48,19 +45,16 @@
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

/**
* Tests for the {@link ArchivedExecutionGraph}.
Expand Down Expand Up @@ -98,23 +92,6 @@ public static void setupExecutionGraph() throws Exception {

jobGraph.setExecutionConfig(config);

runtimeGraph = TestingExecutionGraphBuilder
.newBuilder()
.setJobGraph(jobGraph)
.build();

runtimeGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

List<ExecutionJobVertex> jobVertices = new ArrayList<>();
jobVertices.add(runtimeGraph.getJobVertex(v1ID));
jobVertices.add(runtimeGraph.getJobVertex(v2ID));

CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(
0,
jobVertices,
mock(CheckpointCoordinatorConfiguration.class),
new UnregisteredMetricsGroup());

CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
100,
100,
Expand All @@ -125,21 +102,21 @@ public static void setupExecutionGraph() throws Exception {
false,
false,
0);

runtimeGraph.enableCheckpointing(
JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(
Arrays.asList(v1ID, v2ID),
Arrays.asList(v1ID, v2ID),
Arrays.asList(v1ID, v2ID),
chkConfig,
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
Collections.<MasterTriggerRestoreHook<?>>emptyList(),
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
new MemoryStateBackend(),
statsTracker);

runtimeGraph.setJsonPlan("{}");

runtimeGraph.getJobVertex(v2ID).getTaskVertices()[0].getCurrentExecutionAttempt().fail(new RuntimeException("This exception was thrown on purpose."));
null);
jobGraph.setSnapshotSettings(checkpointingSettings);

SchedulerBase scheduler = SchedulerTestingUtils.createScheduler(jobGraph);
scheduler.initialize(ComponentMainThreadExecutorServiceAdapter.forMainThread());

runtimeGraph = scheduler.getExecutionGraph();

scheduler.startScheduling();
scheduler.handleGlobalFailure(new RuntimeException("This exception was thrown on purpose."));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocato
createDefaultExecutionSlotAllocatorFactory(jobGraph.getScheduleMode(), slotProvider, slotRequestTimeout));
}

public static DefaultScheduler createScheduler(final JobGraph jobGraph) throws Exception {
return newSchedulerBuilder(jobGraph).build();
}

public static DefaultScheduler createScheduler(
final JobGraph jobGraph,
final SlotProvider slotProvider) throws Exception {
Expand Down

0 comments on commit 5aa80d5

Please sign in to comment.