Skip to content

Commit

Permalink
Merge branch 'master' into BEAM-8379
Browse files Browse the repository at this point in the history
  • Loading branch information
Ning Kang committed Nov 12, 2019
2 parents 3ce3bda + b834e53 commit af08a2b
Show file tree
Hide file tree
Showing 52 changed files with 1,928 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1905,6 +1905,7 @@ class BeamModulePlugin implements Plugin<Project> {
mustRunAfter = [
':runners:flink:1.9:job-server-container:docker',
':runners:flink:1.9:job-server:shadowJar',
':runners:spark:job-server:shadowJar',
':sdks:python:container:py2:docker',
':sdks:python:container:py35:docker',
':sdks:python:container:py36:docker',
Expand Down Expand Up @@ -1958,6 +1959,7 @@ class BeamModulePlugin implements Plugin<Project> {
addPortableWordCountTask(true, "PortableRunner")
addPortableWordCountTask(false, "FlinkRunner")
addPortableWordCountTask(true, "FlinkRunner")
addPortableWordCountTask(false, "SparkRunner")
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions model/job-management/src/main/proto/beam_job_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,40 @@ message JobMessagesResponse {
// without needing to pass through STARTING.
message JobState {
enum Enum {
// The job state reported by a runner cannot be interpreted by the SDK.
UNSPECIFIED = 0;

// The job has not yet started.
STOPPED = 1;

// The job is currently running.
RUNNING = 2;

// The job has successfully completed. (terminal)
DONE = 3;

// The job has failed. (terminal)
FAILED = 4;

// The job has been explicitly cancelled. (terminal)
CANCELLED = 5;

// The job has been updated. (terminal)
UPDATED = 6;

// The job is draining its data. (optional)
DRAINING = 7;

// The job has completed draining its data. (terminal)
DRAINED = 8;

// The job is starting up.
STARTING = 9;

// The job is cancelling. (optional)
CANCELLING = 10;

// The job is in the process of being updated. (optional)
UPDATING = 11;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,21 @@ class GreedyPCollectionFusers {
.put(
PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
GreedyPCollectionFusers::canFuseParDo)
.put(
PTransformTranslation.SPLITTABLE_SPLIT_RESTRICTION_URN,
GreedyPCollectionFusers::canFuseParDo)
.put(
PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN,
GreedyPCollectionFusers::cannotFuse)
.put(
PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN,
GreedyPCollectionFusers::cannotFuse)
.put(
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
GreedyPCollectionFusers::canFuseParDo)
.put(
PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
GreedyPCollectionFusers::canFuseParDo)
GreedyPCollectionFusers::cannotFuse)
.put(
PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
GreedyPCollectionFusers::canFuseCompatibleEnvironment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.ComponentsOrBuilder;
import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
Expand All @@ -32,10 +33,9 @@
/**
* A way to apply a Proto-based {@link PTransformOverride}.
*
* <p>This should generally be used to replace runner-executed transforms with runner-executed
* composites and simpler runner-executed primitives. It is generically less powerful than the
* native {@link org.apache.beam.sdk.Pipeline#replaceAll(List)} and more error-prone, so should only
* be used for relatively simple replacements.
* <p>This should generally be used by runners to replace transforms within graphs. SDK construction
* code should rely on the more powerful and native {@link
* org.apache.beam.sdk.Pipeline#replaceAll(List)}.
*/
@Experimental
public class ProtoOverrides {
Expand All @@ -51,6 +51,10 @@ public static Pipeline updateTransform(
if (pt.getValue().getSpec() != null && urn.equals(pt.getValue().getSpec().getUrn())) {
MessageWithComponents updated =
compositeBuilder.getReplacement(pt.getKey(), originalPipeline.getComponents());
if (updated == null) {
continue;
}

checkArgument(
updated.getPtransform().getOutputsMap().equals(pt.getValue().getOutputsMap()),
"A %s must produce all of the outputs of the original %s",
Expand All @@ -66,8 +70,8 @@ public static Pipeline updateTransform(
}

/**
* Remove all subtransforms of the provided transform recursively.A {@link PTransform} can be the
* subtransform of only one enclosing transform.
* Remove all sub-transforms of the provided transform recursively. A {@link PTransform} can be
* the sub-transform of only one enclosing transform.
*/
private static void removeSubtransforms(PTransform pt, Components.Builder target) {
for (String subtransformId : pt.getSubtransformsList()) {
Expand All @@ -87,14 +91,16 @@ public interface TransformReplacement {
/**
* Returns the updated composite structure for the provided {@link PTransform}.
*
* <p>The returned {@link MessageWithComponents} must contain a single {@link PTransform}. The
* result {@link Components} will be merged into the existing components, and the result {@link
* PTransform} will be set as a replacement of the original {@link PTransform}. Notably, this
* does not require that the {@code existingComponents} are present in the returned {@link
* <p>If the return is null, then no replacement is performed, otherwise the returned {@link
* MessageWithComponents} must contain a single {@link PTransform}. The result {@link
* Components} will be merged into the existing components, and the result {@link PTransform}
* will be set as a replacement of the original {@link PTransform}. Notably, this does not
* require that the {@code existingComponents} are present in the returned {@link
* MessageWithComponents}.
*
* <p>Introduced components must not collide with any components in the existing components.
*/
@Nullable
MessageWithComponents getReplacement(
String transformId, ComponentsOrBuilder existingComponents);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import static org.apache.beam.runners.core.construction.PTransformTranslation.MAP_WINDOWS_TRANSFORM_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.READ_TRANSFORM_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_SPLIT_RESTRICTION_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

Expand Down Expand Up @@ -174,6 +176,8 @@ static Collection<String> getPrimitiveTransformIds(RunnerApi.Components componen
COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
SPLITTABLE_SPLIT_RESTRICTION_URN,
SPLITTABLE_PROCESS_KEYED_URN,
SPLITTABLE_PROCESS_ELEMENTS_URN,
SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
Expand Down
Loading

0 comments on commit af08a2b

Please sign in to comment.