Skip to content

Commit

Permalink
[BEAM-1636] UnboundedDataset action() does not materialize RDD
Browse files Browse the repository at this point in the history
  • Loading branch information
aviemzur committed Mar 7, 2017
1 parent 1fd52f5 commit a889597
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;

/**
Expand Down Expand Up @@ -106,11 +105,7 @@ public void cache(String storageLevel) {
@Override
public void action() {
// Empty function to force computation of RDD.
rdd.foreach(new VoidFunction<WindowedValue<T>>() {
@Override public void call(WindowedValue<T> tWindowedValue) throws Exception {
// Empty implementation.
}
});
rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

Expand Down Expand Up @@ -267,4 +268,11 @@ public static void rejectStateAndTimers(DoFn<?, ?> doFn) {
}
}

public static <T> VoidFunction<T> emptyVoidFunction() {
return new VoidFunction<T>() {
@Override public void call(T t) throws Exception {
// Empty implementation.
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -68,11 +71,17 @@ public void cache(String storageLevel) {
@Override
public void action() {
// Force computation of DStream.
dStream.dstream().register();
dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
@Override
public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction());
}
});
}

@Override
public void setName(String name) {
// ignore
}

}

0 comments on commit a889597

Please sign in to comment.