diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml index 31698293d369a..836b0bc313d79 100644 --- a/flink-examples/flink-java-examples/pom.xml +++ b/flink-examples/flink-java-examples/pom.xml @@ -61,7 +61,7 @@ under the License. - org.apache.flink.example.java.clustering.KMeans + org.apache.flink.examples.java.clustering.KMeans @@ -86,7 +86,7 @@ under the License. - org.apache.flink.example.java.graph.TransitiveClosureNaive + org.apache.flink.examples.java.graph.TransitiveClosureNaive @@ -110,7 +110,7 @@ under the License. - org.apache.flink.example.java.graph.ConnectedComponents + org.apache.flink.examples.java.graph.ConnectedComponents @@ -134,7 +134,7 @@ under the License. - org.apache.flink.example.java.graph.EnumTrianglesBasic + org.apache.flink.examples.java.graph.EnumTrianglesBasic @@ -160,7 +160,7 @@ under the License. - org.apache.flink.example.java.graph.EnumTrianglesOpt + org.apache.flink.examples.java.graph.EnumTrianglesOpt @@ -186,7 +186,7 @@ under the License. - org.apache.flink.example.java.graph.PageRankBasic + org.apache.flink.examples.java.graph.PageRankBasic @@ -214,7 +214,7 @@ under the License. - org.apache.flink.example.java.relational.TPCHQuery10 + org.apache.flink.examples.java.relational.TPCHQuery10 @@ -237,7 +237,7 @@ under the License. - org.apache.flink.example.java.relational.TPCHQuery3 + org.apache.flink.examples.java.relational.TPCHQuery3 @@ -259,7 +259,7 @@ under the License. - org.apache.flink.example.java.relational.WebLogAnalysis + org.apache.flink.examples.java.relational.WebLogAnalysis @@ -284,7 +284,7 @@ under the License. 
- org.apache.flink.example.java.wordcount.WordCount + org.apache.flink.examples.java.wordcount.WordCount diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala index a0bb874c929d6..e3b499e6fe7e4 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -58,7 +58,8 @@ import scala.collection.JavaConverters._ * {{{ * KMeans * }}} - * If no parameters are provided, the program is run with default data from `KMeansData` + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.examples.java.clustering.util.KMeansData]] * and 10 iterations. * * This example shows how to use: diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala index 8e3fbfd1f43f7..df28ad012ac76 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala @@ -19,6 +19,7 @@ package org.apache.flink.examples.scala.graph import org.apache.flink.api.scala._ import org.apache.flink.examples.java.graph.util.ConnectedComponentsData +import org.apache.flink.util.Collector /** * An implementation of the connected components algorithm, using a delta iteration. 
@@ -49,7 +50,7 @@ import org.apache.flink.examples.java.graph.util.ConnectedComponentsData * }}} * * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.example.java.graph.util.ConnectedComponentsData]] and 10 iterations. + * [[org.apache.flink.examples.java.graph.util.ConnectedComponentsData]] and 10 iterations. * * * This example shows how to use: @@ -79,7 +80,7 @@ object ConnectedComponents { // apply the step logic: join with the edges val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) => - Some((edge._2, vertex._2)) + (edge._2, vertex._2) } // select the minimum neighbor @@ -87,7 +88,8 @@ object ConnectedComponents { // update if the component of the candidate is smaller val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) { - (newVertex, oldVertex) => if (newVertex._2 < oldVertex._2) Some(newVertex) else None + (newVertex, oldVertex, out: Collector[(Long, Long)]) => + if (newVertex._2 < oldVertex._2) out.collect(newVertex) } // delta and new workset are identical diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala index 0ad2e81d0533d..c920c31a1bcb1 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala @@ -57,7 +57,7 @@ import scala.collection.mutable.MutableList * }}} *
* If no parameters are provided, the program is run with default data from - * [[org.apache.flink.example.java.graph.util.EnumTrianglesData]] + * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]] * *

* This example shows how to use: @@ -87,7 +87,7 @@ object EnumTrianglesBasic { // build triads .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder()) // filter triads - .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => Some(t) } + .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t } // emit result if (fileOutput) { diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala index 24b7978d6ad83..80cce3505f453 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala @@ -61,7 +61,7 @@ import scala.collection.mutable.MutableList * }}} * * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.example.java.graph.util.EnumTrianglesData]]. + * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]. 
* * This example shows how to use: * @@ -108,7 +108,7 @@ object EnumTrianglesOpt { // build triads .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder()) // filter triads - .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => Some(t)} + .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t} // emit result if (fileOutput) { diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala index cdd04f7b71f1c..5f515c46b1177 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala @@ -52,7 +52,7 @@ import org.apache.flink.util.Collector * }}} * * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.example.java.graph.util.PageRankData]] and 10 iterations. + * [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations. 
* * This example shows how to use: * @@ -108,9 +108,9 @@ object PageRankBasic { // terminate if no rank update was significant val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") { - (current, next) => + (current, next, out: Collector[Int]) => // check for significant update - if (math.abs(current.rank - next.rank) > EPSILON) Some(1) else None + if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1) } (newRanks, termination) diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala index 4c79a1be755e2..25347ca8ac1ae 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala @@ -37,7 +37,7 @@ object TransitiveClosureNaive { val nextPaths = prevPaths .join(edges) .where(1).equalTo(0) { - (left, right) => Some((left._1,right._2)) + (left, right) => (left._1,right._2) } .union(prevPaths) .groupBy(0, 1) diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala index 156d6350e4aea..100b3d2534106 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala @@ -45,7 +45,7 @@ import org.apache.flink.util.Collector * * Input files are plain text CSV files using the pipe character ('|') as field separator. 
* The tables referenced in the query can be generated using the - * [org.apache.flink.example.java.relational.util.WebLogDataGenerator]] and + * [[org.apache.flink.examples.java.relational.util.WebLogDataGenerator]] and * have the following schemas * * {{{ * }}} * @@ -77,7 +77,7 @@ import org.apache.flink.util.Collector * }}} * * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.example.java.relational.util.WebLogData]]. + * [[org.apache.flink.examples.java.relational.util.WebLogData]]. * * This example shows how to use: * @@ -109,7 +109,7 @@ object WebLogAnalysis { .filter(visit => visit._2.substring(0, 4).toInt == 2007) val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) { - (doc, rank) => Some(rank) + (doc, rank) => rank } val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) { diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala index 9d93ca8c73bb4..989ed874e2afe 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala @@ -32,7 +32,7 @@ import org.apache.flink.examples.java.wordcount.util.WordCountData * }}} * * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.example.java.wordcount.util.WordCountData]] + * [[org.apache.flink.examples.java.wordcount.util.WordCountData]] * * This example shows how to: * diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala index f936b43e9d3e2..b0661a1c1198d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala +++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala @@ -63,11 +63,10 @@ trait CoGroupDataSet[T, O] extends DataSet[(Array[T], Array[O])] { /** * Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the - * result of the given function. You can either return an element or choose to return [[None]], - * which allows implementing a filter directly in the coGroup function. + * result of the given function. */ def apply[R: TypeInformation: ClassTag]( - fun: (TraversableOnce[T], TraversableOnce[O]) => Option[R]): DataSet[R] + fun: (TraversableOnce[T], TraversableOnce[O]) => R): DataSet[R] /** * Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the @@ -100,11 +99,11 @@ private[flink] class CoGroupDataSetImpl[T, O]( otherKeys: Keys[O]) extends DataSet(coGroupOperator) with CoGroupDataSet[T, O] { def apply[R: TypeInformation: ClassTag]( - fun: (TraversableOnce[T], TraversableOnce[O]) => Option[R]): DataSet[R] = { + fun: (TraversableOnce[T], TraversableOnce[O]) => R): DataSet[R] = { Validate.notNull(fun, "CoGroup function must not be null.") val coGrouper = new CoGroupFunction[T, O, R] { def coGroup(left: java.lang.Iterable[T], right: java.lang.Iterable[O], out: Collector[R]) = { - fun(left.iterator.asScala, right.iterator.asScala) map { out.collect(_) } + out.collect(fun(left.iterator.asScala, right.iterator.asScala)) } } val coGroupOperator = new CoGroupOperator[T, O, R](thisSet.set, otherSet.set, thisKeys, diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala index 3f9f6e942553d..c3f4dda51b3d5 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala @@ -62,10 +62,9 @@ trait JoinDataSet[T, O] extends DataSet[(T, O)] { /** * Creates a new [[DataSet]] where the result for 
each pair of joined elements is the result - * of the given function. You can either return an element or choose to return [[None]], - * which allows implementing a filter directly in the join function. + * of the given function. */ - def apply[R: TypeInformation: ClassTag](fun: (T, O) => Option[R]): DataSet[R] + def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R] /** * Creates a new [[DataSet]] by passing each pair of joined values to the given function. @@ -107,11 +106,11 @@ private[flink] class JoinDataSetImpl[T, O]( extends DataSet(joinOperator) with JoinDataSet[T, O] { - def apply[R: TypeInformation: ClassTag](fun: (T, O) => Option[R]): DataSet[R] = { + def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R] = { Validate.notNull(fun, "Join function must not be null.") val joiner = new FlatJoinFunction[T, O, R] { def join(left: T, right: O, out: Collector[R]) = { - fun(left, right) map { out.collect(_) } + out.collect(fun(left, right)) } } val joinOperator = new EquiJoin[T, O, R](thisSet, otherSet, thisKeys, diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala index 5631cbb9c4a56..9b953ee3b96ea 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala @@ -58,7 +58,7 @@ class DeltaIterationTranslationTest { val result = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS) { (s, ws) => val wsSelfJoin = ws.map(new IdentityMapper[(Double, String)]()) - .join(ws).where(1).equalTo(1) { (l, r) => Some(l) } + .join(ws).where(1).equalTo(1) { (l, r) => l } val joined = wsSelfJoin.join(s).where(1).equalTo(2).apply(new SolutionWorksetJoin) (joined, 
joined.map(new NextWorksetMapper).name(BEFORE_NEXT_WORKSET_MAP)) diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java index 5639ed7fe1b74..0928a46257373 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountPOJOITCase.java @@ -18,7 +18,7 @@ // //package org.apache.flink.test.exampleJavaPrograms; // -////import org.apache.flink.example.java.wordcount.WordCountPOJO; +////import org.apache.flink.examples.java.wordcount.WordCountPOJO; //import org.apache.flink.test.testdata.WordCountData; //import org.apache.flink.test.util.JavaProgramTestBase; //import org.junit.Ignore;