[FLINK-6711] Activate strict checkstyle for flink-connector-filesystem
zentol committed May 27, 2017
1 parent fab8fe5 commit 7292c87
Showing 26 changed files with 315 additions and 296 deletions.
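Most of the churn in this commit is mechanical: imports are regrouped to satisfy Flink's strict checkstyle, redundant blank lines and modifiers are dropped, and Javadoc <p> tags are moved to the start of their paragraphs. The import reshuffling in the AvroKeyValueSinkWriter hunks below follows a layout that appears to be: org.apache.flink imports first, then other third-party imports, then java/javax imports, each group separated by a blank line. A minimal sketch of that target layout (hypothetical class, not part of the commit):

package org.apache.flink.streaming.connectors.fs;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.fs.Path;

import java.util.Map;

// Hypothetical example class, present only so the imports above are used.
class ImportOrderExample {
    private Map<String, Tuple2<Path, Path>> bucketsByName;
}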
3 changes: 1 addition & 2 deletions flink-connectors/flink-connector-filesystem/pom.xml
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
@@ -104,7 +104,6 @@ under the License.
<type>test-jar</type>
</dependency>


<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
Original file line number Diff line number Diff line change
@@ -18,10 +18,11 @@
* limitations under the License.
*/

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
@@ -31,15 +32,15 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;

/**
* Implementation of AvroKeyValue writer that can be used in Sink.
* Each entry would be wrapped in GenericRecord with key/value fields(same as in m/r lib)
@@ -49,15 +50,15 @@
BucketingSink<Tuple2<Long, Long>> sink = new BucketingSink<Tuple2<Long, Long>>("/tmp/path");
sink.setBucketer(new DateTimeBucketer<Tuple2<Long, Long>>("yyyy-MM-dd/HH/mm/"));
sink.setPendingSuffix(".avro");
Map<String,String> properties = new HashMap<>();
Map<String, String> properties = new HashMap<>();
Schema longSchema = Schema.create(Type.LONG);
String keySchema = longSchema.toString();
String valueSchema = longSchema.toString();
properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema);
properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
sink.setWriter(new AvroSinkWriter<Long, Long>(properties));
sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB,
}
@@ -77,37 +78,37 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
private final Map<String, String> properties;

/**
* C'tor for the writer
* <p>
* You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above)
* C'tor for the writer.
*
* <p>You can provide different properties that will be used to configure avro key-value writer as simple properties map(see example above)
* @param properties
*/
@SuppressWarnings("deprecation")
public AvroKeyValueSinkWriter(Map<String, String> properties) {
this.properties = properties;

String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
if (keySchemaString == null) {
throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
}
Schema.parse(keySchemaString);//verifying that schema valid
Schema.parse(keySchemaString); //verifying that schema valid

String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA);
if (valueSchemaString == null) {
throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
}
Schema.parse(valueSchemaString);//verifying that schema valid
Schema.parse(valueSchemaString); //verifying that schema valid
}

private boolean getBoolean(Map<String,String> conf, String key, boolean def) {
private boolean getBoolean(Map<String, String> conf, String key, boolean def) {
String value = conf.get(key);
if (value == null) {
return def;
}
return Boolean.parseBoolean(value);
}
private int getInt(Map<String,String> conf, String key, int def) {

private int getInt(Map<String, String> conf, String key, int def) {
String value = conf.get(key);
if (value == null) {
return def;
@@ -116,7 +117,7 @@ private int getInt(Map<String,String> conf, String key, int def) {
}

//this derived from AvroOutputFormatBase.getCompressionCodec(..)
private CodecFactory getCompressionCodec(Map<String,String> conf) {
private CodecFactory getCompressionCodec(Map<String, String> conf) {
if (getBoolean(conf, CONF_COMPRESS, false)) {
int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
@@ -147,12 +148,12 @@ public void open(FileSystem fs, Path path) throws IOException {

@Override
public void close() throws IOException {
super.close();//the order is important since super.close flushes inside
super.close(); //the order is important since super.close flushes inside
if (keyValueWriter != null) {
keyValueWriter.close();
}
}

@Override
public long flush() throws IOException {
if (keyValueWriter != null) {
@@ -184,7 +185,7 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfi
public Writer<Tuple2<K, V>> duplicate() {
return new AvroKeyValueSinkWriter<K, V>(properties);
}

// taken from m/r avro lib to remove dependency on it
private static final class AvroKeyValueWriter<K, V> {
/** A writer for the Avro container file. */
@@ -245,7 +246,12 @@ long sync() throws IOException {
}
}

// taken from AvroKeyValue avro-mapr lib
/**
* A reusable Avro generic record for writing key/value pairs to the
* file.
*
* <p>taken from AvroKeyValue avro-mapr lib
*/
public static class AvroKeyValue<K, V> {
/** The name of the key value pair generic record. */
public static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
@@ -293,7 +299,7 @@ public V getValue() {

/**
* Creates a KeyValuePair generic record schema.
*
*
* @return A schema for a generic record with two fields: 'key' and
* 'value'.
*/
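The CONF_* compression keys read by getCompressionCodec(..) above can be put into the same properties map shown in the class Javadoc. A short sketch, assuming CONF_DEFLATE_LEVEL is a public constant like the other CONF_* keys:

properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, Boolean.toString(true));
properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.DEFLATE_CODEC);
// Optional: without this key, getCompressionCodec(..) falls back to CodecFactory.DEFAULT_DEFLATE_LEVEL.
properties.put(AvroKeyValueSinkWriter.CONF_DEFLATE_LEVEL, Integer.toString(6));
sink.setWriter(new AvroKeyValueSinkWriter<Long, Long>(properties));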
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.fs;

import org.apache.hadoop.fs.Path;
@@ -25,8 +26,8 @@
* A bucketer is used with a {@link RollingSink}
* to put emitted elements into rolling files.
*
* <p>
* The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
*
* <p>The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
* a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and
* the old one closed. The {@code Bucketer} can, for example, decide to start new buckets
* based on system time.
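A minimal custom implementation illustrates the contract described in the Javadoc above. This is a sketch, not part of the commit; it assumes the interface's two methods, shouldStartNewBucket(Path, Path) and getNextBucketPath(Path), which DateTimeBucketer below also implements.

// Hypothetical Bucketer that never rolls: everything is written under the base path.
public class SingleBucketBucketer implements Bucketer {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
        // Never ask the sink to close the current bucket and open a new one.
        return false;
    }

    @Override
    public Path getNextBucketPath(Path basePath) {
        // The first (and only) bucket is the base path itself.
        return basePath;
    }
}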
Original file line number Diff line number Diff line change
@@ -15,19 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.fs;

package org.apache.flink.streaming.connectors.fs;

/**
* A clock that can provide the current time.
*
* <p>
* Normally this would be system time, but for testing a custom {@code Clock} can be provided.
*
* <p>Normally this would be system time, but for testing a custom {@code Clock} can be provided.
*/
public interface Clock {

/**
* Return the current system time in milliseconds.
*/
public long currentTimeMillis();
long currentTimeMillis();
}
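For tests of time-based bucketers, a deterministic implementation of this one-method interface can be swapped in. A minimal sketch (hypothetical class, not part of the commit):

public class ManualClock implements Clock {

    private volatile long currentTime;

    public void setCurrentTime(long currentTime) {
        this.currentTime = currentTime;
    }

    @Override
    public long currentTimeMillis() {
        // Return the manually set time instead of the wall clock.
        return currentTime;
    }
}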
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.fs;

import org.apache.hadoop.fs.Path;
@@ -29,35 +30,35 @@
/**
* A {@link Bucketer} that assigns to buckets based on current system time.
*
* <p>
* The {@code DateTimeBucketer} will create directories of the following form:
*
* <p>The {@code DateTimeBucketer} will create directories of the following form:
* {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
* that was specified as a base path when creating the
* {@link RollingSink}. The {@code dateTimePath}
* is determined based on the current system time and the user provided format string.
*
* <p>
* {@link SimpleDateFormat} is used to derive a date string from the current system time and
*
* <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
* the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
* files will have a granularity of hours.
*
*
* <p>
* Example:
*
* <p>Example:
*
* <pre>{@code
* Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
* }</pre>
*
* This will create for example the following bucket path:
* <p>This will create for example the following bucket path:
* {@code /base/1976-12-31-14/}
*
* @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead.
*/
@Deprecated
public class DateTimeBucketer implements Bucketer {

private static Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
private static final Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);

private static final long serialVersionUID = 1L;

@@ -95,7 +96,6 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
this.dateFormatter = new SimpleDateFormat(formatString);
}


@Override
public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
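A short usage sketch for the deprecated classes above (assuming RollingSink's base-path constructor and setBucketer(..) setter, which are not shown in this diff):

RollingSink<String> sink = new RollingSink<>("/base");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH"));
// With this format string the sink writes into hourly directories such as /base/2017-05-27--14/.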
Original file line number Diff line number Diff line change
@@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.fs;

import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;

import org.apache.hadoop.fs.Path;

/**