
[FLINK-6711] Activate strict checkstyle for flink-connector-kinesis
zentol committed May 27, 2017
1 parent 28e8043 commit b12de1e
Showing 37 changed files with 330 additions and 278 deletions.
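The changes below are mostly mechanical (Javadoc first sentences terminated with periods, import groups reordered and separated, spacing fixes). For context, strict checkstyle in a Maven build of this kind is typically activated per module through the maven-checkstyle-plugin; the following pom.xml fragment is only a hedged sketch, and the ruleset paths and settings are assumptions, not taken from this commit:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-checkstyle-plugin</artifactId>
  <configuration>
    <!-- Assumed ruleset and suppressions locations; the actual paths may differ. -->
    <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
    <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
    <includeTestSourceDirectory>true</includeTestSourceDirectory>
    <failOnViolation>true</failOnViolation>
  </configuration>
  <executions>
    <execution>
      <goals>
        <goal>check</goal>
      </goals>
      <phase>validate</phase>
    </execution>
  </executions>
</plugin>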
org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java

@@ -44,13 +44,14 @@
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkArgument;
@@ -82,24 +83,24 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
// Consumer properties
// ------------------------------------------------------------------------

/** The names of the Kinesis streams that we will be consuming from */
/** The names of the Kinesis streams that we will be consuming from. */
private final List<String> streams;

/** Properties to parametrize settings such as AWS service region, initial position in stream,
* shard list retrieval behaviours, etc */
* shard list retrieval behaviours, etc. */
private final Properties configProps;

/** User supplied deserialization schema to convert Kinesis byte messages to Flink objects */
/** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
private final KinesisDeserializationSchema<T> deserializer;

// ------------------------------------------------------------------------
// Runtime state
// ------------------------------------------------------------------------

/** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
/** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards. */
private transient KinesisDataFetcher<T> fetcher;

/** The sequence numbers to restore to upon restore from failure */
/** The sequence numbers to restore to upon restore from failure. */
private transient HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore;

private volatile boolean running = true;
@@ -108,7 +109,7 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> imple
// State for Checkpoint
// ------------------------------------------------------------------------

/** State name to access shard sequence number states; cannot be changed */
/** State name to access shard sequence number states; cannot be changed. */
private static final String sequenceNumsStateStoreName = "Kinesis-Stream-Shard-State";

private transient ListState<Tuple2<StreamShardMetadata, SequenceNumber>> sequenceNumsStateForCheckpoint;
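The fields above define the consumer's contract: the stream names, the configuration Properties, and a deserialization schema. A minimal usage sketch follows; the class name, stream name, and all property values are placeholders, not taken from this diff:

// A hedged sketch of wiring a FlinkKinesisConsumer into a streaming job.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class ConsumerSketch {

	public static void main(String[] args) throws Exception {
		Properties config = new Properties();
		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// The DeserializationSchema is wrapped internally into a KinesisDeserializationSchema.
		DataStream<String> records = env.addSource(
			new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), config));
		records.print();
		env.execute("Kinesis consumer sketch");
	}
}

The configuration keys come from the ConsumerConfigConstants and AWSConfigConstants classes shown later in this diff.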
org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java

@@ -14,16 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis;

import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -33,6 +26,15 @@
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.PropertiesUtil;

import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -70,10 +72,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
/* Optional custom partitioner */
private KinesisPartitioner<OUT> customPartitioner = null;


// --------------------------- Runtime fields ---------------------------


/* Our Kinesis instance for each parallel Flink sink */
private transient KinesisProducer producer;

@@ -83,10 +83,8 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
/* Field for async exception */
private transient volatile Throwable thrownException;


// --------------------------- Initialization and configuration ---------------------------


/**
* Create a new FlinkKinesisProducer.
* This is a constructor supporting Flink's {@see SerializationSchema}.
@@ -104,6 +102,7 @@ public ByteBuffer serialize(OUT element) {
return ByteBuffer.wrap(schema.serialize(element));
}
// use default stream and hash key

@Override
public String getTargetStream(OUT element) {
return null;
@@ -147,7 +146,7 @@ public void setDefaultStream(String defaultStream) {
}

/**
* Set default partition id
* Set default partition id.
* @param defaultPartition Name of the default partition
*/
public void setDefaultPartition(String defaultPartition) {
@@ -160,10 +159,8 @@ public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
this.customPartitioner = partitioner;
}


// --------------------------- Lifecycle methods ---------------------------


@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
@@ -186,7 +183,7 @@ public void open(Configuration parameters) throws Exception {
@Override
public void onSuccess(UserRecordResult result) {
if (!result.isSuccessful()) {
if(failOnError) {
if (failOnError) {
thrownException = new RuntimeException("Record was not sent successful");
} else {
LOG.warn("Record was not sent successful");
@@ -222,7 +219,7 @@ public void invoke(OUT value) throws Exception {
List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts();
for (Attempt attempt: attempts) {
if (attempt.getErrorMessage() != null) {
errorMessages += attempt.getErrorMessage() +"\n";
errorMessages += attempt.getErrorMessage() + "\n";
}
}
}
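Given the lifecycle methods and the failOnError handling above, the producer is configured on the instance before being attached as a sink. A hedged sketch; the class name, stream name, and values are placeholders, and setFailOnError is assumed to be the setter behind the failOnError flag shown above:

// A hedged sketch of configuring and attaching a FlinkKinesisProducer.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class ProducerSketch {

	public static void main(String[] args) throws Exception {
		Properties config = new Properties();
		config.setProperty(ProducerConfigConstants.AWS_REGION, "us-east-1");

		FlinkKinesisProducer<String> producer =
			new FlinkKinesisProducer<>(new SimpleStringSchema(), config);
		producer.setFailOnError(true);               // propagate async send failures
		producer.setDefaultStream("example-stream"); // hypothetical stream name
		producer.setDefaultPartition("0");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> input = env.fromElements("a", "b", "c");
		input.addSink(producer);
		env.execute("Kinesis producer sketch");
	}
}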
org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java

@@ -14,22 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kinesis;

package org.apache.flink.streaming.connectors.kinesis;

import java.io.Serializable;

/**
* An interface for partitioning records.
*
* @param <T> record type
*/
public abstract class KinesisPartitioner<T> implements Serializable {

/**
* Return a partition id based on the input
* Return a partition id based on the input.
* @param element Element to partition
* @return A string representing the partition id
*/
public abstract String getPartitionId(T element);

/**
* Optional method for setting an explicit hash key
* Optional method for setting an explicit hash key.
* @param element Element to get the hash key for
* @return the hash key for the element
*/
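Since getPartitionId is the only required method, a custom partitioner is a small subclass. A sketch with an illustrative keying scheme (the class name and the partition count are hypothetical):

// A hedged sketch of a custom KinesisPartitioner.
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

public class ModuloPartitioner extends KinesisPartitioner<String> {

	private static final int NUM_PARTITIONS = 8; // illustrative constant

	@Override
	public String getPartitionId(String element) {
		// Derive a stable partition id from the element's hash code.
		return String.valueOf(Math.abs(element.hashCode()) % NUM_PARTITIONS);
	}
}

Such a partitioner would then be registered through setCustomPartitioner on the producer, shown earlier in this diff.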
org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java

@@ -20,7 +20,7 @@
import com.amazonaws.auth.AWSCredentialsProvider;

/**
* Configuration keys for AWS service usage
* Configuration keys for AWS service usage.
*/
public class AWSConfigConstants {

@@ -30,41 +30,41 @@ public class AWSConfigConstants {
*/
public enum CredentialProvider {

/** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */
/** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials. */
ENV_VAR,

/** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */
/** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials. */
SYS_PROP,

/** Use a AWS credentials profile file to create the AWS credentials */
/** Use a AWS credentials profile file to create the AWS credentials. */
PROFILE,

/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */
/** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties. */
BASIC,

/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata **/
/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
AUTO,
}

/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */
/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
public static final String AWS_REGION = "aws.region";

/** The AWS access key ID to use when setting credentials provider type to BASIC */
/** The AWS access key ID to use when setting credentials provider type to BASIC. */
public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid";

/** The AWS secret key to use when setting credentials provider type to BASIC */
/** The AWS secret key to use when setting credentials provider type to BASIC. */
public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey";

/** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/
/** The credential provider type to use when AWS credentials are required (BASIC is used if not set). */
public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

/** Optional configuration for profile path if credential provider type is set to be PROFILE */
/** Optional configuration for profile path if credential provider type is set to be PROFILE. */
public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path";

/** Optional configuration for profile name if credential provider type is set to be PROFILE */
/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name";

/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */
/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
public static final String AWS_ENDPOINT = "aws.endpoint";

}
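These keys are consumed as plain Properties entries. A sketch of the two most explicit provider variants, BASIC and PROFILE; the class name and all values are placeholders:

// A hedged sketch of building AWS credential configuration from these keys.
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

import java.util.Properties;

public class AwsConfigSketch {

	/** BASIC provider: access key id and secret key supplied directly. */
	static Properties basicCredentials() {
		Properties config = new Properties();
		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
		config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
		config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "<access-key-id>");
		config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "<secret-key>");
		return config;
	}

	/** PROFILE provider: credentials read from a profile file instead. */
	static Properties profileCredentials() {
		Properties config = new Properties();
		config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
		config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
		config.setProperty(AWSConfigConstants.AWS_PROFILE_PATH, "/home/user/.aws/credentials");
		config.setProperty(AWSConfigConstants.AWS_PROFILE_NAME, "default");
		return config;
	}
}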
org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java

@@ -17,13 +17,14 @@

package org.apache.flink.streaming.connectors.kinesis.config;

import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;

import com.amazonaws.services.kinesis.model.ShardIteratorType;

/**
* Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}
* Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer}.
*/
public class ConsumerConfigConstants extends AWSConfigConstants {

@@ -33,13 +34,13 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
*/
public enum InitialPosition {

/** Start reading from the earliest possible record in the stream (excluding expired data records) */
/** Start reading from the earliest possible record in the stream (excluding expired data records). */
TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM),

/** Start reading from the latest incoming record */
/** Start reading from the latest incoming record. */
LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM),

/** Start reading from the record at the specified timestamp */
/** Start reading from the record at the specified timestamp. */
AT_TIMESTAMP(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM);

private SentinelSequenceNumber sentinelSequenceNumber;
@@ -53,55 +54,55 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
}
}

/** The initial position to start reading Kinesis streams from (LATEST is used if not set) */
/** The initial position to start reading Kinesis streams from (LATEST is used if not set). */
public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos";

/** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
/** The initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_INITIAL_TIMESTAMP = "flink.stream.initpos.timestamp";

/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION) */
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";

/** The base backoff time between each describeStream attempt */
/** The base backoff time between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

/** The maximum backoff time between each describeStream attempt */
/** The maximum backoff time between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";

/** The power constant for exponential backoff between each describeStream attempt */
/** The power constant for exponential backoff between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";

/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard */
/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";

/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */
/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException. */
public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";

/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";

/** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */
/** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";

/** The power constant for exponential backoff between each getRecords attempt */
/** The power constant for exponential backoff between each getRecords attempt. */
public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";

/** The interval between each getRecords request to a AWS Kinesis shard in milliseconds */
/** The interval between each getRecords request to a AWS Kinesis shard in milliseconds. */
public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";

/** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */
/** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException. */
public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries";

/** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
/** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base";

/** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */
/** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max";

/** The power constant for exponential backoff between each getShardIterator attempt */
/** The power constant for exponential backoff between each getShardIterator attempt. */
public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst";

/** The interval between each attempt to discover new shards */
/** The interval between each attempt to discover new shards. */
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";

// ------------------------------------------------------------------------
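The polling, retry, and discovery keys above combine in the same way. A sketch of a tuned configuration using the AT_TIMESTAMP initial position; every value is illustrative, and the timestamp string must match the default or configured STREAM_TIMESTAMP_DATE_FORMAT:

// A hedged sketch of a tuned consumer configuration; all values are illustrative.
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

public class ConsumerTuningSketch {

	static Properties tunedConfig() {
		Properties config = new Properties();
		config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
		// Start from a fixed point in time instead of LATEST or TRIM_HORIZON.
		config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
		// Timestamp format depends on STREAM_TIMESTAMP_DATE_FORMAT; illustrative value here.
		config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2017-05-27T00:00:00.000+00:00");
		// Fetch fewer records per getRecords call, but poll more often.
		config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");
		config.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500");
		// Check for resharding once a minute.
		config.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "60000");
		return config;
	}
}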
org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java

@@ -20,7 +20,7 @@
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

/**
* Optional producer specific configuration keys for {@link FlinkKinesisProducer}
* Optional producer specific configuration keys for {@link FlinkKinesisProducer}.
*/
public class ProducerConfigConstants extends AWSConfigConstants {

org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java

@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.examples;

import org.apache.flink.api.java.utils.ParameterTool;
@@ -26,7 +27,7 @@
import java.util.Properties;

/**
* This is an example on how to consume data from Kinesis
* This is an example on how to consume data from Kinesis.
*/
public class ConsumeFromKinesis {

(Diff truncated: the remaining changed files were not loaded.)
