[FLINK-6779] [scala] Activate strict checkstyle
This closes apache#4030
greghogan committed May 31, 2017
1 parent 9d9c53e commit 6ab7719
Showing 3 changed files with 53 additions and 25 deletions.
36 changes: 36 additions & 0 deletions flink-scala/pom.xml
@@ -233,6 +233,42 @@ under the License.
</executions>
</plugin>

<!-- Java check style -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>6.19</version>
</dependency>
</dependencies>
<configuration>
<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
</configuration>
<executions>
<!--
Execute checkstyle after compilation but before tests.
This ensures that any parsing or type checking errors are from
javac, so they look as expected. Beyond that, we want to
fail as early as possible.
-->
<execution>
<phase>test-compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>

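Note: the suppressionsLocation above points Checkstyle at a suppressions file whose contents are not part of this commit. As a minimal sketch (with a purely hypothetical pattern), a Checkstyle 6.x suppressions file looks like:

<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
    "-//Puppy Crawl//DTD Suppressions 1.1//EN"
    "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
    <!-- Hypothetical entry: exempt generated sources from all checks. -->
    <suppress files=".*[/\\]generated[/\\].*" checks=".*"/>
</suppressions>

Since the check goal is bound to test-compile, any build reaching that phase (for example mvn verify) fails when violations are found, while a plain mvn compile runs no style checks.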
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -18,15 +18,13 @@

package org.apache.flink.api.scala.operators;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -36,14 +34,16 @@
import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.Grouping;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.operators.SingleInputOperator;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.List;

import scala.Product;

/**
@@ -62,8 +62,7 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
private final Grouping<IN> grouping;

/**
* <p>
* Non grouped aggregation
* Non grouped aggregation.
*/
public ScalaAggregateOperator(org.apache.flink.api.java.DataSet<IN> input, Aggregations function, int field) {
super(Preconditions.checkNotNull(input), input.getType());
@@ -90,8 +89,7 @@ public ScalaAggregateOperator(org.apache.flink.api.java.DataSet<IN> input, Aggre
}

/**
*
* Grouped aggregation
* Grouped aggregation.
*
* @param input
* @param function
@@ -121,7 +119,6 @@ public ScalaAggregateOperator(Grouping<IN> input, Aggregations function, int fie
this.grouping = input;
}


public ScalaAggregateOperator<IN> and(Aggregations function, int field) {
Preconditions.checkNotNull(function);

@@ -131,7 +128,6 @@ public ScalaAggregateOperator<IN> and(Aggregations function, int field) {
throw new IllegalArgumentException("Aggregation field position is out of range.");
}


AggregationFunctionFactory factory = function.getFactory();
AggregationFunction<?> aggFunct = factory.createAggregationFunction(inType.getTypeAt(field).getTypeClass());

@@ -150,7 +146,6 @@ protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN,
throw new IllegalStateException();
}


// construct the aggregation function
AggregationFunction<Object>[] aggFunctions = new AggregationFunction[this.aggregationFunctions.size()];
int[] fields = new int[this.fields.size()];
@@ -162,12 +157,11 @@ protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN,

genName.append(aggFunctions[i].toString()).append('(').append(fields[i]).append(')').append(',');
}
genName.setLength(genName.length()-1);
genName.setLength(genName.length() - 1);

@SuppressWarnings("rawtypes")
RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(getInputType(), aggFunctions, fields);


String name = getName() != null ? getName() : genName.toString();

// distinguish between grouped reduce and non-grouped reduce
@@ -234,10 +228,10 @@ else if (this.grouping.getKeys() instanceof Keys.SelectorFunctionKeys) {
// --------------------------------------------------------------------------------------------

@Internal
public static final class AggregatingUdf<T extends Product>
private static final class AggregatingUdf<T extends Product>
extends RichGroupReduceFunction<T, T>
implements GroupCombineFunction<T, T>
{
implements GroupCombineFunction<T, T> {

private static final long serialVersionUID = 1L;

private final int[] fieldPositions;
@@ -258,7 +252,6 @@ public AggregatingUdf(TypeInformation<T> typeInfo, AggregationFunction<Object>[]
this.fieldPositions = fieldPositions;
}


@Override
public void open(Configuration parameters) throws Exception {
for (AggregationFunction<Object> aggFunction : aggFunctions) {
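Note: a hedged usage sketch, not part of this commit, showing how the constructor and the and(...) method visible above chain multiple aggregations onto one operator. The input DataSet is hypothetical and its construction is elided:

// 'input' is a hypothetical org.apache.flink.api.java.DataSet over a Scala
// tuple type; ScalaAggregateOperator requires a tuple type at runtime.
ScalaAggregateOperator<scala.Tuple2<Integer, Integer>> agg =
        new ScalaAggregateOperator<>(input, Aggregations.SUM, 0) // sum of field 0
                .and(Aggregations.MAX, 1);                       // and max of field 1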
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.api.scala.operators;

import org.apache.flink.annotation.PublicEvolving;
@@ -29,13 +28,14 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Product;

/**
@@ -124,8 +124,8 @@ public ScalaCsvOutputFormat(Path outputPath, String recordDelimiter, String fiel
/**
* Configures the format to either allow null values (writing an empty field),
* or to throw an exception when encountering a null field.
* <p>
* by default, null values are allowed.
*
* <p>By default, null values are allowed.
*
* @param allowNulls Flag to indicate whether the output format should accept null values.
*/
@@ -147,8 +147,8 @@ public void setCharsetName(String charsetName) {
* Configures whether the output format should quote string values. String values are fields
* of type {@link String} and {@link org.apache.flink.types.StringValue}, as well as
* all subclasses of the latter.
* <p>
* By default, strings are not quoted.
*
* <p>By default, strings are not quoted.
*
* @param quoteStrings Flag indicating whether string fields should be quoted.
*/
@@ -217,7 +217,6 @@ public String toString() {
}

/**
*
* The purpose of this method is solely to check whether the data type to be processed
* is in fact a tuple type.
*/
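Note: a hedged usage sketch, not part of this commit, for the configuration methods documented above. The constructor and setCharsetName appear in this diff; the setter names setAllowNullValues and setQuoteStrings are assumed from the Javadoc parameter names:

import org.apache.flink.core.fs.Path;

ScalaCsvOutputFormat<scala.Tuple2<Integer, String>> format =
        new ScalaCsvOutputFormat<>(new Path("/tmp/out.csv"), "\n", ",");
format.setCharsetName("UTF-8");
format.setAllowNullValues(false); // assumed setter: throw on null fields instead of writing an empty field
format.setQuoteStrings(true);     // assumed setter: quote String and StringValue fields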
