Skip to content

Commit

Permalink
[FLINK-23842][coordination] Add logging statements in SourceCoordinat…
Browse files Browse the repository at this point in the history
…ors for reader registration and split requests.

This closes apache#16867
  • Loading branch information
StephanEwen committed Nov 18, 2021
1 parent 57253c5 commit 7a10982
Showing 1 changed file with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
Expand Down Expand Up @@ -163,19 +164,31 @@ public void close() throws Exception {
public void handleEventFromOperator(int subtask, OperatorEvent event) {
runInEventLoop(
() -> {
LOG.debug(
"Handling event from subtask {} of source {}: {}",
subtask,
operatorName,
event);
if (event instanceof RequestSplitEvent) {
LOG.info(
"Source {} received split request from parallel task {}",
operatorName,
subtask);
enumerator.handleSplitRequest(
subtask, ((RequestSplitEvent) event).hostName());
} else if (event instanceof SourceEventWrapper) {
enumerator.handleSourceEvent(
subtask, ((SourceEventWrapper) event).getSourceEvent());
final SourceEvent sourceEvent =
((SourceEventWrapper) event).getSourceEvent();
LOG.debug(
"Source {} received custom event from parallel task {}: {}",
operatorName,
subtask,
sourceEvent);
enumerator.handleSourceEvent(subtask, sourceEvent);
} else if (event instanceof ReaderRegistrationEvent) {
handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
final ReaderRegistrationEvent registrationEvent =
(ReaderRegistrationEvent) event;
LOG.info(
"Source {} registering reader for parallel task {} @ {}",
operatorName,
subtask,
registrationEvent.location());
handleReaderRegistrationEvent(registrationEvent);
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
}
Expand Down

0 comments on commit 7a10982

Please sign in to comment.