Skip to content

Commit

Permalink
[FLINK-21256] Add tests for Executing state
Browse files Browse the repository at this point in the history
  • Loading branch information
rmetzger committed Feb 10, 2021
1 parent 28585ac commit d0bcad6
Show file tree
Hide file tree
Showing 8 changed files with 744 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
Expand Down Expand Up @@ -124,7 +125,8 @@ public class ExecutionJobVertex

private InputSplitAssigner splitAssigner;

ExecutionJobVertex(
@VisibleForTesting
public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public class ExecutionVertex
* @param maxPriorExecutionHistoryLength The number of prior Executions (= execution attempts)
* to keep.
*/
ExecutionVertex(
@VisibleForTesting
public ExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ private OperatorCoordinatorHolder(
this.operatorMaxParallelism = operatorMaxParallelism;
}

@VisibleForTesting
public void lazyInitialize(
Consumer<Throwable> globalFailureHandler,
ComponentMainThreadExecutor mainThreadExecutor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.concurrent;

/** Testing ComponentMainThreadExecutor which can be manually triggered. */
public class ManuallyTriggeredComponentMainThreadExecutor
extends ManuallyTriggeredScheduledExecutorService implements ComponentMainThreadExecutor {

private final Thread executorThread;

public ManuallyTriggeredComponentMainThreadExecutor(Thread executor) {
executorThread = executor;
}

@Override
public void assertRunningInMainThread() {
assert Thread.currentThread() == executorThread;
}

@Override
public void trigger() {
assertRunningInMainThread();
super.trigger();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void testFailExternallyDuringDeploy() {
}
}

private static class SubmitFailingSimpleAckingTaskManagerGateway
public static class SubmitFailingSimpleAckingTaskManagerGateway
extends SimpleAckingTaskManagerGateway {
@Override
public CompletableFuture<Acknowledge> submitTask(
Expand Down
Loading

0 comments on commit d0bcad6

Please sign in to comment.