[FLINK-15670][connector] Adds the consumer for KafkaShuffle.
KafkaShuffle provides a transparent Kafka source and sink pair, through which the network traffic of a shuffle step is persisted and redirected.
curcur authored and pnowojski committed May 18, 2020
1 parent 50a3b0f commit 4b866fb
Showing 5 changed files with 424 additions and 26 deletions.
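In practice, the pair replaces a network shuffle (for example a keyBy) with a Kafka topic: the producer side persists each partition's outgoing records and watermarks to the topic, and the consumer added in this commit reads them back on the other side, so the shuffled data survives failures and can be re-read on restart. A rough usage sketch (class and method names here are assumptions for illustration; only the consumer-side plumbing appears in this diff):

	// Hypothetical sketch -- API names are assumptions, not shown in this diff.
	Properties kafkaProps = new Properties();
	kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

	// Instead of source.keyBy(e -> e.key), persist the shuffle through Kafka:
	KeyedStream<Event, Integer> keyed = FlinkKafkaShuffle.persistentKeyBy(
		source,          // DataStream<Event>; Event is a placeholder type
		"shuffle-topic", // Kafka topic backing the shuffle
		4,               // producer parallelism
		8,               // number of Kafka partitions for the shuffle
		kafkaProps,
		e -> e.key);     // key selector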
FlinkKafkaConsumerBase.java
@@ -264,7 +264,7 @@ public FlinkKafkaConsumerBase(
 	 * @param properties - Kafka configuration properties to be adjusted
 	 * @param offsetCommitMode offset commit mode
 	 */
-	static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
+	protected static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
 		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
 			properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 		}
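Widening adjustAutoCommitConfig from package-private to protected matters for consumer subclasses that live outside the org.apache.flink.streaming.connectors.kafka package, where package-private members are invisible. A minimal sketch, assuming a hypothetical subclass (the actual KafkaShuffle consumer is not part of this excerpt):

	// Hypothetical subclass -- illustrates why `protected` is needed from
	// another package. It normalizes its Properties the same way the base
	// class does: auto-commit is disabled when Flink manages offsets itself.
	public class ShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
		public ShuffleConsumer(String topic, KafkaDeserializationSchema<T> schema, Properties props) {
			super(topic, schema, props);
			adjustAutoCommitConfig(props, OffsetCommitMode.ON_CHECKPOINTS);
		}
	}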
AbstractFetcher.java
@@ -84,7 +84,7 @@ public abstract class AbstractFetcher<T, KPH> {

 	/** The lock that guarantees that record emission and state updates are atomic,
 	 * from the view of taking a checkpoint. */
-	private final Object checkpointLock;
+	protected final Object checkpointLock;

 	/** All partitions (and their state) that this fetcher is subscribed to. */
 	private final List<KafkaTopicPartitionState<T, KPH>> subscribedPartitionStates;
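Exposing checkpointLock to subclasses matters because a custom fetcher must preserve the core invariant: a record and the offset-state update that accounts for it must be published atomically with respect to checkpoints, or recovery would lose or duplicate that record. A sketch of the pattern, using the field name from this excerpt inside a hypothetical helper:

	// Hypothetical helper -- shows the invariant the lock protects.
	private void emitOne(
			SourceFunction.SourceContext<T> sourceContext,
			T record,
			KafkaTopicPartitionState<T, KPH> partitionState,
			long offset) {
		synchronized (checkpointLock) {
			sourceContext.collect(record);      // emit downstream
			partitionState.setOffset(offset);   // account for it in state
		}
	}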
KafkaFetcher.java
@@ -63,14 +63,17 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
 	/** The schema to convert between Kafka's byte messages, and Flink's objects. */
 	private final KafkaDeserializationSchema<T> deserializer;

+	/** A collector to emit records in batch (bundle). */
+	private final KafkaCollector kafkaCollector;
+
 	/** The handover of data and exceptions between the consumer thread and the task thread. */
-	private final Handover handover;
+	final Handover handover;

 	/** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher. */
-	private final KafkaConsumerThread consumerThread;
+	final KafkaConsumerThread consumerThread;

 	/** Flag to mark the main work loop as alive. */
-	private volatile boolean running = true;
+	volatile boolean running = true;

 	// ------------------------------------------------------------------------

@@ -111,19 +114,16 @@ public KafkaFetcher(
 			useMetrics,
 			consumerMetricGroup,
 			subtaskMetricGroup);
+		this.kafkaCollector = new KafkaCollector();
 	}

 	// ------------------------------------------------------------------------
 	//  Fetcher work methods
 	// ------------------------------------------------------------------------

-	private final KafkaCollector kafkaCollector = new KafkaCollector();
-
 	@Override
 	public void runFetchLoop() throws Exception {
 		try {
-			final Handover handover = this.handover;
-
 			// kick off the actual Kafka consumer
 			consumerThread.start();

@@ -138,23 +138,7 @@ public void runFetchLoop() throws Exception {
 					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
 						records.records(partition.getKafkaPartitionHandle());

-					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-						deserializer.deserialize(record, kafkaCollector);
-
-						// emit the actual records. this also updates offset state atomically and emits
-						// watermarks
-						emitRecordsWithTimestamps(
-							kafkaCollector.getRecords(),
-							partition,
-							record.offset(),
-							record.timestamp());
-
-						if (kafkaCollector.isEndOfStreamSignalled()) {
-							// end of stream signaled
-							running = false;
-							break;
-						}
-					}
+					partitionConsumerRecordsHandler(partitionRecords, partition);
 				}
 			}
 		}
@@ -189,6 +173,29 @@ protected String getFetcherName() {
 		return "Kafka Fetcher";
 	}

+	protected void partitionConsumerRecordsHandler(
+			List<ConsumerRecord<byte[], byte[]>> partitionRecords,
+			KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {
+
+		for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+			deserializer.deserialize(record, kafkaCollector);
+
+			// emit the actual records. this also updates offset state atomically and emits
+			// watermarks
+			emitRecordsWithTimestamps(
+				kafkaCollector.getRecords(),
+				partition,
+				record.offset(),
+				record.timestamp());
+
+			if (kafkaCollector.isEndOfStreamSignalled()) {
+				// end of stream signaled
+				running = false;
+				break;
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Implement Methods of the AbstractFetcher
 	// ------------------------------------------------------------------------
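With the loop body extracted, runFetchLoop becomes a template method: polling, partition bookkeeping, and shutdown stay in KafkaFetcher, while a subclass overrides only partitionConsumerRecordsHandler to decide how each batch is decoded and emitted. A hypothetical override sketch (the actual KafkaShuffle fetcher is not in this excerpt; decodeShuffleBatch is invented for illustration):

	// Hypothetical override -- a shuffle-reading fetcher could decode the
	// framing its producer wrote (data records vs. persisted watermarks)
	// instead of using the plain deserializer, while reusing the inherited
	// emit path and the now-visible `running` flag.
	@Override
	protected void partitionConsumerRecordsHandler(
			List<ConsumerRecord<byte[], byte[]>> partitionRecords,
			KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {

		for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
			Queue<T> elements = decodeShuffleBatch(record.value()); // invented helper
			// inherited emit path: updates offset state atomically with emission
			emitRecordsWithTimestamps(elements, partition, record.offset(), record.timestamp());
		}
	}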
(2 more changed files not shown)
