diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 7db04a8a2c859..6e4ffc7b0334e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -32,7 +32,6 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.storage.StorageLevel;
 
 /**
@@ -106,11 +105,7 @@ public void cache(String storageLevel) {
   @Override
   public void action() {
     // Empty function to force computation of RDD.
-    rdd.foreach(new VoidFunction<WindowedValue<T>>() {
-      @Override public void call(WindowedValue<T> tWindowedValue) throws Exception {
-        // Empty implementation.
-      }
-    });
+    rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction());
   }
 
   @Override
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index f2b34183e8b2b..8545b360b31b1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -43,6 +43,7 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 
@@ -267,4 +268,11 @@ public static void rejectStateAndTimers(DoFn doFn) {
     }
   }
 
+  public static <T> VoidFunction<T> emptyVoidFunction() {
+    return new VoidFunction<T>() {
+      @Override public void call(T t) throws Exception {
+        // Empty implementation.
+      }
+    };
+  }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index e9abe93cba1a6..ccdaf113b4b12 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -21,7 +21,10 @@
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.runners.spark.translation.Dataset;
+import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,11 +71,17 @@ public void cache(String storageLevel) {
   @Override
   public void action() {
     // Force computation of DStream.
-    dStream.dstream().register();
+    dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() {
+      @Override
+      public void call(JavaRDD<WindowedValue<T>> rdd) throws Exception {
+        rdd.foreach(TranslationUtils.<WindowedValue<T>>emptyVoidFunction());
+      }
+    });
   }
 
   @Override
   public void setName(String name) {
     // ignore
   }
+
 }