Skip to content

Commit

Permalink
[FLINK-23767][streaming] Rename StreamStatus to WatermarkStatus.
Browse files Browse the repository at this point in the history
git grep -l 'streamStatus' | xargs sed -i '' -e 's/streamStatus/watermarkStatus/g'
git grep -l 'StreamStatus' | xargs sed -i '' -e 's/StreamStatus/WatermarkStatus/g'
git grep -l 'streamstatus' | xargs sed -i '' -e 's/streamstatus/watermarkstatus/g'
git grep -l 'Stream Status' | xargs sed -i '' -e 's/Stream Status/Watermark Status/g'
git grep -l 'Stream status' | xargs sed -i '' -e 's/Stream status/Watermark status/g'
git grep -l 'stream status' | xargs sed -i '' -e 's/stream status/watermark status/g'
Renamed class StreamStatus(Test) -> WatermarkStatus(Test)
Renamed package streamstatus -> watermarkstatus
Reverted Kinesis as it does have a real StreamStatus
mvn spotless:apply
  • Loading branch information
Arvid Heise authored and AHeise committed Aug 16, 2021
1 parent 379f28b commit 474808d
Show file tree
Hide file tree
Showing 70 changed files with 427 additions and 416 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -132,7 +132,7 @@ private CollectorWrapper(Collector<OUT> inner) {
public void emitWatermark(Watermark mark) {}

@Override
public void emitStreamStatus(StreamStatus streamStatus) {}
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

/** Wraps an existing operator so it can be bootstrapped. */
Expand Down Expand Up @@ -95,8 +95,8 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
}

@Override
public void processStreamStatus(StreamStatus streamStatus) throws Exception {
operator.processStreamStatus(streamStatus);
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
operator.processWatermarkStatus(watermarkStatus);
}

@Override
Expand Down Expand Up @@ -208,7 +208,7 @@ private static class VoidOutput<T> implements Output<T> {
public void emitWatermark(Watermark mark) {}

@Override
public void emitStreamStatus(StreamStatus streamStatus) {}
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -67,8 +67,8 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
}

@Override
public void processStreamStatus(StreamStatus streamStatus) throws Exception {
owner.processStreamStatus(streamStatus, inputId);
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
owner.processWatermarkStatus(watermarkStatus, inputId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.LatencyStats;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -619,26 +619,27 @@ public void processWatermark2(Watermark mark) throws Exception {
processWatermark(mark, 1);
}

public void processStreamStatus(StreamStatus streamStatus) throws Exception {
output.emitStreamStatus(streamStatus);
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
output.emitWatermarkStatus(watermarkStatus);
}

private void processStreamStatus(StreamStatus streamStatus, int index) throws Exception {
private void processWatermarkStatus(WatermarkStatus watermarkStatus, int index)
throws Exception {
boolean wasIdle = combinedWatermark.isIdle();
if (combinedWatermark.updateStatus(index, streamStatus.isIdle())) {
if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
}
if (wasIdle != combinedWatermark.isIdle()) {
output.emitStreamStatus(streamStatus);
output.emitWatermarkStatus(watermarkStatus);
}
}

public final void processStreamStatus1(StreamStatus streamStatus) throws Exception {
processStreamStatus(streamStatus, 0);
public final void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception {
processWatermarkStatus(watermarkStatus, 0);
}

public final void processStreamStatus2(StreamStatus streamStatus) throws Exception {
processStreamStatus(streamStatus, 1);
public final void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {
processWatermarkStatus(watermarkStatus, 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.LatencyStats;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -511,13 +511,14 @@ protected void reportWatermark(Watermark mark, int inputId) throws Exception {
}
}

public final void processStreamStatus(StreamStatus streamStatus, int inputId) throws Exception {
public final void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
throws Exception {
boolean wasIdle = combinedWatermark.isIdle();
if (combinedWatermark.updateStatus(inputId - 1, streamStatus.isIdle())) {
if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) {
processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
}
if (wasIdle != combinedWatermark.isIdle()) {
output.emitStreamStatus(streamStatus);
output.emitWatermarkStatus(watermarkStatus);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

/** Wrapping {@link Output} that updates metrics on the number of emitted elements. */
Expand All @@ -41,8 +41,8 @@ public void emitWatermark(Watermark mark) {
}

@Override
public void emitStreamStatus(StreamStatus streamStatus) {
output.emitStreamStatus(streamStatus);
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
output.emitWatermarkStatus(watermarkStatus);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

/** {@link Input} interface used in {@link MultipleInputStreamOperator}. */
@PublicEvolving
Expand All @@ -42,13 +42,13 @@ public interface Input<IN> {
void processWatermark(Watermark mark) throws Exception;

/**
* Processes a {@link StreamStatus} that arrived on this input of the {@link
* Processes a {@link WatermarkStatus} that arrived on this input of the {@link
* MultipleInputStreamOperator}. This method is guaranteed to not be called concurrently with
* other methods of the operator.
*
* @see StreamStatus
* @see WatermarkStatus
*/
void processStreamStatus(StreamStatus streamStatus) throws Exception;
void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception;

/**
* Processes a {@link LatencyMarker} that arrived on the first input of this two-input operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

Expand All @@ -45,7 +45,7 @@ public interface Output<T> extends Collector<T> {
*/
void emitWatermark(Watermark mark);

void emitStreamStatus(StreamStatus streamStatus);
void emitWatermarkStatus(WatermarkStatus watermarkStatus);

/**
* Emits a record to the side output identified by the given {@link OutputTag}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.UserCodeClassLoader;
Expand Down Expand Up @@ -473,8 +473,8 @@ public void emitWatermark(Watermark watermark) throws Exception {
}

@Override
public void emitStreamStatus(StreamStatus streamStatus) throws Exception {
output.emitStreamStatus(streamStatus);
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
output.emitWatermarkStatus(watermarkStatus);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -314,11 +314,11 @@ protected void processAndEmitWatermark(Watermark mark) {
}

@Override
protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
if (idle != streamStatus.isIdle()) {
output.emitStreamStatus(streamStatus);
protected void processAndEmitWatermarkStatus(WatermarkStatus watermarkStatus) {
if (idle != watermarkStatus.isIdle()) {
output.emitWatermarkStatus(watermarkStatus);
}
idle = streamStatus.isIdle();
idle = watermarkStatus.isIdle();
}

@Override
Expand Down Expand Up @@ -429,11 +429,11 @@ protected void processAndEmitWatermark(Watermark mark) {
}

@Override
protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
if (idle != streamStatus.isIdle()) {
output.emitStreamStatus(streamStatus);
protected void processAndEmitWatermarkStatus(WatermarkStatus watermarkStatus) {
if (idle != watermarkStatus.isIdle()) {
output.emitWatermarkStatus(watermarkStatus);
}
idle = streamStatus.isIdle();
idle = watermarkStatus.isIdle();
}

@Override
Expand All @@ -447,9 +447,9 @@ protected boolean allowWatermark(Watermark mark) {
* source contexts that are relevant with {@link Watermark}s.
*
* <p>Stream source contexts that are relevant with watermarks are responsible of manipulating
* the current {@link StreamStatus}, so that stream status can be correctly propagated
* downstream. Please refer to the class-level documentation of {@link StreamStatus} for
* information on how stream status affects watermark advancement at downstream tasks.
* the current {@link WatermarkStatus}, so that watermark status can be correctly propagated
* downstream. Please refer to the class-level documentation of {@link WatermarkStatus} for
* information on how watermark status affects watermark advancement at downstream tasks.
*
* <p>This class implements the logic of idleness detection. It fires idleness detection tasks
* at a given interval; if no records or watermarks were collected by the source context between
Expand Down Expand Up @@ -502,7 +502,7 @@ public WatermarkContext(
@Override
public final void collect(T element) {
synchronized (checkpointLock) {
processAndEmitStreamStatus(StreamStatus.ACTIVE);
processAndEmitWatermarkStatus(WatermarkStatus.ACTIVE);

if (nextCheck != null) {
this.failOnNextCheck = false;
Expand All @@ -517,7 +517,7 @@ public final void collect(T element) {
@Override
public final void collectWithTimestamp(T element, long timestamp) {
synchronized (checkpointLock) {
processAndEmitStreamStatus(StreamStatus.ACTIVE);
processAndEmitWatermarkStatus(WatermarkStatus.ACTIVE);

if (nextCheck != null) {
this.failOnNextCheck = false;
Expand All @@ -533,7 +533,7 @@ public final void collectWithTimestamp(T element, long timestamp) {
public final void emitWatermark(Watermark mark) {
if (allowWatermark(mark)) {
synchronized (checkpointLock) {
processAndEmitStreamStatus(StreamStatus.ACTIVE);
processAndEmitWatermarkStatus(WatermarkStatus.ACTIVE);

if (nextCheck != null) {
this.failOnNextCheck = false;
Expand All @@ -549,7 +549,7 @@ public final void emitWatermark(Watermark mark) {
@Override
public final void markAsTemporarilyIdle() {
synchronized (checkpointLock) {
processAndEmitStreamStatus(StreamStatus.IDLE);
processAndEmitWatermarkStatus(WatermarkStatus.IDLE);
}
}

Expand Down Expand Up @@ -620,6 +620,6 @@ protected void cancelNextIdleDetectionTask() {
*/
protected abstract void processAndEmitWatermark(Watermark mark);

protected abstract void processAndEmitStreamStatus(StreamStatus streamStatus);
protected abstract void processAndEmitWatermarkStatus(WatermarkStatus watermarkStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

/**
Expand Down Expand Up @@ -78,8 +78,8 @@ public void emitWatermark(Watermark mark) {
}

@Override
public void emitStreamStatus(StreamStatus streamStatus) {
output.emitStreamStatus(streamStatus);
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
output.emitWatermarkStatus(watermarkStatus);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

/**
* Interface for stream operators with two inputs. Use {@link
Expand Down Expand Up @@ -82,18 +82,20 @@ public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OU
void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;

/**
* Processes a {@link StreamStatus} that arrived on the first input of this two-input operator.
* This method is guaranteed to not be called concurrently with other methods of the operator.
* Processes a {@link WatermarkStatus} that arrived on the first input of this two-input
* operator. This method is guaranteed to not be called concurrently with other methods of the
* operator.
*
* @see org.apache.flink.streaming.runtime.streamstatus.StreamStatus
* @see org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus
*/
void processStreamStatus1(StreamStatus streamStatus) throws Exception;
void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception;

/**
* Processes a {@link StreamStatus} that arrived on the second input of this two-input operator.
* This method is guaranteed to not be called concurrently with other methods of the operator.
* Processes a {@link WatermarkStatus} that arrived on the second input of this two-input
* operator. This method is guaranteed to not be called concurrently with other methods of the
* operator.
*
* @see org.apache.flink.streaming.runtime.streamstatus.StreamStatus
* @see org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus
*/
void processStreamStatus2(StreamStatus streamStatus) throws Exception;
void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception;
}
Loading

0 comments on commit 474808d

Please sign in to comment.