Fix rename of java example package to examples
Some occurrences in comments and POMs were not updated.

Also change the signature of join and coGroup to always return a value,
not an Option.
aljoscha committed Sep 22, 2014
1 parent 4cce46e commit a41a29b
Showing 13 changed files with 39 additions and 38 deletions.
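
To illustrate the join/coGroup signature change described in the commit message, here is a minimal sketch of user code. It is not taken from the commit: the environment setup, sample data, object name, and job name are assumptions, and only the join call reflects the new API shown in the diffs below. Before the change the function had to return an Option (Some to emit, None to drop); now it returns the joined value directly.

import org.apache.flink.api.scala._

object JoinReturnValueSketch {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical sample data: (id, name) and (id, score) pairs.
    val left = env.fromElements((1L, "a"), (2L, "b"))
    val right = env.fromElements((1L, 10), (2L, 20))

    // Before this commit the join function returned an Option, e.g.
    //   { (l, r) => Some((l._2, r._2)) }
    // After this commit it returns the value directly:
    val joined = left.join(right).where(0).equalTo(0) {
      (l, r) => (l._2, r._2)
    }

    joined.print()
    env.execute("join sketch")
  }
}
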
20 changes: 10 additions & 10 deletions flink-examples/flink-java-examples/pom.xml
@@ -61,7 +61,7 @@ under the License.

<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.clustering.KMeans</program-class>
<program-class>org.apache.flink.examples.java.clustering.KMeans</program-class>
</manifestEntries>
</archive>

@@ -86,7 +86,7 @@ under the License.

<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.graph.TransitiveClosureNaive</program-class>
<program-class>org.apache.flink.examples.java.graph.TransitiveClosureNaive</program-class>
</manifestEntries>
</archive>

@@ -110,7 +110,7 @@ under the License.

<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.graph.ConnectedComponents</program-class>
<program-class>org.apache.flink.examples.java.graph.ConnectedComponents</program-class>
</manifestEntries>
</archive>

@@ -134,7 +134,7 @@ under the License.

<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.graph.EnumTrianglesBasic</program-class>
<program-class>org.apache.flink.examples.java.graph.EnumTrianglesBasic</program-class>
</manifestEntries>
</archive>

@@ -160,7 +160,7 @@ under the License.

<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.graph.EnumTrianglesOpt</program-class>
<program-class>org.apache.flink.examples.java.graph.EnumTrianglesOpt</program-class>
</manifestEntries>
</archive>

@@ -186,7 +186,7 @@ under the License.

<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.graph.PageRankBasic</program-class>
<program-class>org.apache.flink.examples.java.graph.PageRankBasic</program-class>
</manifestEntries>
</archive>

@@ -214,7 +214,7 @@ under the License.
<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.relational.TPCHQuery10</program-class>
<program-class>org.apache.flink.examples.java.relational.TPCHQuery10</program-class>
</manifestEntries>
</archive>
<includes>
@@ -237,7 +237,7 @@ under the License.
<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.relational.TPCHQuery3</program-class>
<program-class>org.apache.flink.examples.java.relational.TPCHQuery3</program-class>
</manifestEntries>
</archive>
<includes>
@@ -259,7 +259,7 @@ under the License.

<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.relational.WebLogAnalysis</program-class>
<program-class>org.apache.flink.examples.java.relational.WebLogAnalysis</program-class>
</manifestEntries>
</archive>

@@ -284,7 +284,7 @@ under the License.

<archive>
<manifestEntries>
<program-class>org.apache.flink.example.java.wordcount.WordCount</program-class>
<program-class>org.apache.flink.examples.java.wordcount.WordCount</program-class>
</manifestEntries>
</archive>

@@ -58,7 +58,8 @@ import scala.collection.JavaConverters._
* {{{
* KMeans <points path> <centers path> <result path> <num iterations>
* }}}
* If no parameters are provided, the program is run with default data from `KMeansData`
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.examples.java.clustering.util.KMeansData]]
* and 10 iterations.
*
* This example shows how to use:
@@ -19,6 +19,7 @@ package org.apache.flink.examples.scala.graph

import org.apache.flink.api.scala._
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
import org.apache.flink.util.Collector

/**
* An implementation of the connected components algorithm, using a delta iteration.
@@ -49,7 +50,7 @@ import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
* }}}
*
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.example.java.graph.util.ConnectedComponentsData]] and 10 iterations.
* [[org.apache.flink.examples.java.graph.util.ConnectedComponentsData]] and 10 iterations.
*
*
* This example shows how to use:
@@ -79,15 +80,16 @@ object ConnectedComponents {

// apply the step logic: join with the edges
val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
Some((edge._2, vertex._2))
(edge._2, vertex._2)
}

// select the minimum neighbor
val minNeighbors = allNeighbors.groupBy(0).min(1)

// update if the component of the candidate is smaller
val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
(newVertex, oldVertex) => if (newVertex._2 < oldVertex._2) Some(newVertex) else None
(newVertex, oldVertex, out: Collector[(Long, Long)]) =>
if (newVertex._2 < oldVertex._2) out.collect(newVertex)
}

// delta and new workset are identical
@@ -57,7 +57,7 @@ import scala.collection.mutable.MutableList
* }}}
* <br>
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.example.java.graph.util.EnumTrianglesData]]
* [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]
*
* <p>
* This example shows how to use:
@@ -87,7 +87,7 @@ object EnumTrianglesBasic {
// build triads
.groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
// filter triads
.join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => Some(t) }
.join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t }

// emit result
if (fileOutput) {
@@ -61,7 +61,7 @@ import scala.collection.mutable.MutableList
* }}}
*
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.example.java.graph.util.EnumTrianglesData]].
* [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]].
*
* This example shows how to use:
*
@@ -108,7 +108,7 @@ object EnumTrianglesOpt {
// build triads
.groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder())
// filter triads
.join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => Some(t)}
.join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t}

// emit result
if (fileOutput) {
@@ -52,7 +52,7 @@ import org.apache.flink.util.Collector
* }}}
*
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.example.java.graph.util.PageRankData]] and 10 iterations.
* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
*
* This example shows how to use:
*
@@ -108,9 +108,9 @@ object PageRankBasic {

// terminate if no rank update was significant
val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
(current, next) =>
(current, next, out: Collector[Int]) =>
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) Some(1) else None
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}

(newRanks, termination)
@@ -37,7 +37,7 @@ object TransitiveClosureNaive {
val nextPaths = prevPaths
.join(edges)
.where(1).equalTo(0) {
(left, right) => Some((left._1,right._2))
(left, right) => (left._1,right._2)
}
.union(prevPaths)
.groupBy(0, 1)
@@ -45,7 +45,7 @@ import org.apache.flink.util.Collector
*
* Input files are plain text CSV files using the pipe character ('|') as field separator.
* The tables referenced in the query can be generated using the
* [org.apache.flink.example.java.relational.util.WebLogDataGenerator]] and
* [org.apache.flink.examples.java.relational.util.WebLogDataGenerator]] and
* have the following schemas
*
* {{{
@@ -77,7 +77,7 @@ import org.apache.flink.util.Collector
* }}}
*
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.example.java.relational.util.WebLogData]].
* [[org.apache.flink.examples.java.relational.util.WebLogData]].
*
* This example shows how to use:
*
@@ -109,7 +109,7 @@ object WebLogAnalysis {
.filter(visit => visit._2.substring(0, 4).toInt == 2007)

val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) {
(doc, rank) => Some(rank)
(doc, rank) => rank
}

val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) {
@@ -32,7 +32,7 @@ import org.apache.flink.examples.java.wordcount.util.WordCountData
* }}}
*
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.example.java.wordcount.util.WordCountData]]
* [[org.apache.flink.examples.java.wordcount.util.WordCountData]]
*
* This example shows how to:
*
@@ -63,11 +63,10 @@ trait CoGroupDataSet[T, O] extends DataSet[(Array[T], Array[O])] {

/**
* Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the
* result of the given function. You can either return an element or choose to return [[None]],
* which allows implementing a filter directly in the coGroup function.
* result of the given function.
*/
def apply[R: TypeInformation: ClassTag](
fun: (TraversableOnce[T], TraversableOnce[O]) => Option[R]): DataSet[R]
fun: (TraversableOnce[T], TraversableOnce[O]) => R): DataSet[R]

/**
* Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the
@@ -100,11 +99,11 @@ private[flink] class CoGroupDataSetImpl[T, O](
otherKeys: Keys[O]) extends DataSet(coGroupOperator) with CoGroupDataSet[T, O] {

def apply[R: TypeInformation: ClassTag](
fun: (TraversableOnce[T], TraversableOnce[O]) => Option[R]): DataSet[R] = {
fun: (TraversableOnce[T], TraversableOnce[O]) => R): DataSet[R] = {
Validate.notNull(fun, "CoGroup function must not be null.")
val coGrouper = new CoGroupFunction[T, O, R] {
def coGroup(left: java.lang.Iterable[T], right: java.lang.Iterable[O], out: Collector[R]) = {
fun(left.iterator.asScala, right.iterator.asScala) map { out.collect(_) }
out.collect(fun(left.iterator.asScala, right.iterator.asScala))
}
}
val coGroupOperator = new CoGroupOperator[T, O, R](thisSet.set, otherSet.set, thisKeys,
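
The same shift applies to coGroup, as the trait change above shows: the function now produces exactly one result per pair of co-grouped element lists instead of an Option. A minimal sketch under that assumption follows; the environment setup, sample data, object name, and the balance computation are hypothetical and not from the commit.

import org.apache.flink.api.scala._

object CoGroupReturnValueSketch {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical (key, amount) records.
    val orders = env.fromElements((1, 10.0), (1, 5.0), (2, 7.5))
    val payments = env.fromElements((1, 12.0), (2, 7.5))

    // One value is returned per key group; returning None to drop a group
    // is no longer possible with this apply variant.
    val balance = orders.coGroup(payments).where(0).equalTo(0) {
      (os, ps) => os.map(_._2).sum - ps.map(_._2).sum
    }

    balance.print()
    env.execute("coGroup sketch")
  }
}
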
@@ -62,10 +62,9 @@ trait JoinDataSet[T, O] extends DataSet[(T, O)] {

/**
* Creates a new [[DataSet]] where the result for each pair of joined elements is the result
* of the given function. You can either return an element or choose to return [[None]],
* which allows implementing a filter directly in the join function.
* of the given function.
*/
def apply[R: TypeInformation: ClassTag](fun: (T, O) => Option[R]): DataSet[R]
def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]

/**
* Creates a new [[DataSet]] by passing each pair of joined values to the given function.
@@ -107,11 +106,11 @@ private[flink] class JoinDataSetImpl[T, O](
extends DataSet(joinOperator)
with JoinDataSet[T, O] {

def apply[R: TypeInformation: ClassTag](fun: (T, O) => Option[R]): DataSet[R] = {
def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R] = {
Validate.notNull(fun, "Join function must not be null.")
val joiner = new FlatJoinFunction[T, O, R] {
def join(left: T, right: O, out: Collector[R]) = {
fun(left, right) map { out.collect(_) }
out.collect(fun(left, right))
}
}
val joinOperator = new EquiJoin[T, O, R](thisSet, otherSet, thisKeys,
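
Because returning None can no longer drop a record, joins that also filter, such as the updatedComponents step in ConnectedComponents and the termination check in PageRankBasic above, now use the Collector-based apply variant instead. A minimal sketch of that pattern, with made-up data and names:

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object JoinFilterSketch {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical (vertexId, componentId) assignments.
    val candidates = env.fromElements((1L, 3L), (2L, 5L))
    val current = env.fromElements((1L, 4L), (2L, 2L))

    // Emit a record only when the candidate component id is smaller;
    // records that fail the check are simply not collected.
    val updated = candidates.join(current).where(0).equalTo(0) {
      (cand, cur, out: Collector[(Long, Long)]) =>
        if (cand._2 < cur._2) out.collect(cand)
    }

    updated.print()
    env.execute("join filter sketch")
  }
}
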
@@ -58,7 +58,7 @@ class DeltaIterationTranslationTest {
val result = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS) {
(s, ws) =>
val wsSelfJoin = ws.map(new IdentityMapper[(Double, String)]())
.join(ws).where(1).equalTo(1) { (l, r) => Some(l) }
.join(ws).where(1).equalTo(1) { (l, r) => l }

val joined = wsSelfJoin.join(s).where(1).equalTo(2).apply(new SolutionWorksetJoin)
(joined, joined.map(new NextWorksetMapper).name(BEFORE_NEXT_WORKSET_MAP))
@@ -18,7 +18,7 @@
//
//package org.apache.flink.test.exampleJavaPrograms;
//
////import org.apache.flink.example.java.wordcount.WordCountPOJO;
////import org.apache.flink.examples.java.wordcount.WordCountPOJO;
//import org.apache.flink.test.testdata.WordCountData;
//import org.apache.flink.test.util.JavaProgramTestBase;
//import org.junit.Ignore;
