Skip to content

Commit

Permalink
Rewrite the Scala API as (somewhat) thin Layer on Java API
Browse files Browse the repository at this point in the history
Don't bother looking at the diff, this is almost a complete rewrite of
the previous Scala API. This uses all the work put into the Java API,
such as TypeInformation, the serializers and comparators and the
operators.

The Scala DataSet and ExecutionEnvironment wrap their respective Java
equivalents. TypeInformation is generated by a macro that uses
TypeInformationGen and other macro support classes. The Java
TypeExtractor is completely bypassed but the TypeInformation and
sub-classes are created by the Scala type analyzer. There is special
support for Scala Tuples in the form of ScalaTupleTypeInfo,
ScalaTupleSerializer, and ScalaTupleComparator.

This also adds tests to flink-scala that are ports of the tests in
flink-java.

There are not yet any Scala specific tests in flink-tests. All the
scala example ITCases are commented out, as well as the examples
themselves. Those will be uncommented once the examples are ported. This
will happen in separate commits.
  • Loading branch information
aljoscha committed Sep 22, 2014
1 parent 568dff1 commit b8131fa
Show file tree
Hide file tree
Showing 137 changed files with 9,193 additions and 10,230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ else if (should instanceof char[]) {

// --------------------------------------------------------------------------------------------

private TypeSerializer<T> getSerializer() {
protected TypeSerializer<T> getSerializer() {
TypeSerializer<T> serializer = createSerializer();
if (serializer == null) {
throw new RuntimeException("Test case corrupt. Returns null as serializer.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,67 +24,82 @@
import org.apache.flink.example.java.clustering.KMeans.Centroid;
import org.apache.flink.example.java.clustering.KMeans.Point;

import java.util.LinkedList;
import java.util.List;

/**
* Provides the default data sets used for the K-Means example program.
* The default data sets are used, if no parameters are given to the program.
*
*/
public class KMeansData {

// We have the data as object arrays so that we can also generate Scala Data Sources from it.
public static final Object[][] CENTROIDS = new Object[][] {
new Object[] {1, -31.85, -44.77},
new Object[]{2, 35.16, 17.46},
new Object[]{3, -5.16, 21.93},
new Object[]{4, -24.06, 6.81}
};

public static final Object[][] POINTS = new Object[][] {
new Object[] {-14.22, -48.01},
new Object[] {-22.78, 37.10},
new Object[] {56.18, -42.99},
new Object[] {35.04, 50.29},
new Object[] {-9.53, -46.26},
new Object[] {-34.35, 48.25},
new Object[] {55.82, -57.49},
new Object[] {21.03, 54.64},
new Object[] {-13.63, -42.26},
new Object[] {-36.57, 32.63},
new Object[] {50.65, -52.40},
new Object[] {24.48, 34.04},
new Object[] {-2.69, -36.02},
new Object[] {-38.80, 36.58},
new Object[] {24.00, -53.74},
new Object[] {32.41, 24.96},
new Object[] {-4.32, -56.92},
new Object[] {-22.68, 29.42},
new Object[] {59.02, -39.56},
new Object[] {24.47, 45.07},
new Object[] {5.23, -41.20},
new Object[] {-23.00, 38.15},
new Object[] {44.55, -51.50},
new Object[] {14.62, 59.06},
new Object[] {7.41, -56.05},
new Object[] {-26.63, 28.97},
new Object[] {47.37, -44.72},
new Object[] {29.07, 51.06},
new Object[] {0.59, -31.89},
new Object[] {-39.09, 20.78},
new Object[] {42.97, -48.98},
new Object[] {34.36, 49.08},
new Object[] {-21.91, -49.01},
new Object[] {-46.68, 46.04},
new Object[] {48.52, -43.67},
new Object[] {30.05, 49.25},
new Object[] {4.03, -43.56},
new Object[] {-37.85, 41.72},
new Object[] {38.24, -48.32},
new Object[] {20.83, 57.85}
};

public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env) {

return env.fromElements(
new Centroid(1, -31.85, -44.77),
new Centroid(2, 35.16, 17.46),
new Centroid(3, -5.16, 21.93),
new Centroid(4, -24.06, 6.81)
);
List<Centroid> centroidList = new LinkedList<Centroid>();
for (Object[] centroid : CENTROIDS) {
centroidList.add(
new Centroid((Integer) centroid[0], (Double) centroid[1], (Double) centroid[2]));
}
return env.fromCollection(centroidList);
}

public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env) {

return env.fromElements(
new Point(-14.22, -48.01),
new Point(-22.78, 37.10),
new Point(56.18, -42.99),
new Point(35.04, 50.29),
new Point(-9.53, -46.26),
new Point(-34.35, 48.25),
new Point(55.82, -57.49),
new Point(21.03, 54.64),
new Point(-13.63, -42.26),
new Point(-36.57, 32.63),
new Point(50.65, -52.40),
new Point(24.48, 34.04),
new Point(-2.69, -36.02),
new Point(-38.80, 36.58),
new Point(24.00, -53.74),
new Point(32.41, 24.96),
new Point(-4.32, -56.92),
new Point(-22.68, 29.42),
new Point(59.02, -39.56),
new Point(24.47, 45.07),
new Point(5.23, -41.20),
new Point(-23.00, 38.15),
new Point(44.55, -51.50),
new Point(14.62, 59.06),
new Point(7.41, -56.05),
new Point(-26.63, 28.97),
new Point(47.37, -44.72),
new Point(29.07, 51.06),
new Point(0.59, -31.89),
new Point(-39.09, 20.78),
new Point(42.97, -48.98),
new Point(34.36, 49.08),
new Point(-21.91, -49.01),
new Point(-46.68, 46.04),
new Point(48.52, -43.67),
new Point(30.05, 49.25),
new Point(4.03, -43.56),
new Point(-37.85, 41.72),
new Point(38.24, -48.32),
new Point(20.83, 57.85)
);
List<Point> pointList = new LinkedList<Point>();
for (Object[] point : POINTS) {
pointList.add(new Point((Double) point[0], (Double) point[1]));
}
return env.fromCollection(pointList);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,45 @@
*/
public class WordCountData {

public static final String[] WORDS = new String[] {
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,",
"And by opposing end them?--To die,--to sleep,--",
"No more; and by a sleep to say we end",
"The heartache, and the thousand natural shocks",
"That flesh is heir to,--'tis a consummation",
"Devoutly to be wish'd. To die,--to sleep;--",
"To sleep! perchance to dream:--ay, there's the rub;",
"For in that sleep of death what dreams may come,",
"When we have shuffled off this mortal coil,",
"Must give us pause: there's the respect",
"That makes calamity of so long life;",
"For who would bear the whips and scorns of time,",
"The oppressor's wrong, the proud man's contumely,",
"The pangs of despis'd love, the law's delay,",
"The insolence of office, and the spurns",
"That patient merit of the unworthy takes,",
"When he himself might his quietus make",
"With a bare bodkin? who would these fardels bear,",
"To grunt and sweat under a weary life,",
"But that the dread of something after death,--",
"The undiscover'd country, from whose bourn",
"No traveller returns,--puzzles the will,",
"And makes us rather bear those ills we have",
"Than fly to others that we know not of?",
"Thus conscience does make cowards of us all;",
"And thus the native hue of resolution",
"Is sicklied o'er with the pale cast of thought;",
"And enterprises of great pith and moment,",
"With this regard, their currents turn awry,",
"And lose the name of action.--Soft you now!",
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."
};

public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {

return env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,",
"And by opposing end them?--To die,--to sleep,--",
"No more; and by a sleep to say we end",
"The heartache, and the thousand natural shocks",
"That flesh is heir to,--'tis a consummation",
"Devoutly to be wish'd. To die,--to sleep;--",
"To sleep! perchance to dream:--ay, there's the rub;",
"For in that sleep of death what dreams may come,",
"When we have shuffled off this mortal coil,",
"Must give us pause: there's the respect",
"That makes calamity of so long life;",
"For who would bear the whips and scorns of time,",
"The oppressor's wrong, the proud man's contumely,",
"The pangs of despis'd love, the law's delay,",
"The insolence of office, and the spurns",
"That patient merit of the unworthy takes,",
"When he himself might his quietus make",
"With a bare bodkin? who would these fardels bear,",
"To grunt and sweat under a weary life,",
"But that the dread of something after death,--",
"The undiscover'd country, from whose bourn",
"No traveller returns,--puzzles the will,",
"And makes us rather bear those ills we have",
"Than fly to others that we know not of?",
"Thus conscience does make cowards of us all;",
"And thus the native hue of resolution",
"Is sicklied o'er with the pale cast of thought;",
"And enterprises of great pith and moment,",
"With this regard, their currents turn awry,",
"And lose the name of action.--Soft you now!",
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."
);
return env.fromElements(WORDS);
}
}
10 changes: 8 additions & 2 deletions flink-examples/flink-scala-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ under the License.
<artifactId>flink-scala</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -232,7 +237,8 @@ under the License.
</includes>
</configuration>
</execution>
-->

<execution>
<id>WordCount</id>
<phase>package</phase>
Expand All @@ -254,7 +260,7 @@ under the License.
</includes>
</configuration>
</execution>
<!--
<execution>
<id>ConnectedComponents</id>
<phase>package</phase>
Expand Down
Loading

0 comments on commit b8131fa

Please sign in to comment.