Skip to content

Commit

Permalink
[FLINK-16960][runtime] Add PipelinedRegion interface
Browse files Browse the repository at this point in the history
This closes apache#11647.
  • Loading branch information
GJL committed Apr 14, 2020
1 parent 1f28494 commit b36c44e
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.topology.Result;
import org.apache.flink.runtime.topology.Topology;
import org.apache.flink.runtime.topology.BaseTopology;
import org.apache.flink.runtime.topology.Vertex;

import org.slf4j.Logger;
Expand Down Expand Up @@ -60,7 +60,7 @@ public static Set<PipelinedRegion> toPipelinedRegionsSet(
}

public static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>> Set<Set<V>> computePipelinedRegions(
final Topology<?, ?, V, R> topology) {
final BaseTopology<?, ?, V, R> topology) {

// currently we let a job with co-location constraints fail as one region
// putting co-located vertices in the same region with each other can be a future improvement
Expand Down Expand Up @@ -115,7 +115,7 @@ public static Set<PipelinedRegion> toPipelinedRegionsSet(
}

private static <V extends Vertex<?, ?, V, ?>> Map<V, Set<V>> buildOneRegionForAllVertices(
final Topology<?, ?, V, ?> topology) {
final BaseTopology<?, ?, V, ?> topology) {

LOG.warn("Cannot decompose the topology into individual failover regions due to use of " +
"Co-Location constraints (iterations). Job will fail over as one holistic unit.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.runtime.jobgraph.topology;

import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.topology.PipelinedRegion;

/**
* Pipelined region on logical level, i.e., {@link JobVertex} level.
*/
public interface LogicalPipelinedRegion<V extends LogicalVertex<V, R>, R extends LogicalResult<V, R>> extends PipelinedRegion<JobVertexID, IntermediateDataSetID, V, R> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@
* Represents a logical topology, i.e. {@link JobGraph}.
*/
public interface LogicalTopology<V extends LogicalVertex<V, R>, R extends LogicalResult<V, R>>
extends Topology<JobVertexID, IntermediateDataSetID, V, R> {
extends Topology<JobVertexID, IntermediateDataSetID, V, R, LogicalPipelinedRegion<V, R>> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.runtime.scheduler.strategy;

import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.topology.PipelinedRegion;

/**
* Pipelined region on execution level, i.e., {@link ExecutionGraph} level.
*/
public interface SchedulingPipelinedRegion<V extends SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>> extends PipelinedRegion<ExecutionVertexID, IntermediateResultPartitionID, V, R> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* Topology of {@link SchedulingExecutionVertex}.
*/
public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R extends SchedulingResultPartition<V, R>>
extends Topology<ExecutionVertexID, IntermediateResultPartitionID, V, R> {
extends Topology<ExecutionVertexID, IntermediateResultPartitionID, V, R, SchedulingPipelinedRegion<V, R>> {

/**
* Looks up the {@link SchedulingExecutionVertex} for the given {@link ExecutionVertexID}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.topology;

/**
* Base topology for all logical and execution topologies.
* A topology consists of {@link Vertex} and {@link Result}.
*/
public interface BaseTopology<VID extends VertexID, RID extends ResultID,
V extends Vertex<VID, RID, V, R>, R extends Result<VID, RID, V, R>> {

/**
* Returns an iterable over all vertices, topologically sorted.
*
* @return topologically sorted iterable over all vertices
*/
Iterable<V> getVertices();

/**
* Returns whether the topology contains co-location constraints.
* Co-location constraints are currently used for iterations.
*
* @return whether the topology contains co-location constraints
*/
boolean containsCoLocationConstraints();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.runtime.topology;

/**
* A pipelined region is a set of vertices connected via pipelined data
* exchanges.
*
* @param <VID> the type of the vertex ids
* @param <RID> the type of the result ids
* @param <V> the type of the vertices
* @param <R> the type of the result
*/
public interface PipelinedRegion<VID extends VertexID, RID extends ResultID,
V extends Vertex<VID, RID, V, R>, R extends Result<VID, RID, V, R>> {

/**
* Returns vertices that are in this pipelined region.
*
* @return Iterable over all vertices in this pipelined region
*/
Iterable<V> getVertices();

/**
* Returns the vertex with the specified vertex id.
*
* @param vertexId the vertex id used to look up the vertex
* @return the vertex with the specified id
* @throws IllegalArgumentException if there is no vertex in this pipelined
* region with the specified vertex id
*/
V getVertex(VID vertexId);

/**
* Returns the results that this pipelined region consumes.
*
* @return Iterable over all consumed results
*/
Iterable<R> getConsumedResults();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,32 @@
package org.apache.flink.runtime.topology;

/**
* Base topology for all logical and execution topologies.
* A topology consists of {@link Vertex} and {@link Result}.
* Extends the {@link BaseTopology} by pipelined regions.
*/
public interface Topology<VID extends VertexID, RID extends ResultID,
V extends Vertex<VID, RID, V, R>, R extends Result<VID, RID, V, R>> {
V extends Vertex<VID, RID, V, R>, R extends Result<VID, RID, V, R>,
PR extends PipelinedRegion<VID, RID, V, R>>
extends BaseTopology<VID, RID, V, R> {

/**
* Returns an iterable over all vertices, topologically sorted.
* Returns all pipelined regions in this topology.
*
* @return topologically sorted iterable over all vertices
* @return Iterable over pipelined regions in this topology
*/
Iterable<V> getVertices();
default Iterable<PR> getAllPipelinedRegions() {
throw new UnsupportedOperationException();
}

/**
* Returns whether the topology contains co-location constraints.
* Co-location constraints are currently used for iterations.
* The pipelined region for a specified vertex.
*
* @return whether the topology contains co-location constraints
* @param vertexId the vertex id identifying the vertex for which the
* pipelined region should be returned
* @return the pipelined region of the vertex
* @throws IllegalArgumentException if there is no vertex in this topology
* with the specified vertex id
*/
boolean containsCoLocationConstraints();
default PR getPipelinedRegionOfVertex(VID vertexId) {
throw new UnsupportedOperationException();
}
}

0 comments on commit b36c44e

Please sign in to comment.