Skip to content

Commit

Permalink
[FLINK-21525] Move scheduler benchmarks to Flink
Browse files Browse the repository at this point in the history
Fix the compilation errors due to FLINK-21401
  • Loading branch information
Thesharing authored and zhuzhurk committed Mar 8, 2021
1 parent d6444f3 commit c70000c
Show file tree
Hide file tree
Showing 21 changed files with 1,192 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/

package org.apache.flink.runtime.scheduler.benchmark;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobType;

/**
 * Predefined job setups for the scheduler benchmarks. Every constant bundles a {@link
 * DistributionPattern}, a {@link ResultPartitionType}, a {@link JobType}, an {@link
 * ExecutionMode} and a parallelism.
 */
public enum JobConfiguration {
    /** Streaming job: all-to-all edges, pipelined partitions, parallelism 4000. */
    STREAMING(
            DistributionPattern.ALL_TO_ALL,
            ResultPartitionType.PIPELINED,
            JobType.STREAMING,
            ExecutionMode.PIPELINED,
            4000),

    /** Batch job: all-to-all edges, blocking partitions, parallelism 4000. */
    BATCH(
            DistributionPattern.ALL_TO_ALL,
            ResultPartitionType.BLOCKING,
            JobType.BATCH,
            ExecutionMode.BATCH,
            4000),

    /** Same settings as {@link #STREAMING} but with a parallelism of only 10. */
    STREAMING_TEST(
            DistributionPattern.ALL_TO_ALL,
            ResultPartitionType.PIPELINED,
            JobType.STREAMING,
            ExecutionMode.PIPELINED,
            10),

    /** Same settings as {@link #BATCH} but with a parallelism of only 10. */
    BATCH_TEST(
            DistributionPattern.ALL_TO_ALL,
            ResultPartitionType.BLOCKING,
            JobType.BATCH,
            ExecutionMode.BATCH,
            10);

    // Declared in the same order as the constructor arguments.
    private final DistributionPattern distributionPattern;
    private final ResultPartitionType resultPartitionType;
    private final JobType jobType;
    private final ExecutionMode executionMode;
    private final int parallelism;

    JobConfiguration(
            DistributionPattern distributionPattern,
            ResultPartitionType resultPartitionType,
            JobType jobType,
            ExecutionMode executionMode,
            int parallelism) {
        this.distributionPattern = distributionPattern;
        this.resultPartitionType = resultPartitionType;
        this.jobType = jobType;
        this.executionMode = executionMode;
        this.parallelism = parallelism;
    }

    /** Returns the distribution pattern configured for this constant. */
    public DistributionPattern getDistributionPattern() {
        return distributionPattern;
    }

    /** Returns the result partition type configured for this constant. */
    public ResultPartitionType getResultPartitionType() {
        return resultPartitionType;
    }

    /** Returns the job type configured for this constant. */
    public JobType getJobType() {
        return jobType;
    }

    /** Returns the execution mode configured for this constant. */
    public ExecutionMode getExecutionMode() {
        return executionMode;
    }

    /** Returns the parallelism configured for this constant. */
    public int getParallelism() {
        return parallelism;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/

package org.apache.flink.runtime.scheduler.benchmark;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
 * Static helpers shared by the scheduler benchmarks: building job vertices, job graphs and
 * execution graphs, deploying tasks into testing slots and reporting task state changes.
 */
public class SchedulerBenchmarkUtils {

    /**
     * Creates the default two-vertex topology: a "source" and a "sink", both running {@link
     * NoOpInvokable} with the configured parallelism. The sink consumes the source using the
     * configured distribution pattern and result partition type.
     *
     * @param jobConfiguration supplies parallelism, distribution pattern and partition type
     * @return a mutable list containing the source followed by the sink
     */
    public static List<JobVertex> createDefaultJobVertices(JobConfiguration jobConfiguration) {
        final JobVertex source = createNoOpVertex("source", jobConfiguration.getParallelism());
        final JobVertex sink = createNoOpVertex("sink", jobConfiguration.getParallelism());

        sink.connectNewDataSetAsInput(
                source,
                jobConfiguration.getDistributionPattern(),
                jobConfiguration.getResultPartitionType());

        final List<JobVertex> jobVertices = new ArrayList<>();
        jobVertices.add(source);
        jobVertices.add(sink);
        return jobVertices;
    }

    /** Creates a {@link JobVertex} with the given name and parallelism running a no-op task. */
    private static JobVertex createNoOpVertex(String name, int parallelism) {
        final JobVertex vertex = new JobVertex(name);
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);
        return vertex;
    }

    /**
     * Creates a job graph without any vertices, configured from the given job configuration.
     *
     * @param jobConfiguration supplies the job type and execution mode
     * @return the configured, vertex-less job graph
     * @throws IOException declared for callers; forwarded from the two-argument overload
     */
    public static JobGraph createJobGraph(JobConfiguration jobConfiguration) throws IOException {
        return createJobGraph(Collections.emptyList(), jobConfiguration);
    }

    /**
     * Creates a job graph from the given vertices and applies the job type and execution mode of
     * the given job configuration.
     *
     * @param jobVertices vertices to put into the graph
     * @param jobConfiguration supplies the job type and execution mode
     * @return the configured job graph
     * @throws IOException declared in the signature for callers
     */
    public static JobGraph createJobGraph(
            List<JobVertex> jobVertices, JobConfiguration jobConfiguration) throws IOException {

        final JobVertex[] vertexArray = jobVertices.toArray(new JobVertex[0]);
        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertexArray);

        // The graph is built via the streaming factory; the job type is overridden here.
        jobGraph.setJobType(jobConfiguration.getJobType());

        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setExecutionMode(jobConfiguration.getExecutionMode());
        jobGraph.setExecutionConfig(executionConfig);

        return jobGraph;
    }

    /**
     * Builds a job graph from the given vertices, creates a {@link DefaultScheduler} for it and
     * returns the scheduler's execution graph.
     *
     * @param jobVertices vertices of the job
     * @param jobConfiguration supplies the job type and execution mode
     * @return the execution graph owned by the newly created scheduler
     * @throws Exception if creating the job graph or the scheduler fails
     */
    public static ExecutionGraph createAndInitExecutionGraph(
            List<JobVertex> jobVertices, JobConfiguration jobConfiguration) throws Exception {

        final JobGraph jobGraph = createJobGraph(jobVertices, jobConfiguration);
        final ComponentMainThreadExecutor mainThreadExecutor =
                ComponentMainThreadExecutorServiceAdapter.forMainThread();
        final DefaultScheduler scheduler =
                SchedulerTestingUtils.createScheduler(jobGraph, mainThreadExecutor);

        return scheduler.getExecutionGraph();
    }

    /**
     * Deploys every task of one job vertex, each into a freshly created testing slot.
     *
     * @param executionGraph graph containing the job vertex
     * @param jobVertexID id of the job vertex whose tasks are deployed
     * @param slotBuilder creates one testing slot per task
     * @param sendScheduleOrUpdateConsumersMessage flag handed to {@link
     *     Execution#registerProducedPartitions} for every task
     * @throws JobException if deploying a task fails
     * @throws ExecutionException if registering the produced partitions fails
     * @throws InterruptedException if interrupted while waiting for the partition registration
     */
    public static void deployTasks(
            ExecutionGraph executionGraph,
            JobVertexID jobVertexID,
            TestingLogicalSlotBuilder slotBuilder,
            boolean sendScheduleOrUpdateConsumersMessage)
            throws JobException, ExecutionException, InterruptedException {

        final ExecutionVertex[] taskVertices =
                executionGraph.getJobVertex(jobVertexID).getTaskVertices();
        for (ExecutionVertex taskVertex : taskVertices) {
            registerPartitionsAndDeploy(
                    taskVertex,
                    slotBuilder.createTestingLogicalSlot(),
                    sendScheduleOrUpdateConsumersMessage);
        }
    }

    /**
     * Deploys every execution vertex of the graph, each into a freshly created testing slot. The
     * flag handed to {@link Execution#registerProducedPartitions} is always {@code true}.
     *
     * @param executionGraph graph whose vertices are deployed
     * @param slotBuilder creates one testing slot per task
     * @throws JobException if deploying a task fails
     * @throws ExecutionException if registering the produced partitions fails
     * @throws InterruptedException if interrupted while waiting for the partition registration
     */
    public static void deployAllTasks(
            ExecutionGraph executionGraph, TestingLogicalSlotBuilder slotBuilder)
            throws JobException, ExecutionException, InterruptedException {

        for (ExecutionVertex taskVertex : executionGraph.getAllExecutionVertices()) {
            registerPartitionsAndDeploy(
                    taskVertex, slotBuilder.createTestingLogicalSlot(), true);
        }
    }

    /**
     * Registers the produced partitions of the vertex's current execution attempt at the slot's
     * task manager location, waits for that to complete, then assigns the slot and deploys.
     */
    private static void registerPartitionsAndDeploy(
            ExecutionVertex vertex, LogicalSlot slot, boolean notifyConsumers)
            throws JobException, ExecutionException, InterruptedException {

        final Execution attempt = vertex.getCurrentExecutionAttempt();
        attempt.registerProducedPartitions(slot.getTaskManagerLocation(), notifyConsumers).get();

        vertex.tryAssignResource(slot);
        vertex.deploy();
    }

    /**
     * Reports the given state to the execution graph for the current attempt of every task of one
     * job vertex.
     *
     * @param executionGraph graph that receives the state updates
     * @param jobVertexID id of the job vertex whose tasks are updated
     * @param state state reported for each task
     */
    public static void transitionTaskStatus(
            ExecutionGraph executionGraph, JobVertexID jobVertexID, ExecutionState state) {

        final ExecutionVertex[] taskVertices =
                executionGraph.getJobVertex(jobVertexID).getTaskVertices();
        for (ExecutionVertex taskVertex : taskVertices) {
            final ExecutionAttemptID attemptId =
                    taskVertex.getCurrentExecutionAttempt().getAttemptId();
            final TaskExecutionState taskState = new TaskExecutionState(attemptId, state);
            executionGraph.updateState(new TaskExecutionStateTransition(taskState));
        }
    }

    /**
     * Reports the given state to the scheduler for the current attempt of a single subtask.
     *
     * @param scheduler scheduler that receives the state update
     * @param vertex job vertex containing the subtask
     * @param subtask index into the vertex's task vertices
     * @param executionState state reported for the subtask
     */
    public static void transitionTaskStatus(
            DefaultScheduler scheduler,
            AccessExecutionJobVertex vertex,
            int subtask,
            ExecutionState executionState) {

        final ExecutionAttemptID subtaskAttemptId =
                vertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
        final TaskExecutionState newState =
                new TaskExecutionState(subtaskAttemptId, executionState);
        scheduler.updateTaskExecutionState(newState);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/

package org.apache.flink.runtime.scheduler.benchmark.deploying;

import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;

/**
 * Benchmark for deploying the downstream tasks of a BATCH job; the code under measurement is
 * {@link Execution#deploy}.
 */
public class DeployingDownstreamTasksInBatchJobBenchmark extends DeployingTasksBenchmarkBase {

    /** Task vertices of the sink; deployed by {@link #deployDownstreamTasks()}. */
    private ExecutionVertex[] vertices;

    /**
     * Sets up the execution graph, deploys every task of the first job vertex (the source) and
     * remembers the task vertices of the second job vertex (the sink).
     *
     * @param jobConfiguration configuration passed on to the execution graph setup
     * @throws Exception if the setup or the deployment of the source tasks fails
     */
    public void setup(JobConfiguration jobConfiguration) throws Exception {
        createAndSetupExecutionGraph(jobConfiguration);

        final JobVertex upstream = jobVertices.get(0);
        deployCurrentAttempts(executionGraph.getJobVertex(upstream.getID()).getTaskVertices());

        final JobVertex downstream = jobVertices.get(1);
        vertices = executionGraph.getJobVertex(downstream.getID()).getTaskVertices();
    }

    /**
     * Deploys the current execution attempt of every sink task collected during setup.
     *
     * @throws Exception if deploying a task fails
     */
    public void deployDownstreamTasks() throws Exception {
        deployCurrentAttempts(vertices);
    }

    /** Calls {@link Execution#deploy} on the current attempt of each given vertex, in order. */
    private static void deployCurrentAttempts(ExecutionVertex[] executionVertices)
            throws Exception {
        for (ExecutionVertex executionVertex : executionVertices) {
            final Execution attempt = executionVertex.getCurrentExecutionAttempt();
            attempt.deploy();
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/

package org.apache.flink.runtime.scheduler.benchmark.deploying;

import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

/**
 * Test that runs {@link DeployingDownstreamTasksInBatchJobBenchmark} once with the {@link
 * JobConfiguration#BATCH_TEST} configuration. The benchmarked method is {@link Execution#deploy}.
 */
public class DeployingDownstreamTasksInBatchJobBenchmarkTest extends TestLogger {

    /** Sets the benchmark up and deploys the downstream tasks; passes if nothing is thrown. */
    @Test
    public void deployDownstreamTasks() throws Exception {
        final DeployingDownstreamTasksInBatchJobBenchmark batchBenchmark =
                new DeployingDownstreamTasksInBatchJobBenchmark();

        batchBenchmark.setup(JobConfiguration.BATCH_TEST);
        batchBenchmark.deployDownstreamTasks();
    }
}
Loading

0 comments on commit c70000c

Please sign in to comment.