Skip to content

Commit

Permalink
[FLINK-23704][task] FLIP-27 sources are not generating LatencyMarkers
Browse files Browse the repository at this point in the history
  • Loading branch information
SteNicholas authored and pnowojski committed Oct 15, 2021
1 parent 9993f45 commit 223d688
Show file tree
Hide file tree
Showing 4 changed files with 524 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
Expand All @@ -49,6 +51,7 @@
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
Expand All @@ -57,11 +60,15 @@
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.FunctionWithException;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Base source operator only used for integrating the source reader which is proposed by FLIP-27. It
Expand Down Expand Up @@ -149,6 +156,8 @@ private enum OperatingMode {

private InternalSourceReaderMetricGroup sourceMetricGroup;

private LatencyMarkerEmitter<OUT> latencyMarerEmitter;

public SourceOperator(
FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
readerFactory,
Expand Down Expand Up @@ -282,6 +291,19 @@ public void open() throws Exception {
watermarkStrategy, sourceMetricGroup);
}

latencyMarerEmitter =
new LatencyMarkerEmitter<>(
getProcessingTimeService(),
getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: getContainingTask()
.getEnvironment()
.getTaskManagerInfo()
.getConfiguration()
.getLong(MetricOptions.LATENCY_INTERVAL),
getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());

// restore the state if necessary.
final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
if (!splits.isEmpty()) {
Expand All @@ -296,13 +318,17 @@ public void open() throws Exception {
sourceReader.start();

eventTimeLogic.startPeriodicWatermarkEmits();
latencyMarerEmitter.startLatencyMarkerEmit();
}

@Override
public void finish() throws Exception {
if (eventTimeLogic != null) {
eventTimeLogic.stopPeriodicWatermarkEmits();
}
if (latencyMarerEmitter != null) {
latencyMarerEmitter.stopLatencyMarkerEmit();
}
super.finish();

finished.complete(null);
Expand Down Expand Up @@ -348,6 +374,7 @@ private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Except
switch (operatingMode) {
case OUTPUT_NOT_INITIALIZED:
currentMainOutput = eventTimeLogic.createMainOutput(output);
latencyMarerEmitter.emitMainOutput(output);
lastInvokedOutput = output;
this.operatingMode = OperatingMode.READING;
return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
Expand Down Expand Up @@ -478,4 +505,79 @@ public void forceStop() {
this.forcedStopFuture.complete(null);
}
}

private static class LatencyMarkerEmitter<OUT> {

private final ProcessingTimeService timeService;

private final long latencyTrackingInterval;

private final OperatorID operatorId;

private final int subtaskIndex;

@Nullable private DataOutput<OUT> currentMainOutput;

@Nullable private ScheduledFuture<?> latencyMarkerTimer;

public LatencyMarkerEmitter(
final ProcessingTimeService timeService,
long latencyTrackingInterval,
final OperatorID operatorId,
final int subtaskIndex) {
this.timeService = timeService;
this.latencyTrackingInterval = latencyTrackingInterval;
this.operatorId = operatorId;
this.subtaskIndex = subtaskIndex;
}

// ------------------------------------------------------------------------

public void emitMainOutput(PushingAsyncDataInput.DataOutput<OUT> output) {
// At the moment, we assume only one output is ever created!
// This assumption is strict, currently, because many of the classes in this
// implementation
// do not support re-assigning the underlying output
checkState(currentMainOutput == null, "Main output has already been set.");
currentMainOutput = output;
}

public void startLatencyMarkerEmit() {
checkState(
latencyMarkerTimer == null, "Latency marker emitter has already been started");
if (latencyTrackingInterval == 0) {
// a value of zero means not activated
return;
}
latencyMarkerTimer =
timeService.scheduleWithFixedDelay(
this::triggerLatencyMarkerEmit, 0L, latencyTrackingInterval);
}

public void stopLatencyMarkerEmit() {
if (latencyMarkerTimer != null) {
latencyMarkerTimer.cancel(false);
latencyMarkerTimer = null;
}
}

void triggerLatencyMarkerEmit(@SuppressWarnings("unused") long wallClockTimestamp) {
if (currentMainOutput != null) {
try {
// ProcessingTimeService callbacks are executed under the
// checkpointing lock
currentMainOutput.emitLatencyMarker(
new LatencyMarker(
timeService.getCurrentProcessingTime(),
operatorId,
subtaskIndex));
} catch (Throwable t) {
// we catch the Throwable here so that we don't trigger the
// processing
// timer services async exception handler
LOG.warn("Error while emitting latency marker.", t);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,30 @@

package org.apache.flink.streaming.api.operators.source;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

Expand Down Expand Up @@ -243,6 +251,19 @@ private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
final SourceOperator<T, MockSourceSplit> sourceOperator =
new TestingSourceOperator<>(
reader, watermarkStrategy, timeService, emitProgressiveWatermarks);

sourceOperator.setup(
new SourceOperatorStreamTask<Integer>(
new StreamMockEnvironment(
new Configuration(),
new Configuration(),
new ExecutionConfig(),
1L,
new MockInputSplitProvider(),
1,
new TestTaskStateManager())),
new MockStreamConfig(new Configuration(), 1),
new MockOutput<>(new ArrayList<>()));
sourceOperator.initializeState(stateContext);
sourceOperator.open();

Expand Down
Loading

0 comments on commit 223d688

Please sign in to comment.