[FLINK-2050] [ml] Ports existing ML algorithms to new pipeline mechanism
Adds pipeline comments

Adds pipeline IT case
tillrohrmann committed May 22, 2015
1 parent fde0341 commit 1e57475
Showing 88 changed files with 2,217 additions and 1,794 deletions.
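The API change running through the ML docs below: an estimator's fit no longer returns a separate model object. fit now trains the learner in place, and predictions come from calling predict on the same instance. A minimal before/after sketch of the pattern (learner, trainingDS, and testingDS are illustrative names):

{% highlight scala %}
// Old mechanism: fit returned a model, and transform was called on that model
// val model = learner.fit(trainingDS)
// val predictions = model.transform(testingDS)

// New mechanism: fit trains the learner in place...
learner.fit(trainingDS)
// ...and predict is called on the learner itself
val predictions = learner.predict(testingDS)
{% endhighlight %}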
4 changes: 2 additions & 2 deletions docs/libs/ml/als.md
@@ -147,11 +147,11 @@ val parameters = ParameterMap()
.add(ALS.Seed, 42l)

// Calculate the factorization
-val factorization = als.fit(inputDS, parameters)
+als.fit(inputDS, parameters)

// Read the testing data set from a csv file
val testingDS: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)

// Calculate the ratings according to the matrix factorization
-val predictedRatings = factorization.transform(testingDS)
+val predictedRatings = als.predict(testingDS)
{% endhighlight %}
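For context, a self-contained sketch of the full new-style ALS flow, assuming the standard FlinkML imports and CSV files (illustrative paths) holding (user, item, rating) triples and (user, item) pairs respectively:

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS

val env = ExecutionEnvironment.getExecutionEnvironment

val pathToTrainingData = "/tmp/ratings.csv" // illustrative path
val pathToData = "/tmp/test-pairs.csv"      // illustrative path

// (user, item, rating) triples used for training
val inputDS: DataSet[(Int, Int, Double)] =
  env.readCsvFile[(Int, Int, Double)](pathToTrainingData)

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)

// Train in place; the factorization is now held by the als instance
als.fit(inputDS)

// (user, item) pairs to score against the learned factorization
val testingDS: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)
val predictedRatings = als.predict(testingDS)
{% endhighlight %}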
6 changes: 3 additions & 3 deletions docs/libs/ml/cocoa.md
@@ -146,19 +146,19 @@ The CoCoA implementation can be controlled by the following parameters:
val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)

// Create the CoCoA learner
-val cocoa = CoCoA()
+val svm = CoCoA()
.setBlocks(10)
.setIterations(10)
.setLocalIterations(10)
.setRegularization(0.5)
.setStepsize(0.5)

// Learn the SVM model
-val svm = cocoa.fit(trainingDS)
+svm.fit(trainingDS)

// Read the testing data set
val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)

// Calculate the predictions for the testing data set
-val predictionDS: DataSet[LabeledVector] = model.transform(testingDS)
+val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
{% endhighlight %}
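Because predict now returns a DataSet[LabeledVector] straight from the trained learner, ordinary DataSet operations compose directly on the result. A quick sanity check on the predictions, assuming the conventional ±1.0 SVM labels:

{% highlight scala %}
// Count how many test vectors the trained SVM classified as positive
val positiveCount = predictionDS
  .filter(_.label > 0.0)
  .map(_ => 1)
  .reduce(_ + _)

positiveCount.print()
{% endhighlight %}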
4 changes: 2 additions & 2 deletions docs/libs/ml/multiple_linear_regression.md
@@ -117,8 +117,8 @@ val trainingDS: DataSet[LabeledVector] = ...
val testingDS: DataSet[Vector] = ...

// Fit the linear model to the provided data
-val model = mlr.fit(trainingDS)
+mlr.fit(trainingDS)

// Calculate the predictions for the test data
-val predictions = model.transform(testingDS)
+val predictions = mlr.predict(testingDS)
{% endhighlight %}
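As in the ALS example, parameters can also be supplied as a ParameterMap at fit time instead of through setters. A brief sketch; Stepsize appears in these docs, while the Iterations key is an assumption here:

{% highlight scala %}
import org.apache.flink.ml.common.ParameterMap

val parameters = ParameterMap()
  .add(MultipleLinearRegression.Stepsize, 0.5)
  .add(MultipleLinearRegression.Iterations, 20) // assumed parameter key

mlr.fit(trainingDS, parameters)
val predictions = mlr.predict(testingDS)
{% endhighlight %}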
4 changes: 2 additions & 2 deletions docs/libs/ml/polynomial_base_feature_mapper.md
@@ -84,8 +84,8 @@ val parameters = ParameterMap()
.add(MultipleLinearRegression.Stepsize, 0.5)

// Create pipeline PolynomialBase -> MultipleLinearRegression
-val chained = polyBase.chain(mlr)
+val pipeline = polyBase.chainPredictor(mlr)

// Learn the model
-val model = chained.fit(trainingDS)
+pipeline.fit(trainingDS)
{% endhighlight %}
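The object returned by chainPredictor is itself a predictor, so the same in-place fit/predict calls apply to the whole chain. A minimal sketch, assuming a testingDS: DataSet[Vector] is in scope:

{% highlight scala %}
// Fitting the pipeline fits polyBase, transforms the training data,
// and then fits mlr on the transformed features
pipeline.fit(trainingDS, parameters)

// predict routes the test data through polyBase before scoring with mlr
val predictions = pipeline.predict(testingDS)
{% endhighlight %}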
3 changes: 3 additions & 0 deletions docs/libs/ml/standard_scaler.md
@@ -85,6 +85,9 @@ val scaler = StandardScaler()
// Obtain data set to be scaled
val dataSet: DataSet[Vector] = ...

+// Learn the mean and standard deviation of the training data
+scaler.fit(dataSet)

// Scale the provided data set to have mean=10.0 and std=2.0
val scaledDS = scaler.transform(dataSet)
{% endhighlight %}
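Putting the scaler's whole Transformer flow together, with the mean=10.0/std=2.0 target from the comment configured up front (treat the setter names as assumptions of this sketch):

{% highlight scala %}
val scaler = StandardScaler()
  .setMean(10.0) // assumed setter names
  .setStd(2.0)

// Learn the per-feature mean and standard deviation of the data
scaler.fit(dataSet)

// Rescale to the configured target mean and standard deviation
val scaledDS = scaler.transform(dataSet)
{% endhighlight %}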
2 changes: 1 addition & 1 deletion flink-clients/src/test/assembly/test-assembly.xml
@@ -27,7 +27,7 @@ under the License.
<fileSet>
<directory>${project.build.testOutputDirectory}</directory>
<outputDirectory>/</outputDirectory>
-<!--modify/add include to match your pipeline(s) -->
+<!--modify/add include to match your package(s) -->
<includes>
<include>org/apache/flink/client/testjar/**</include>
</includes>
@@ -28,7 +28,7 @@
import org.junit.Test;

/**
- * This class contains test for the configuration pipeline. In particular, the serialization of {@link Configuration}
+ * This class contains test for the configuration package. In particular, the serialization of {@link Configuration}
* objects is tested.
*/
public class ConfigurationTest {
2 changes: 1 addition & 1 deletion flink-dist/src/main/assemblies/bin.xml
@@ -130,7 +130,7 @@ under the License.
</excludes>
</fileSet>
<fileSet>
-<!-- copy python pipeline -->
+<!-- copy python package -->
<directory>../flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python</directory>
<outputDirectory>resources/python/</outputDirectory>
<fileMode>0755</fileMode>
4 changes: 2 additions & 2 deletions flink-examples/flink-java-examples/pom.xml
@@ -205,7 +205,7 @@ under the License.
<!--
<execution>
<id>TPCHQuery10</id>
-<phase>pipeline</phase>
+<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
@@ -228,7 +228,7 @@ under the License.
<!--
<execution>
<id>TPCHQuery3</id>
-<phase>pipeline</phase>
+<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
@@ -135,7 +135,7 @@ public boolean filter(Lineitem l) throws ParseException {
}
});

-// Join customers with orders and pipeline them into a ShippingPriorityItem
+// Join customers with orders and package them into a ShippingPriorityItem
DataSet<ShippingPriorityItem> customerWithOrders =
customers.join(orders).where(0).equalTo(1)
.with(
6 changes: 3 additions & 3 deletions flink-examples/flink-scala-examples/pom.xml
@@ -170,7 +170,7 @@ under the License.
</configuration>
</plugin>

-<!-- get default data from flink-java-examples pipeline -->
+<!-- get default data from flink-java-examples package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
@@ -357,7 +357,7 @@ under the License.
<!--
<execution>
<id>TPCHQuery10</id>
-<phase>pipeline</phase>
+<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
@@ -380,7 +380,7 @@ under the License.
<!--
<execution>
<id>TPCHQuery3</id>
-<phase>pipeline</phase>
+<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
@@ -17,7 +17,7 @@
* limitations under the License.
*/

-//pipeline org.apache.flink.api.java.aggregation;
+//package org.apache.flink.api.java.aggregation;
//
//
//public abstract class AvgAggregationFunction<T> extends AggregationFunction<T> {
@@ -524,8 +524,8 @@ private static void writeTupleClass(PrintWriter w, int numFields) {
// head
w.print(HEADER);

-// pipeline and imports
-w.println("pipeline " + PACKAGE + ';');
+// package and imports
+w.println("package " + PACKAGE + ';');
w.println();
w.println("import org.apache.flink.util.StringUtils;");
w.println();
@@ -780,8 +780,8 @@ private static void writeTupleBuilderClass(PrintWriter w, int numFields) {
// head
w.print(HEADER);

-// pipeline and imports
-w.println("pipeline " + PACKAGE + "." + BUILDER_SUFFIX + ';');
+// package and imports
+w.println("package " + PACKAGE + "." + BUILDER_SUFFIX + ';');
w.println();
w.println("import java.util.LinkedList;");
w.println("import java.util.List;");
2 changes: 1 addition & 1 deletion flink-java8/pom.xml
@@ -101,7 +101,7 @@ under the License.
</configuration>
</plugin>

-<!-- get default data from flink-java-examples pipeline -->
+<!-- get default data from flink-java-examples package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
@@ -17,7 +17,7 @@
*/

/**
- * This pipeline contains the various traversals over the program plan and the
+ * This package contains the various traversals over the program plan and the
* optimizer DAG (directed acyclic graph) that are made in the course of
* the optimization.
*
@@ -48,7 +48,7 @@ under the License.

<!--
Execute "mvn clean pipeline -Pbuild-jar"
Execute "mvn clean package -Pbuild-jar"
to build a jar file out of this project!
How to use the Flink Quickstart pom:
@@ -61,11 +61,11 @@ under the License.
b) Build a jar for running on the cluster:
There are two options for creating a jar from this project
b.1) "mvn clean pipeline" -> this will create a fat jar which contains all
b.1) "mvn clean package" -> this will create a fat jar which contains all
dependencies necessary for running the jar created by this pom in a cluster.
The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
b.2) "mvn clean pipeline -Pbuild-jar" -> This will also create a fat-jar, but with much
b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
nicer dependency exclusion handling. This approach is preferred and leads to
much cleaner jar files.
-->
@@ -98,7 +98,7 @@ under the License.
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
-<!-- Run shade goal on pipeline phase -->
+<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
@@ -24,12 +24,12 @@
* Skeleton for a Flink Job.
*
* For a full example of a Flink Job, see the WordCountJob.java file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster.
* Just type
- * mvn clean pipeline
+ * mvn clean package
* in the projects root directory.
* You will find the jar in
* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
@@ -49,7 +49,7 @@ under the License.

<!--
Execute "mvn clean pipeline -Pbuild-jar"
Execute "mvn clean package -Pbuild-jar"
to build a jar file out of this project!
How to use the Flink Quickstart pom:
@@ -62,11 +62,11 @@ under the License.
b) Build a jar for running on the cluster:
There are two options for creating a jar from this project
b.1) "mvn clean pipeline" -> this will create a fat jar which contains all
b.1) "mvn clean package" -> this will create a fat jar which contains all
dependencies necessary for running the jar created by this pom in a cluster.
The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
b.2) "mvn clean pipeline -Pbuild-jar" -> This will also create a fat-jar, but with much
b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
nicer dependency exclusion handling. This approach is preferred and leads to
much cleaner jar files.
-->
@@ -102,7 +102,7 @@ under the License.
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
-<!-- Run shade goal on pipeline phase -->
+<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
@@ -24,12 +24,12 @@ import org.apache.flink.api.scala._
* Skeleton for a Flink Job.
*
* For a full example of a Flink Job, see the WordCountJob.scala file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster. Just type
* {{{
- * mvn clean pipeline
+ * mvn clean package
* }}}
* in the projects root directory. You will find the jar in
* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
@@ -24,12 +24,12 @@
* Skeleton for a Flink on Tez Job running using Tez local mode.
*
* For a full example of a Flink on TezJob, see the WordCountJob.java file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster.
* Just type
- * mvn clean pipeline
+ * mvn clean package
* in the projects root directory.
* You will find the jar in
* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
@@ -24,12 +24,12 @@
* Skeleton for a Flink on Tez program running on Yarn.
*
* For a full example of a Flink on Tez program, see the WordCountJob.java file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster.
* Just type
- * mvn clean pipeline
+ * mvn clean package
* in the projects root directory.
* You will find the jar in
* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
@@ -17,7 +17,7 @@
*/

/**
- * This pipeline contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * This package contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager}
* and {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the checkpoint snapshots of the
* distributed dataflow.
*/
@@ -17,7 +17,7 @@
*/

/**
- * This pipeline contains the messages that are sent between actors, like the
+ * This package contains the messages that are sent between actors, like the
* {@link org.apache.flink.runtime.jobmanager.JobManager} and
* {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the distributed operations.
*/
@@ -91,10 +91,10 @@ public synchronized JarFileCreator addClass(final Class<?> clazz) {
}

/**
- * Manually specify the pipeline of the dependencies.
+ * Manually specify the package of the dependencies.
*
* @param p
-the pipeline to be included.
+the package to be included.
*/
public synchronized JarFileCreator addPackage(String p) {
this.packages.add(p);
@@ -199,7 +199,7 @@ public void TestUDFPackage() throws IOException {
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
ans.add("org/apache/flink/util/Collector.class");

-Assert.assertTrue("Jar file for UDF pipeline is not correct", validate(ans, out));
+Assert.assertTrue("Jar file for UDF package is not correct", validate(ans, out));

out.delete();
}
@@ -67,7 +67,7 @@ import scala.reflect.ClassTag
*
* A rich function can be used when more control is required, for example for accessing the
* `RuntimeContext`. The rich function for `flatMap` is `RichFlatMapFunction`, all other functions
- * are named similarly. All functions are available in pipeline
+ * are named similarly. All functions are available in package
* `org.apache.flink.api.common.functions`.
*
* The elements are partitioned depending on the parallelism of the
@@ -27,7 +27,7 @@ under the License.
<fileSet>
<directory>${project.build.testOutputDirectory}</directory>
<outputDirectory>/</outputDirectory>
-<!--modify/add include to match your pipeline(s) -->
+<!--modify/add include to match your package(s) -->
<includes>
<include>org/apache/flink/api/avro/testjar/**</include>
</includes>
@@ -34,7 +34,7 @@

/**
* Hadoop 1.2.1 {@link org.apache.hadoop.mapred.FileOutputCommitter} takes {@link org.apache.hadoop.mapred.JobContext}
- * as input parameter. However JobContext class is pipeline private, and in Hadoop 2.2.0 it's public.
+ * as input parameter. However JobContext class is package private, and in Hadoop 2.2.0 it's public.
* This class takes {@link org.apache.hadoop.mapred.JobConf} as input instead of JobContext in order to setup and commit tasks.
*/
public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable {