
Commit bf58725

Revert [FLINK-15670]

This reverts commits 78b7c71, ff1695d, 9fd02fe, e929c3c and 03f5d54.

rmetzger committed May 18, 2020
1 parent c7f9a8c

Showing 15 changed files with 33 additions and 2,138 deletions.

@@ -264,7 +264,7 @@ public FlinkKafkaConsumerBase(
      * @param properties - Kafka configuration properties to be adjusted
      * @param offsetCommitMode offset commit mode
      */
-    protected static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
+    static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
         if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
             properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         }
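
Besides the visibility change, this hunk is a reminder of what the helper does: when Flink commits offsets itself on checkpoints, or committing is disabled entirely, Kafka's own auto-commit must be switched off so the two mechanisms cannot compete. A minimal, self-contained sketch of the same check, using a stand-in enum instead of Flink's real OffsetCommitMode and the literal config key behind ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; the class name is hypothetical:

import java.util.Properties;

public class AutoCommitAdjustmentSketch {

    // Stand-in for Flink's OffsetCommitMode enum.
    enum OffsetCommitMode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

    // Mirrors the logic above: disable Kafka's auto-commit whenever Flink manages offsets.
    static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
            properties.setProperty("enable.auto.commit", "false");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true"); // user-supplied value
        adjustAutoCommitConfig(props, OffsetCommitMode.ON_CHECKPOINTS);
        System.out.println(props.getProperty("enable.auto.commit")); // prints "false"
    }
}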

@@ -84,7 +84,7 @@ public abstract class AbstractFetcher<T, KPH> {

     /** The lock that guarantees that record emission and state updates are atomic,
      * from the view of taking a checkpoint. */
-    protected final Object checkpointLock;
+    private final Object checkpointLock;

     /** All partitions (and their state) that this fetcher is subscribed to. */
     private final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates;
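
The javadoc on checkpointLock describes the connector's central concurrency idiom: emitting a record and recording its offset must look atomic to a checkpoint, because the checkpointing code synchronizes on the same object. A generic sketch of that idiom, assuming a hypothetical fetcher-like class rather than AbstractFetcher itself:

import java.util.ArrayList;
import java.util.List;

public class CheckpointLockSketch {

    private final Object checkpointLock = new Object();
    private final List<String> output = new ArrayList<>();
    private long lastEmittedOffset = -1L;

    // Record emission and the offset update are one atomic step under the lock.
    void emitRecord(String record, long offset) {
        synchronized (checkpointLock) {
            output.add(record);         // "emit" downstream
            lastEmittedOffset = offset; // state a checkpoint would snapshot
        }
    }

    // A checkpoint taken under the same lock never sees a record without its offset.
    long snapshotOffset() {
        synchronized (checkpointLock) {
            return lastEmittedOffset;
        }
    }
}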

@@ -210,7 +210,7 @@ public enum Semantic {
     /**
      * The name of the default topic this producer is writing data to.
      */
-    protected final String defaultTopicId;
+    private final String defaultTopicId;

     /**
      * (Serializable) SerializationSchema for turning objects used with Flink into.

@@ -235,7 +235,7 @@ public enum Semantic {
     /**
      * Partitions of each topic.
      */
-    protected final Map<String, int[]> topicPartitionsMap;
+    private final Map<String, int[]> topicPartitionsMap;

     /**
      * Max number of producers in the pool. If all producers are in use, snapshoting state will throw an exception.

@@ -250,7 +250,7 @@ public enum Semantic {
     /**
      * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
      */
-    protected boolean writeTimestampToKafka = false;
+    private boolean writeTimestampToKafka = false;

     /**
      * Flag indicating whether to accept failures (and log them), or to fail on failures.

@@ -273,7 +273,7 @@ public enum Semantic {
     protected transient volatile Exception asyncException;

     /** Number of unacknowledged records. */
-    protected final AtomicLong pendingRecords = new AtomicLong();
+    private final AtomicLong pendingRecords = new AtomicLong();

     /** Cache of metrics to replace already registered metrics instead of overwriting existing ones. */
     private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
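
The pendingRecords counter is what turns Kafka's asynchronous sends into an at-least-once sink: every send bumps it, every broker acknowledgement drops it, and the flush performed on checkpoint verifies it has drained back to zero before the snapshot may succeed. A stripped-down sketch of that bookkeeping (hypothetical class, not the FlinkKafkaProducer code):

import java.util.concurrent.atomic.AtomicLong;

public class PendingRecordsSketch {

    private final AtomicLong pendingRecords = new AtomicLong();

    // Called right before a record is handed to the Kafka producer.
    void recordSent() {
        pendingRecords.incrementAndGet();
    }

    // Called from the producer's completion callback once the broker acknowledged the record.
    void recordAcknowledged() {
        pendingRecords.decrementAndGet();
    }

    // Called while snapshotting state: wait (busy-wait here, for brevity) until nothing is in flight.
    void waitUntilFlushed() throws InterruptedException {
        while (pendingRecords.get() > 0) {
            Thread.sleep(1);
        }
    }
}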

@@ -1214,7 +1214,7 @@ private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean register
         return producer;
     }

-    protected void checkErroneous() throws FlinkKafkaException {
+    private void checkErroneous() throws FlinkKafkaException {
         Exception e = asyncException;
         if (e != null) {
             // prevent double throwing

@@ -1256,7 +1256,7 @@ private static Properties getPropertiesFromBrokerList(String brokerList) {
         return props;
     }

-    protected static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
+    private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
         // the fetched list is immutable, so we're creating a mutable copy in order to sort it
         List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
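
The inline comment flags the only subtle step here: the list returned by Producer#partitionsFor is immutable, so it is copied before being sorted into a deterministic partition-id order. The same idea as a standalone sketch, fed with hand-built PartitionInfo values instead of a live producer; the class and method names are hypothetical:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.kafka.common.PartitionInfo;

public class PartitionSortingSketch {

    static int[] partitionIdsSorted(List<PartitionInfo> partitionsFromProducer) {
        // partitionsFor() hands back an immutable list, so copy it before sorting.
        List<PartitionInfo> partitionsList = new ArrayList<>(partitionsFromProducer);
        partitionsList.sort(Comparator.comparingInt(PartitionInfo::partition));

        int[] partitions = new int[partitionsList.size()];
        for (int i = 0; i < partitions.length; i++) {
            partitions[i] = partitionsList.get(i).partition();
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<PartitionInfo> unsorted = Arrays.asList(
            new PartitionInfo("topic", 2, null, null, null),
            new PartitionInfo("topic", 0, null, null, null),
            new PartitionInfo("topic", 1, null, null, null));
        System.out.println(Arrays.toString(partitionIdsSorted(unsorted))); // [0, 1, 2]
    }
}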

@@ -1281,7 +1281,7 @@ public int compare(PartitionInfo o1, PartitionInfo o2) {
      */
     @VisibleForTesting
     @Internal
-    protected static class KafkaTransactionState {
+    static class KafkaTransactionState {

         private final transient FlinkKafkaInternalProducer<byte[], byte[]> producer;


@@ -1315,10 +1315,6 @@ boolean isTransactional() {
             return transactionalId != null;
         }

-        public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() {
-            return producer;
-        }
-
         @Override
         public String toString() {
             return String.format(

@@ -63,17 +63,14 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
     /** The schema to convert between Kafka's byte messages, and Flink's objects. */
     private final KafkaDeserializationSchema<T> deserializer;

-    /** A collector to emit records in batch (bundle). **/
-    private final KafkaCollector kafkaCollector;
-
     /** The handover of data and exceptions between the consumer thread and the task thread. */
-    final Handover handover;
+    private final Handover handover;

     /** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
-    final KafkaConsumerThread consumerThread;
+    private final KafkaConsumerThread consumerThread;

     /** Flag to mark the main work loop as alive. */
-    volatile boolean running = true;
+    private volatile boolean running = true;

     // ------------------------------------------------------------------------
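
Together, these fields describe the threading model the revert restores: a dedicated KafkaConsumerThread polls the broker, and the task thread receives each batch through a Handover, which also carries any exception across the thread boundary. A toy, single-slot version of such a handover (illustrative only, not the connector's actual Handover class):

import java.util.List;

public class HandoverSketch<T> {

    private final Object lock = new Object();
    private List<T> next;    // the single "slot"
    private Throwable error; // failure reported by the consumer thread

    // Called by the consumer thread with a freshly polled batch; blocks while the slot is full.
    public void produce(List<T> batch) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = batch;
            lock.notifyAll();
        }
    }

    // Called by the task thread; blocks until a batch or an error arrives.
    public List<T> pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }
            if (error != null) {
                throw new Exception("consumer thread failed", error);
            }
            List<T> batch = next;
            next = null;
            lock.notifyAll();
            return batch;
        }
    }

    // Called by the consumer thread when polling fails, waking up the task thread.
    public void reportError(Throwable t) {
        synchronized (lock) {
            error = t;
            lock.notifyAll();
        }
    }
}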

@@ -114,16 +111,19 @@ public KafkaFetcher(
             useMetrics,
             consumerMetricGroup,
             subtaskMetricGroup);
-        this.kafkaCollector = new KafkaCollector();
     }

     // ------------------------------------------------------------------------
     //  Fetcher work methods
     // ------------------------------------------------------------------------

+    private final KafkaCollector kafkaCollector = new KafkaCollector();
+
     @Override
     public void runFetchLoop() throws Exception {
         try {
+            final Handover handover = this.handover;
+
             // kick off the actual Kafka consumer
             consumerThread.start();


@@ -138,7 +138,23 @@ public void runFetchLoop() throws Exception {
                     List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                         records.records(partition.getKafkaPartitionHandle());

-                    partitionConsumerRecordsHandler(partitionRecords, partition);
+                    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+                        deserializer.deserialize(record, kafkaCollector);
+
+                        // emit the actual records. this also updates offset state atomically and emits
+                        // watermarks
+                        emitRecordsWithTimestamps(
+                            kafkaCollector.getRecords(),
+                            partition,
+                            record.offset(),
+                            record.timestamp());
+
+                        if (kafkaCollector.isEndOfStreamSignalled()) {
+                            // end of stream signaled
+                            running = false;
+                            break;
+                        }
+                    }
                 }
             }
         }
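
The restored loop relies on a small collector convention: the deserialization schema may emit any number of records into kafkaCollector, and the collector remembers whether the schema signalled end of stream, which is what flips running to false above. A bare-bones illustration of such a collector (hypothetical sketch; the real KafkaCollector is an internal class of the fetcher):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

public class EndOfStreamCollectorSketch<T> {

    private final Queue<T> records = new ArrayDeque<>();
    private final Predicate<T> isEndOfStream; // stands in for the schema's end-of-stream check
    private boolean endOfStreamSignalled;

    public EndOfStreamCollectorSketch(Predicate<T> isEndOfStream) {
        this.isEndOfStream = isEndOfStream;
    }

    // Called by the deserializer for every produced record.
    public void collect(T record) {
        if (endOfStreamSignalled || isEndOfStream.test(record)) {
            endOfStreamSignalled = true; // stop collecting; the fetch loop will terminate
        } else {
            records.add(record);
        }
    }

    // Drained by the fetch loop and handed to emitRecordsWithTimestamps(...).
    public Queue<T> getRecords() {
        return records;
    }

    public boolean isEndOfStreamSignalled() {
        return endOfStreamSignalled;
    }
}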

@@ -173,29 +189,6 @@ protected String getFetcherName() {
         return "Kafka Fetcher";
     }

-    protected void partitionConsumerRecordsHandler(
-            List<ConsumerRecord<byte[], byte[]>> partitionRecords,
-            KafkaTopicPartitionState<TopicPartition> partition) throws Exception {
-
-        for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-            deserializer.deserialize(record, kafkaCollector);
-
-            // emit the actual records. this also updates offset state atomically and emits
-            // watermarks
-            emitRecordsWithTimestamps(
-                kafkaCollector.getRecords(),
-                partition,
-                record.offset(),
-                record.timestamp());
-
-            if (kafkaCollector.isEndOfStreamSignalled()) {
-                // end of stream signaled
-                running = false;
-                break;
-            }
-        }
-    }
-
     // ------------------------------------------------------------------------
     //  Implement Methods of the AbstractFetcher
     // ------------------------------------------------------------------------
