[FLINK-8276] [kafka] Properly annotate APIs for Kafka connector
This closes apache#5173.
zhangminglei authored and tzulitai committed Jan 12, 2018
1 parent 9b5fce6 commit 4ceabed
Showing 79 changed files with 177 additions and 0 deletions.
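
For context: the annotations added throughout this commit come from Flink's flink-annotations module. @PublicEvolving marks user-facing API that is public but may still change between releases; @Internal marks implementation classes with no stability guarantees. A minimal sketch of the pattern the commit applies (the class names below are hypothetical):

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;

/** User-facing connector entry point: public API, but still allowed to evolve. */
@PublicEvolving
public class ExampleKafkaSource {
    // public constructors, configuration methods, etc.
}

/** Supporting runtime class: not part of the public API surface. */
@Internal
class ExampleKafkaFetcher {
    // implementation detail; may change or disappear at any time
}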

FlinkKafkaConsumer010.java
@@ -59,6 +59,7 @@
* <p>Please refer to Kafka's documentation for the available configuration properties:
* http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
*/
@PublicEvolving
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {

private static final long serialVersionUID = 2324564345203409112L;

FlinkKafkaProducer010.java
@@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -39,6 +40,7 @@
/**
* Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
*/
@PublicEvolving
public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {

private static final long serialVersionUID = 1L;
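
A minimal usage sketch for this producer, assuming a locally reachable broker (topic name and address are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class Kafka010ProducerUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // The producer is attached as a regular sink; records are serialized with SimpleStringSchema.
        stream.addSink(new FlinkKafkaProducer010<>("example-topic", new SimpleStringSchema(), props));
        env.execute("kafka-010-producer-sketch");
    }
}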

Kafka010AvroTableSource.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -35,6 +36,7 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
@PublicEvolving
public class Kafka010AvroTableSource extends KafkaAvroTableSource {

/**

Kafka010JsonTableSink.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -28,6 +29,7 @@
/**
* Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
*/
@PublicEvolving
public class Kafka010JsonTableSink extends KafkaJsonTableSink {

/**

Kafka010JsonTableSource.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -32,6 +33,7 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
@PublicEvolving
public class Kafka010JsonTableSource extends KafkaJsonTableSource {

/**

Kafka010TableSource.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
@@ -29,6 +30,7 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
@PublicEvolving
public abstract class Kafka010TableSource extends KafkaTableSource {

// The deserialization schema for the Kafka records

Kafka010Fetcher.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -42,6 +43,7 @@
*
* @param <T> The type of elements produced by the fetcher.
*/
@Internal
public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {

public Kafka010Fetcher(

Kafka010PartitionDiscoverer.java
@@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;

import java.util.Properties;
@@ -25,6 +26,7 @@
* A partition discoverer that can be used to discover topics and partitions metadata
* from Kafka brokers via the Kafka 0.10 high-level consumer API.
*/
@Internal
public class Kafka010PartitionDiscoverer extends Kafka09PartitionDiscoverer {

public Kafka010PartitionDiscoverer(

KafkaConsumerCallBridge010.java
@@ -18,6 +18,8 @@

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.annotation.Internal;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

@@ -32,6 +34,7 @@
*
* <p>Because of that, we need two versions whose compiled code goes against different method signatures.
*/
@Internal
public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {

@Override
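
The compatibility problem behind this bridge: in the 0.9 client, KafkaConsumer#assign(...) (and #seekToBeginning(...)) takes a List, while the 0.10 client changed the signature to Collection, so bytecode compiled against one version does not link against the other. A simplified sketch of the bridge idea (class and method names abbreviated from the actual ones):

import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Base bridge, compiled against the 0.9 client: the call resolves to assign(List<TopicPartition>).
class ConsumerCallBridgeSketch {
    void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
        consumer.assign(partitions);
    }
}

// 0.10 bridge, compiled against the 0.10 client: the identical source line now
// resolves to assign(Collection<TopicPartition>), i.e. different bytecode for the same source.
class ConsumerCallBridge010Sketch extends ConsumerCallBridgeSketch {
    @Override
    void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
        consumer.assign(partitions);
    }
}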

FlinkKafka011ErrorCode.java
@@ -17,9 +17,12 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;

/**
* Error codes used in {@link FlinkKafka011Exception}.
*/
@PublicEvolving
public enum FlinkKafka011ErrorCode {
PRODUCERS_POOL_EMPTY,
EXTERNAL_ERROR

FlinkKafka011Exception.java
@@ -17,11 +17,13 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.FlinkException;

/**
* Exception used by {@link FlinkKafkaProducer011} and {@link FlinkKafkaConsumer011}.
*/
@PublicEvolving
public class FlinkKafka011Exception extends FlinkException {

private final FlinkKafka011ErrorCode errorCode;

FlinkKafkaConsumer011.java
@@ -44,6 +44,7 @@
* <p>Please refer to Kafka's documentation for the available configuration properties:
* http://kafka.apache.org/documentation.html#newconsumerconfigs</p>
*/
@PublicEvolving
public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {

private static final long serialVersionUID = 2324564345203409112L;

Kafka011AvroTableSource.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -35,6 +36,7 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
*/
@PublicEvolving
public class Kafka011AvroTableSource extends KafkaAvroTableSource {

/**

Kafka011JsonTableSource.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -32,6 +33,7 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
*/
@PublicEvolving
public class Kafka011JsonTableSource extends KafkaJsonTableSource {

/**

Kafka011TableSource.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
@@ -29,6 +30,7 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
*/
@PublicEvolving
public abstract class Kafka011TableSource extends KafkaTableSource {

// The deserialization schema for the Kafka records

FlinkKafkaProducer.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

@@ -101,6 +102,7 @@
* required changes via the Java Reflection API. It might not be the prettiest solution. An alternative would be to
* re-implement the whole Kafka 0.11 REST API client on our own.
*/
@PublicEvolving
public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);

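
The "changes via the Java Reflection API" mentioned above boil down to reading and overwriting private producer state so that a previously opened transaction can be resumed. A generic sketch of that mechanism (the real class targets Kafka's internal transaction-manager fields; the helper below is illustrative):

import java.lang.reflect.Field;

final class ReflectionAccessSketch {

    /** Reads a private field, as the wrapper does to capture producer id and epoch. */
    static Object getField(Object target, String fieldName) throws ReflectiveOperationException {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true); // bypass the private modifier
        return field.get(target);
    }

    /** Overwrites a private field, e.g. to inject transactional state before resuming. */
    static void setField(Object target, String fieldName, Object value) throws ReflectiveOperationException {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(target, value);
    }
}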

TransactionalIdsGenerator.java
@@ -17,6 +17,8 @@

package org.apache.flink.streaming.connectors.kafka.internal;

import org.apache.flink.annotation.Internal;

import java.util.HashSet;
import java.util.Set;

@@ -34,6 +36,7 @@
* </ul>
* In other words, any particular generated id will always be assigned to one and only one subtask.
*/
@Internal
public class TransactionalIdsGenerator {
private final String prefix;
private final int subtaskIndex;
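
The "one and only one subtask" guarantee described above can be met by carving the transactional-id space into disjoint, contiguous per-subtask ranges. A simplified sketch of that scheme (the real generator additionally derives pools of ids to abort after restores):

import java.util.HashSet;
import java.util.Set;

final class TransactionalIdRangesSketch {

    /** Ids that only the given subtask will ever use: the range [subtaskIndex * poolSize, (subtaskIndex + 1) * poolSize). */
    static Set<String> idsForSubtask(String prefix, int subtaskIndex, int poolSize) {
        Set<String> ids = new HashSet<>(poolSize);
        long rangeStart = (long) subtaskIndex * poolSize; // disjoint start per subtask
        for (long id = rangeStart; id < rangeStart + poolSize; id++) {
            ids.add(prefix + "-" + id);
        }
        return ids;
    }
}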

KafkaMetricMuttableWrapper.java
@@ -18,13 +18,15 @@

package org.apache.flink.streaming.connectors.kafka.internal.metrics;

import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Gauge;

import org.apache.kafka.common.Metric;

/**
* Gauge for getting the current value of a Kafka metric.
*/
@Internal
public class KafkaMetricMuttableWrapper implements Gauge<Double> {
private org.apache.kafka.common.Metric kafkaMetric;

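
Functionally, this wrapper just adapts Kafka's Metric to Flink's Gauge<Double>, keeping the wrapped metric mutable ("muttable" is a typo preserved from the source) so it can be swapped when Kafka re-registers metrics. A sketch, assuming the Metric#value() accessor of the 0.x/1.x Kafka clients:

import org.apache.flink.metrics.Gauge;

import org.apache.kafka.common.Metric;

class KafkaMetricGaugeSketch implements Gauge<Double> {

    private Metric kafkaMetric; // mutable so a re-registered metric can replace it

    KafkaMetricGaugeSketch(Metric kafkaMetric) {
        this.kafkaMetric = kafkaMetric;
    }

    void setKafkaMetric(Metric kafkaMetric) {
        this.kafkaMetric = kafkaMetric;
    }

    @Override
    public Double getValue() {
        return kafkaMetric.value(); // Metric#value() exists in this era's clients
    }
}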

FlinkKafkaConsumer08.java
@@ -81,6 +81,7 @@
* <p>When using a Kafka topic to send data between Flink jobs, we recommend using the
* {@link TypeInformationSerializationSchema} and {@link TypeInformationKeyValueSerializationSchema}.</p>
*/
@PublicEvolving
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {

private static final long serialVersionUID = -6272159445203409112L;
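
A short usage sketch of the recommendation above — exchanging typed records between Flink jobs through Kafka with TypeInformationSerializationSchema (topic, addresses, and group id are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;

public class Kafka08ConsumerUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flink's own type serializers handle the wire format on both ends.
        TypeInformationSerializationSchema<String> schema =
            new TypeInformationSerializationSchema<>(BasicTypeInfo.STRING_TYPE_INFO, env.getConfig());

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181"); // the 0.8 consumer needs ZooKeeper
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");

        env.addSource(new FlinkKafkaConsumer08<>("example-topic", schema, props)).print();
        env.execute("kafka-08-consumer-sketch");
    }
}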

FlinkKafkaProducer08.java
@@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
@@ -36,6 +37,7 @@
*
* @param <IN> Type of the messages to write into Kafka.
*/
@PublicEvolving
public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {

private static final long serialVersionUID = 1L;

Kafka08AvroTableSource.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -35,6 +36,7 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.8.
*/
@PublicEvolving
public class Kafka08AvroTableSource extends KafkaAvroTableSource {

/**

Kafka08JsonTableSink.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
@@ -30,6 +31,7 @@
/**
* Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
*/
@PublicEvolving
public class Kafka08JsonTableSink extends KafkaJsonTableSink {

/**

Kafka08JsonTableSource.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -32,6 +33,7 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.8.
*/
@PublicEvolving
public class Kafka08JsonTableSource extends KafkaJsonTableSource {

/**

Kafka08TableSource.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
@@ -29,6 +30,7 @@
/**
* Kafka {@link StreamTableSource} for Kafka 0.8.
*/
@PublicEvolving
public abstract class Kafka08TableSource extends KafkaTableSource {

// The deserialization schema for the Kafka records

Kafka08Fetcher.java
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka.internals;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -57,6 +58,7 @@
*
* @param <T> The type of elements produced by the fetcher.
*/
@Internal
public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {

static final KafkaTopicPartitionState<TopicAndPartition> MARKER =

Kafka08PartitionDiscoverer.java
@@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kafka.internals;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.NetUtils;

import kafka.cluster.Broker;
@@ -49,6 +50,7 @@
* A partition discoverer that can be used to discover topics and partitions metadata
* from Kafka brokers via the Kafka 0.8 low-level consumer API.
*/
@Internal
public class Kafka08PartitionDiscoverer extends AbstractPartitionDiscoverer {

private static final Logger LOG = LoggerFactory.getLogger(Kafka08PartitionDiscoverer.class);