Skip to content

Commit

Permalink
BEAM-858 Enable ApexRunner integration test in examples.
Browse files Browse the repository at this point in the history
  • Loading branch information
tweise committed Nov 1, 2016
1 parent 968eb32 commit 77f4ba2
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 32 deletions.
31 changes: 31 additions & 0 deletions examples/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-apex</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
Expand Down Expand Up @@ -224,6 +232,29 @@
</systemPropertyVariables>
</configuration>
</execution>
<execution>
<id>apex-runner-integration-tests</id>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<includes>
<include>WordCountIT.java</include>
</includes>
<parallel>all</parallel>
<threadCount>4</threadCount>
<systemPropertyVariables>
<beamTestPipelineOptions>
[
"--project=apache-beam-testing",
"--tempRoot=gs://temp-storage-for-end-to-end-tests",
"--runner=org.apache.beam.runners.apex.TestApexRunner"
]
</beamTestPipelineOptions>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
2 changes: 1 addition & 1 deletion runners/apex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<groupId>org.apache.apex</groupId>
<artifactId>apex-engine</artifactId>
<version>${apex.core.version}</version>
<scope>test</scope>
<scope>runtime</scope>
</dependency>

<!--- Beam -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab

String getApplicationName();

@Description("set parallelism for Apex runner")
void setParallelism(int parallelism);

@Default.Integer(1)
int getParallelism();

@Description("execute the pipeline with embedded cluster")
void setEmbeddedExecution(boolean embedded);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void translate(Read.Bounded<T> transform, TranslationContext context) {
BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
transform.getSource());
ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
unboundedSource, context.getPipelineOptions());
unboundedSource, true, context.getPipelineOptions());
context.addOperator(operator, operator.output);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,6 @@ public void populateDAG(DAG dag, Configuration conf) {
}
assertionError = null;
lc.runAsync();
if (options.getRunMillis() > 0) {
try {
long timeout = System.currentTimeMillis() + options.getRunMillis();
while (System.currentTimeMillis() < timeout) {
if (assertionError != null) {
throw assertionError;
}
}
} finally {
lc.shutdown();
}
}
return new ApexRunnerResult(lma.getDAG(), lc);
} catch (Exception e) {
Throwables.propagateIfPossible(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.datatorrent.api.LocalMode;

import java.io.IOException;
import java.lang.reflect.Field;

import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
Expand Down Expand Up @@ -63,12 +64,12 @@ public State cancel() throws IOException {

@Override
public State waitUntilFinish(Duration duration) {
throw new UnsupportedOperationException();
return ApexRunnerResult.waitUntilFinished(ctrl, duration);
}

@Override
public State waitUntilFinish() {
throw new UnsupportedOperationException();
return ApexRunnerResult.waitUntilFinished(ctrl, null);
}

@Override
Expand All @@ -84,4 +85,26 @@ public DAG getApexDAG() {
return apexDAG;
}

/**
 * Polls the locally launched Apex application until it reports completion, a pipeline
 * assertion failure is recorded, or the timeout elapses.
 *
 * @param ctrl controller of the embedded application whose completion flag is polled
 * @param duration maximum time to wait; {@code null} or less than one millisecond means
 *     wait without a time limit
 * @return {@link State#DONE} when the application finished, otherwise {@code null}
 *     (the timeout elapsed before completion)
 * @throws RuntimeException if the internal completion flag cannot be read, or if the
 *     waiting thread is interrupted (the interrupt status is restored before throwing)
 */
public static State waitUntilFinished(LocalMode.Controller ctrl, Duration duration) {
  // we need to rely on internal field for now
  // Apex should make it available through API in upcoming release.
  final long pollIntervalMillis = 500L;
  long deadline = Long.MAX_VALUE;
  if (duration != null && duration.getMillis() >= 1) {
    long now = System.currentTimeMillis();
    long waitMillis = duration.getMillis();
    // Saturate instead of overflowing: a huge duration must not wrap around to a
    // deadline in the past (which would make the call return immediately).
    deadline = (waitMillis > Long.MAX_VALUE - now) ? Long.MAX_VALUE : now + waitMillis;
  }
  try {
    Field appDoneField = ctrl.getClass().getDeclaredField("appDone");
    appDoneField.setAccessible(true);
    while (!appDoneField.getBoolean(ctrl)) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        break;
      }
      // Surface assertion failures recorded by the running pipeline as soon as possible.
      if (ApexRunner.assertionError != null) {
        throw ApexRunner.assertionError;
      }
      // Never sleep past the deadline, so short timeouts are honored.
      Thread.sleep(Math.min(pollIntervalMillis, remaining));
    }
    return appDoneField.getBoolean(ctrl) ? State.DONE : null;
  } catch (InterruptedException e) {
    // Preserve the interrupt status for callers further up the stack.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
      | IllegalAccessException e) {
    throw new RuntimeException(e);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,28 @@
*/
package org.apache.beam.runners.apex;

import java.io.IOException;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.joda.time.Duration;

/**
* Apex {@link PipelineRunner} for testing.
*/
public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {

private ApexRunner delegate;
private static final int RUN_WAIT_MILLIS = 20000;
private final ApexRunner delegate;

private TestApexRunner(ApexPipelineOptions options) {
options.setEmbeddedExecution(true);
//options.setEmbeddedExecutionDebugMode(false);
options.setRunMillis(20000);
this.delegate = ApexRunner.fromOptions(options);
}

Expand All @@ -53,7 +56,18 @@ OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {

@Override
public ApexRunnerResult run(Pipeline pipeline) {
return delegate.run(pipeline);
ApexRunnerResult result = delegate.run(pipeline);
try {
// this is necessary for tests that just call run() and not waitUntilFinish
result.waitUntilFinish(Duration.millis(RUN_WAIT_MILLIS));
return result;
} finally {
try {
result.cancel();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
private final SerializablePipelineOptions pipelineOptions;
@Bind(JavaSerializer.class)
private final UnboundedSource<OutputT, CheckpointMarkT> source;
private final boolean isBoundedSource;
private transient UnboundedSource.UnboundedReader<OutputT> reader;
private transient boolean available = false;
@OutputPortFieldAnnotation(optional = true)
Expand All @@ -65,16 +66,24 @@ public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT>
ApexPipelineOptions options) {
this.pipelineOptions = new SerializablePipelineOptions(options);
this.source = source;
this.isBoundedSource = false;
}

public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
boolean isBoundedSource, ApexPipelineOptions options) {
this.pipelineOptions = new SerializablePipelineOptions(options);
this.source = source;
this.isBoundedSource = isBoundedSource;
}

@SuppressWarnings("unused") // for Kryo
private ApexReadUnboundedInputOperator() {
this.pipelineOptions = null; this.source = null;
this.pipelineOptions = null; this.source = null; this.isBoundedSource = false;
}

@Override
public void beginWindow(long windowId) {
if (!available && source instanceof ValuesSource) {
if (!available && (isBoundedSource || source instanceof ValuesSource)) {
// if it's a Create and the input was consumed, emit final watermark
emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
// terminate the stream (allows tests to finish faster)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public void testWindowedWordCount() throws Exception {
ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(ApexPipelineOptions.class);
options.setApplicationName("StreamingWordCount");
options.setParallelism(1);
Pipeline p = Pipeline.create(options);

PCollection<KV<String, Long>> wordCounts =
Expand All @@ -110,12 +109,13 @@ public void testWindowedWordCount() throws Exception {
&& FormatAsStringFn.RESULTS.containsKey("bar")) {
break;
}
Thread.sleep(1000);
result.waitUntilFinish(Duration.millis(1000));
}
result.cancel();
Assert.assertTrue(
FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar"));
FormatAsStringFn.RESULTS.clear();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void testContainsInAnyOrder() throws Exception {
Pipeline pipeline = Pipeline.create(options);
PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
// TODO: good candidate to terminate fast based on processed assertion vs. for auto-shutdown
// TODO: terminate faster based on processed assertion vs. auto-shutdown
pipeline.run();
}

Expand Down Expand Up @@ -263,8 +263,8 @@ public void testMultiOutputParDoWithSideInputs() throws Exception {
Pipeline pipeline = Pipeline.create(options);

List<Integer> inputs = Arrays.asList(3, -42, 666);
final TupleTag<String> mainOutputTag = new TupleTag<String>("main");
final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput");
final TupleTag<String> mainOutputTag = new TupleTag<>("main");
final TupleTag<Void> sideOutputTag = new TupleTag<>("sideOutput");

PCollectionView<Integer> sideInput1 = pipeline
.apply("CreateSideInput1", Create.of(11))
Expand Down

0 comments on commit 77f4ba2

Please sign in to comment.