[hotfix] Delete pointless test
Blocking edges aren't supported by the AdaptiveScheduler in the first place, so there's no point in testing what happens when a savepoint is triggered for a job with blocking edges.
This wasn't caught earlier because the test wasn't very good to start with.
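For context: a "blocking edge" is an intermediate result of type ResultPartitionType.BLOCKING, i.e. a batch-style exchange whose output must be fully produced before downstream tasks may consume it; the AdaptiveScheduler only handles streaming jobs, whose exchanges are pipelined. A minimal sketch of how such an edge is attached to a vertex, mirroring the setup of the deleted test below (the helper class is invented for illustration and is not part of this commit):

    import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
    import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
    import org.apache.flink.runtime.jobgraph.JobVertex;

    // Hypothetical helper, not part of this commit: builds a vertex whose
    // output is a BLOCKING (batch-style) result partition -- the kind of edge
    // the AdaptiveScheduler does not support.
    class BlockingEdgeSketch {
        static JobVertex vertexWithBlockingEdge() {
            JobVertex vertex = new JobVertex("producer");
            vertex.getOrCreateResultDataSet(
                    new IntermediateDataSetID(), ResultPartitionType.BLOCKING);
            return vertex;
        }
    }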
zentol committed Apr 19, 2024
1 parent a312a3b commit 10c84df
Showing 1 changed file with 0 additions and 55 deletions.
@@ -54,8 +54,6 @@
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.NoRestartBackoffTimeStrategy;
@@ -64,7 +62,6 @@
 import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
@@ -75,7 +72,6 @@
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
 import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
@@ -149,7 +145,6 @@
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexResource;
 import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
 import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements;
 import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
@@ -1783,56 +1778,6 @@ void testStopWithSavepointFailsInIllegalState() throws Exception {
                 .withCauseInstanceOf(CheckpointException.class);
     }
-
-    @Test
-    void testSavepointFailsWhenBlockingEdgeExists() throws Exception {
-        JobVertex jobVertex = createNoOpVertex(PARALLELISM);
-        jobVertex.getOrCreateResultDataSet(
-                new IntermediateDataSetID(), ResultPartitionType.BLOCKING);
-
-        final ExecutionGraph executionGraph =
-                ExecutionGraphTestUtils.createExecutionGraph(
-                        EXECUTOR_RESOURCE.getExecutor(), jobVertex);
-
-        executionGraph
-                .getAllExecutionVertices()
-                .forEach(
-                        task ->
-                                setVertexResource(
-                                        task,
-                                        new TestingLogicalSlotBuilder()
-                                                .createTestingLogicalSlot()));
-        executionGraph.transitionToRunning();
-        executionGraph
-                .getAllExecutionVertices()
-                .forEach(
-                        task ->
-                                task.getCurrentExecutionAttempt()
-                                        .transitionState(ExecutionState.RUNNING));
-
-        final AdaptiveScheduler scheduler =
-                new AdaptiveSchedulerBuilder(
-                                streamingJobGraph(jobVertex),
-                                mainThreadExecutor,
-                                EXECUTOR_RESOURCE.getExecutor())
-                        .build();
-
-        scheduler.goToExecuting(executionGraph, null, null, Collections.emptyList());
-
-        assertThatFuture(
-                        scheduler.stopWithSavepoint(
-                                "some directory", false, SavepointFormatType.CANONICAL))
-                .eventuallyFailsWith(ExecutionException.class)
-                .withCauseInstanceOf(CheckpointException.class)
-                .withMessageContaining(CheckpointFailureReason.BLOCKING_OUTPUT_EXIST.message());
-
-        assertThatFuture(
-                        scheduler.triggerSavepoint(
-                                "some directory", false, SavepointFormatType.CANONICAL))
-                .eventuallyFailsWith(ExecutionException.class)
-                .withCauseInstanceOf(CheckpointException.class)
-                .withMessageContaining(CheckpointFailureReason.BLOCKING_OUTPUT_EXIST.message());
-    }

     @Test
     void testDeliverOperatorEventToCoordinatorFailsInIllegalState() throws Exception {
         final AdaptiveScheduler scheduler =
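The invariant behind "not supported in the first place", hedged: scheduler selection happens up front based on JobGraph#getJobType(), and batch jobs — the only kind that produce blocking edges — are routed to a different scheduler before the AdaptiveScheduler is ever instantiated. An illustrative approximation of that routing (class and method names are invented; the real check lives in the scheduler factory selection):

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobType;

    // Illustrative only: approximates the up-front routing that keeps jobs
    // with blocking (batch-style) edges away from the AdaptiveScheduler.
    class SchedulerRoutingSketch {
        static String chooseScheduler(JobGraph jobGraph) {
            if (jobGraph.getJobType() == JobType.BATCH) {
                // Batch jobs use blocking exchanges; they get a different scheduler.
                return "Default";
            }
            return "Adaptive";
        }
    }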
