collector) throws Exception;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index f6d3d319ea3b2..232206c29a36e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -41,6 +41,7 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.types.Value;
+import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.List;
@@ -48,88 +49,55 @@
/**
* Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
*
*
- * State related apis in {@link RuntimeContext} are not supported yet because the key may get changed
- * while accessing states in the working thread.
+ * State related apis in {@link RuntimeContext} are not supported yet because the key may get
+ * changed while accessing states in the working thread.
*
- * {@link IterationRuntimeContext#getIterationAggregator(String)} is not supported since the aggregator
- * may be modified by multiple threads.
+ * {@link IterationRuntimeContext#getIterationAggregator(String)} is not supported since the
+ * aggregator may be modified by multiple threads.
*
 * @param <IN> The type of the input elements.
 * @param <OUT> The type of the returned elements.
*/
-
@PublicEvolving
-public abstract class RichAsyncFunction extends AbstractRichFunction
- implements AsyncFunction {
+public abstract class RichAsyncFunction extends AbstractRichFunction implements AsyncFunction {
- private transient RuntimeContext runtimeContext;
+ private static final long serialVersionUID = 3858030061138121840L;
@Override
- public void setRuntimeContext(RuntimeContext t) {
- super.setRuntimeContext(t);
+ public void setRuntimeContext(RuntimeContext runtimeContext) {
+ Preconditions.checkNotNull(runtimeContext);
- if (t != null) {
- runtimeContext = new RichAsyncFunctionRuntimeContext(t);
+ if (runtimeContext instanceof IterationRuntimeContext) {
+ super.setRuntimeContext(
+ new RichAsyncFunctionIterationRuntimeContext(
+ (IterationRuntimeContext) runtimeContext));
+ } else {
+ super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext));
}
}
@Override
public abstract void asyncInvoke(IN input, AsyncCollector collector) throws Exception;
- @Override
- public RuntimeContext getRuntimeContext() {
- if (this.runtimeContext != null) {
- return runtimeContext;
- } else {
- throw new IllegalStateException("The runtime context has not been initialized.");
- }
- }
-
- @Override
- public IterationRuntimeContext getIterationRuntimeContext() {
- if (this.runtimeContext != null) {
- return (IterationRuntimeContext) runtimeContext;
- } else {
- throw new IllegalStateException("The runtime context has not been initialized.");
- }
- }
+ // -----------------------------------------------------------------------------------------
+ // Wrapper classes
+ // -----------------------------------------------------------------------------------------
/**
- * A wrapper class to delegate {@link RuntimeContext}. State related apis are disabled.
+ * A wrapper class for async function's {@link RuntimeContext}. The async function runtime
+ * context only supports basic operations which are thread safe. Consequently, state access,
+ * accumulators, broadcast variables and the distributed cache are disabled.
*/
- private class RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {
- private RuntimeContext runtimeContext;
-
- public RichAsyncFunctionRuntimeContext(RuntimeContext context) {
- runtimeContext = context;
- }
-
- private IterationRuntimeContext getIterationRuntineContext() {
- if (this.runtimeContext instanceof IterationRuntimeContext) {
- return (IterationRuntimeContext) this.runtimeContext;
- } else {
- throw new IllegalStateException("This stub is not part of an iteration step function.");
- }
- }
-
- @Override
- public int getSuperstepNumber() {
- return getIterationRuntineContext().getSuperstepNumber();
- }
-
- @Override
- public > T getIterationAggregator(String name) {
- throw new UnsupportedOperationException("Get iteration aggregator is not supported in rich async function");
- }
+ private static class RichAsyncFunctionRuntimeContext implements RuntimeContext {
+ private final RuntimeContext runtimeContext;
- @Override
- public T getPreviousIterationAggregate(String name) {
- return getIterationRuntineContext().getPreviousIterationAggregate(name);
+ RichAsyncFunctionRuntimeContext(RuntimeContext context) {
+ runtimeContext = Preconditions.checkNotNull(context);
}
@Override
@@ -172,74 +140,108 @@ public ClassLoader getUserCodeClassLoader() {
return runtimeContext.getUserCodeClassLoader();
}
+ // -----------------------------------------------------------------------------------
+ // Unsupported operations
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public DistributedCache getDistributedCache() {
+ throw new UnsupportedOperationException("Distributed cache is not supported in rich async functions.");
+ }
+
+ @Override
+ public ValueState getState(ValueStateDescriptor stateProperties) {
+ throw new UnsupportedOperationException("State is not supported in rich async functions.");
+ }
+
+ @Override
+ public ListState getListState(ListStateDescriptor stateProperties) {
+ throw new UnsupportedOperationException("State is not supported in rich async functions.");
+ }
+
+ @Override
+ public ReducingState getReducingState(ReducingStateDescriptor stateProperties) {
+ throw new UnsupportedOperationException("State is not supported in rich async functions.");
+ }
+
@Override
public void addAccumulator(String name, Accumulator accumulator) {
- runtimeContext.addAccumulator(name, accumulator);
+ throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
}
@Override
public Accumulator getAccumulator(String name) {
- return runtimeContext.getAccumulator(name);
+ throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
}
@Override
public Map> getAllAccumulators() {
- return runtimeContext.getAllAccumulators();
+ throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
}
@Override
public IntCounter getIntCounter(String name) {
- return runtimeContext.getIntCounter(name);
+ throw new UnsupportedOperationException("Int counters are not supported in rich async functions.");
}
@Override
public LongCounter getLongCounter(String name) {
- return runtimeContext.getLongCounter(name);
+ throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
}
@Override
public DoubleCounter getDoubleCounter(String name) {
- return runtimeContext.getDoubleCounter(name);
+ throw new UnsupportedOperationException("Double counters are not supported in rich async functions.");
}
@Override
public Histogram getHistogram(String name) {
- return runtimeContext.getHistogram(name);
+ throw new UnsupportedOperationException("Histograms are not supported in rich async functions.");
}
@Override
public boolean hasBroadcastVariable(String name) {
- return runtimeContext.hasBroadcastVariable(name);
+ throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
}
@Override
public List getBroadcastVariable(String name) {
- return runtimeContext.getBroadcastVariable(name);
+ throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
}
@Override
public C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer initializer) {
- return runtimeContext.getBroadcastVariableWithInitializer(name, initializer);
+ throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
}
+ }
- @Override
- public DistributedCache getDistributedCache() {
- return runtimeContext.getDistributedCache();
+ private static class RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {
+
+ private final IterationRuntimeContext iterationRuntimeContext;
+
+ RichAsyncFunctionIterationRuntimeContext(IterationRuntimeContext iterationRuntimeContext) {
+ super(iterationRuntimeContext);
+
+ this.iterationRuntimeContext = Preconditions.checkNotNull(iterationRuntimeContext);
}
@Override
- public ValueState getState(ValueStateDescriptor stateProperties) {
- throw new UnsupportedOperationException("State is not supported in rich async function");
+ public int getSuperstepNumber() {
+ return iterationRuntimeContext.getSuperstepNumber();
}
+ // -----------------------------------------------------------------------------------
+ // Unsupported operations
+ // -----------------------------------------------------------------------------------
+
@Override
- public ListState getListState(ListStateDescriptor stateProperties) {
- throw new UnsupportedOperationException("State is not supported in rich async function");
+ public > T getIterationAggregator(String name) {
+ throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
}
@Override
- public ReducingState getReducingState(ReducingStateDescriptor stateProperties) {
- throw new UnsupportedOperationException("State is not supported in rich async function");
+ public T getPreviousIterationAggregate(String name) {
+ throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
}
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java
deleted file mode 100644
index 29643fd204f20..0000000000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Abstract implementation for {@link StreamElementEntry}
- *
- * @param Output type.
- */
-public abstract class AbstractBufferEntry implements StreamElementEntry {
- private final StreamElement streamElement;
-
- protected AbstractBufferEntry(StreamElement element) {
- this.streamElement = Preconditions.checkNotNull(element, "Reference to StreamElement should not be null");
- }
-
- @Override
- public List getResult() throws IOException {
- throw new UnsupportedOperationException("It is only available for StreamRecordEntry");
- }
-
- @Override
- public void markDone() {
- throw new UnsupportedOperationException("It is only available for StreamRecordEntry");
- }
-
- @Override
- public boolean isDone() {
- throw new UnsupportedOperationException("It must be overriden by the concrete entry");
- }
-
- @Override
- public boolean isStreamRecord() {
- return streamElement.isRecord();
- }
-
- @Override
- public boolean isWatermark() {
- return streamElement.isWatermark();
- }
-
- @Override
- public boolean isLatencyMarker() {
- return streamElement.isLatencyMarker();
- }
-
- @Override
- public StreamElement getStreamElement() {
- return streamElement;
- }
-
- @Override
- public String toString() {
- return "StreamElementEntry for @" + streamElement;
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java
deleted file mode 100644
index ee176d91a855a..0000000000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.streaming.api.datastream.AsyncDataStream;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-/**
- * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
- * and emit results from {@link AsyncCollector} to the next operators following it by
- * calling {@link Output#collect(Object)}
- */
-@Internal
-public class AsyncCollectorBuffer {
-
- /**
- * The logger.
- */
- private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class);
-
- /**
- * Max number of {@link AsyncCollector} in the buffer.
- */
- private final int bufferSize;
-
- private final AsyncDataStream.OutputMode mode;
-
- private final AsyncWaitOperator operator;
-
- /**
- * Keep all {@link StreamElementEntry}
- */
- private final Set> queue = new LinkedHashSet<>();
-
- /**
- * Keep all {@link StreamElementEntry} to their corresponding {@link Watermark} or {@link LatencyMarker}
- * If the inputs are: SR1, SR2, WM1, SR3, SR4. Then SR1 and SR2 belong to WM1, and
- * SR3 and SR4 will be kept in {@link #lonelyEntries}
- */
- private final Map, StreamElement> entriesToMarkers = new HashMap<>();
-
- private final List> lonelyEntries = new LinkedList<>();
-
- /**
- * Keep finished AsyncCollector belonging to the oldest Watermark or LatencyMarker in UNORDERED mode.
- */
- private final Map>> markerToFinishedEntries = new LinkedHashMap<>();
- private Set>lonelyFinishedEntries = new HashSet<>();
-
- /**
- * For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the
- * {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue}
- * is full since main thread waits on this lock. The StreamElement in
- * {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements
- * in its queue. It will be kept in the operator state while snapshotting.
- */
- private StreamElement extraStreamElement;
-
- /**
- * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
- */
- private final Output> output;
- private final TimestampedCollector timestampedCollector;
-
- /**
- * Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
- */
- private final Object lock;
-
- private final Emitter emitter;
- private final Thread emitThread;
-
- /**
- * Exception from async operation or internal error
- */
- private Exception error;
-
- /**
- * Flag telling Emitter thread to work or not.
- */
- private volatile boolean workwork = false;
-
- public AsyncCollectorBuffer(
- int bufferSize,
- AsyncDataStream.OutputMode mode,
- Output> output,
- TimestampedCollector collector,
- Object lock,
- AsyncWaitOperator operator) {
- Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0.");
-
- this.bufferSize = bufferSize;
-
- this.mode = Preconditions.checkNotNull(mode, "Processing mode should not be NULL.");
- this.output = Preconditions.checkNotNull(output, "Output should not be NULL.");
- this.timestampedCollector = Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL.");
- this.operator = Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL.");
- this.lock = Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL.");
-
- this.emitter = new Emitter();
- this.emitThread = new Thread(emitter);
- this.emitThread.setDaemon(true);
- }
-
- /**
- * Add an {@link StreamRecord} into the buffer. A new {@link AsyncCollector} will be created and returned
- * corresponding to the input StreamRecord.
- *
- * If buffer is full, caller will wait until a new space is available.
- *
- * @param record StreamRecord
- * @return An AsyncCollector
- * @throws Exception Exception from AsyncCollector.
- */
- public AsyncCollector addStreamRecord(StreamRecord record) throws Exception {
- assert(Thread.holdsLock(lock));
-
- while (queue.size() >= bufferSize) {
- // hold the input StreamRecord until it is placed in the buffer
- extraStreamElement = record;
-
- lock.wait();
- }
-
- if (error != null) {
- throw error;
- }
-
- StreamElementEntry entry = new StreamRecordEntry<>(record, this);
-
- queue.add(entry);
-
- if (mode == AsyncDataStream.OutputMode.UNORDERED) {
- lonelyEntries.add(entry);
- }
-
- extraStreamElement = null;
-
- return (AsyncCollector)entry;
- }
-
- /**
- * Add a {@link Watermark} into buffer.
- *
- * If queue is full, caller will wait here.
- *
- * @param watermark Watermark
- * @throws Exception Exception from AsyncCollector.
- */
- public void addWatermark(Watermark watermark) throws Exception {
- processMark(new WatermarkEntry(watermark));
- }
-
- /**
- * Add a {@link LatencyMarker} into buffer.
- *
- * If queue is full, caller will wait here.
- *
- * @param latencyMarker LatencyMarker
- * @throws Exception Exception from AsyncCollector.
- */
- public void addLatencyMarker(LatencyMarker latencyMarker) throws Exception {
- processMark(new LatencyMarkerEntry(latencyMarker));
- }
-
- /**
- * Notify the emitter thread and main thread that an AsyncCollector has completed.
- *
- * @param entry Completed AsyncCollector
- */
- public void markCollectorCompleted(StreamElementEntry entry) {
- synchronized (lock) {
- entry.markDone();
-
- if (mode == AsyncDataStream.OutputMode.UNORDERED) {
- StreamElement marker = entriesToMarkers.get(entry);
-
- if (marker != null) {
- markerToFinishedEntries.get(marker).add(entry);
- }
- else {
- lonelyFinishedEntries.add(entry);
- }
- }
-
- // if workwork is true, it is not necessary to check it again
- if (!workwork && shouldNotifyEmitterThread(entry)) {
- workwork = true;
-
- lock.notifyAll();
- }
- }
- }
-
- /**
- * Caller will wait here if buffer is not empty, meaning that not all async i/o tasks have returned yet.
- *
- * @throws Exception IOException from AsyncCollector.
- */
- public void waitEmpty() throws Exception {
- assert(Thread.holdsLock(lock));
-
- while (queue.size() != 0) {
- if (error != null) {
- throw error;
- }
-
- lock.wait();
- }
- }
-
- public void startEmitterThread() {
- emitThread.start();
- }
-
- public void stopEmitterThread() {
- emitter.stop();
-
- emitThread.interrupt();
-
- while (emitThread.isAlive()) {
- // temporarily release the lock first, since caller of this method may also hold the lock.
- if (Thread.holdsLock(lock)) {
- try {
- lock.wait(1000);
- }
- catch (InterruptedException e) {
- // do nothing
- }
- }
-
- try {
- emitThread.join(10000);
- } catch (InterruptedException e) {
- // do nothing
- }
-
- // get the stack trace
- StringBuilder sb = new StringBuilder();
- StackTraceElement[] stack = emitThread.getStackTrace();
-
- for (StackTraceElement e : stack) {
- sb.append(e).append('\n');
- }
-
- LOG.warn("Emitter thread blocks due to {}", sb.toString());
-
- emitThread.interrupt();
- }
- }
-
- /**
- * Get all StreamElements in the AsyncCollector queue.
- *
- * Emitter Thread can not output records and will wait for a while due to checkpoiting procedure
- * holding the checkpoint lock.
- *
- * @return An {@link Iterator} to the StreamElements in the buffer, including the extra one.
- */
- public Iterator getStreamElementsInBuffer() {
- final Iterator> iterator = queue.iterator();
- final StreamElement extra = extraStreamElement;
-
- return new Iterator() {
- boolean shouldSendExtraElement = (extra != null);
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext() || shouldSendExtraElement;
- }
-
- @Override
- public StreamElement next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- if (iterator.hasNext()) {
- return iterator.next().getStreamElement();
- }
- else {
- shouldSendExtraElement = false;
-
- return extra;
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove");
- }
- };
- }
-
- private void processMark(StreamElementEntry entry) throws Exception {
- assert(Thread.holdsLock(lock));
-
- StreamElement mark = entry.getStreamElement();
-
- while (queue.size() >= bufferSize) {
- // hold the input StreamRecord until it is placed in the buffer
- extraStreamElement = mark;
-
- lock.wait();
- }
-
- if (error != null) {
- throw error;
- }
-
- queue.add(entry);
-
- if (mode == AsyncDataStream.OutputMode.UNORDERED) {
- // update AsyncCollector to Watermark / LatencyMarker map
- for (StreamElementEntry e : lonelyEntries) {
- entriesToMarkers.put(e, mark);
- }
-
- lonelyEntries.clear();
-
- // update Watermark / LatencyMarker to finished AsyncCollector map
- markerToFinishedEntries.put(mark, lonelyFinishedEntries);
-
- lonelyFinishedEntries = new HashSet<>();
- }
-
- extraStreamElement = null;
-
- // notify Emitter thread if the head of buffer is Watermark or LatencyMarker
- // this is for the case when LatencyMarkers keep coming but there is no StreamRecords.
- StreamElementEntry element = queue.iterator().next();
-
- if (element.isLatencyMarker() || element.isWatermark()) {
- workwork = true;
-
- lock.notifyAll();
- }
- }
-
- private boolean shouldNotifyEmitterThread(StreamElementEntry entry) {
-
- switch (mode) {
-
- case ORDERED:
- Iterator> queueIterator = queue.iterator();
-
- // get to work as long as the first AsyncCollect is done.
- return queueIterator.hasNext() && (queueIterator.next().isDone());
-
- case UNORDERED:
- Iterator>>> iteratorMarker =
- markerToFinishedEntries.entrySet().iterator();
-
- // get to work only the finished AsyncCollector belongs to the oldest Watermark or LatencyMarker
- // or no Watermark / LatencyMarker is in the buffer yet.
- return iteratorMarker.hasNext() ? iteratorMarker.next().getValue().contains(entry)
- : lonelyFinishedEntries.contains(entry);
-
- default:
- // this case should never happen
- return false;
- }
- }
-
- @VisibleForTesting
- public Set> getQueue() {
- return queue;
- }
-
- @VisibleForTesting
- public void setExtraStreamElement(StreamElement element) {
- extraStreamElement = element;
- }
-
- /**
- * A working thread to output results from {@link AsyncCollector} to the next operator.
- */
- private class Emitter implements Runnable {
- private volatile boolean running = true;
-
- private void output(StreamElementEntry entry) throws Exception {
-
- StreamElement element = entry.getStreamElement();
-
- if (element == null) {
- throw new Exception("StreamElement in the buffer entry should not be null");
- }
-
- if (entry.isStreamRecord()) {
- List result = entry.getResult();
-
- if (result == null) {
- throw new Exception("Result for stream record " + element + " is null");
- }
-
- // update the timestamp for the collector
- timestampedCollector.setTimestamp(element.asRecord());
-
- for (OUT val : result) {
- timestampedCollector.collect(val);
- }
- }
- else if (entry.isWatermark()) {
- output.emitWatermark(element.asWatermark());
- }
- else if (entry.isLatencyMarker()) {
- operator.sendLatencyMarker(element.asLatencyMarker());
- }
- else {
- throw new IOException("Unknown input record: " + element);
- }
- }
-
- /**
- * Emit results from the finished head collector and its following finished ones.
- *
- * NOTE: Since {@link #output(StreamElementEntry)} may be blocked if operator chain chained with
- * another {@link AsyncWaitOperator} and its buffer is full, we can not use an {@link Iterator} to
- * go through {@link #queue} because ConcurrentModificationException may be thrown while we remove
- * element in the queue by calling {@link Iterator#remove()}.
- *
- *
- * Example: Assume operator chain like this: async-wait-operator1(awo1) -> async-wait-operator2(awo2).
- * The buffer for awo1 is full so the main thread is blocked there.
- * The {@link Emitter} thread, named emitter1, in awo1 is outputting
- * data to awo2. Assume that 2 elements have been emitted and the buffer in awo1 has two vacancies. While
- * outputting the third one, the buffer in awo2 is full, so emitter1 will wait for a moment. If we use
- * {@link Iterator}, it is just before calling {@link Iterator#remove()}. Once the {@link #lock} is released
- * and luckily enough, the main thread get the lock. It will modify {@link #queue}, causing
- * ConcurrentModificationException once emitter1 runs to {@link Iterator#remove()}.
- *
- */
- private void orderedProcess() throws Exception {
- StreamElementEntry entry;
-
- while (queue.size() > 0 && (entry = queue.iterator().next()).isDone()) {
- output(entry);
-
- queue.remove(entry);
- }
- }
-
- /**
- * Emit results for each finished collector. Try to emit results prior to the oldest watermark
- * in the buffer.
- *
- * For example, assume the sequence of input StreamElements is:
- * Entry(ac1, record1) -> Entry(ac2, record2) -> Entry(ac3, watermark1) -> Entry(ac4, record3).
- * and both of ac2 and ac3 have finished. For unordered-mode, ac1 and ac2 are prior to watermark1,
- * so ac2 will be emitted. Since ac1 is not ready yet, ac3 have to wait until ac1 is done.
- */
- private void unorderedProcess() throws Exception {
- // try to emit finished AsyncCollectors in markerToFinishedEntries
- if (markerToFinishedEntries.size() != 0) {
- while (markerToFinishedEntries.size() != 0) {
- Map.Entry>> finishedStreamElementEntry =
- markerToFinishedEntries.entrySet().iterator().next();
-
- Set> finishedElementSet = finishedStreamElementEntry.getValue();
-
- // While outputting results to the next operator, output may release lock if the following operator
- // in the chain is another AsyncWaitOperator. During this period, there may be some
- // finished StreamElementEntry coming into the finishedElementSet, and we should
- // output all finished elements after re-acquiring the lock.
- while (finishedElementSet.size() != 0) {
- StreamElementEntry finishedEntry = finishedElementSet.iterator().next();
-
- output(finishedEntry);
-
- queue.remove(finishedEntry);
-
- entriesToMarkers.remove(finishedEntry);
-
- finishedElementSet.remove(finishedEntry);
- }
-
- finishedStreamElementEntry.getValue().clear();
-
-
- // if all StreamElements belonging to current Watermark / LatencyMarker have been emitted,
- // emit current Watermark / LatencyMarker
-
- if (queue.size() == 0) {
- if (markerToFinishedEntries.size() != 0 || entriesToMarkers.size() != 0
- || lonelyEntries.size() != 0 || lonelyFinishedEntries.size() != 0) {
- throw new IOException("Inner data info is not consistent.");
- }
- }
- else {
- // check the head AsyncCollector whether it is a Watermark or LatencyMarker.
- StreamElementEntry queueEntry = queue.iterator().next();
-
- if (!queueEntry.isStreamRecord()) {
- if (finishedStreamElementEntry.getKey() != queueEntry.getStreamElement()) {
- throw new IOException("Watermark / LatencyMarker from finished collector map "
- + "and input buffer are not the same.");
- }
-
- output(queueEntry);
-
- queue.remove(queueEntry);
-
- // remove useless data in markerToFinishedEntries
- markerToFinishedEntries.remove(finishedStreamElementEntry.getKey());
- }
- else {
- break;
- }
- }
- }
- }
-
- if (markerToFinishedEntries.size() == 0) {
- // health check
- if (entriesToMarkers.size() != 0) {
- throw new IOException("Entries to marker map should be zero");
- }
-
- // no Watermark or LatencyMarker in the buffer yet, emit results in lonelyFinishedEntries
- while (lonelyFinishedEntries.size() != 0) {
- StreamElementEntry entry = lonelyFinishedEntries.iterator().next();
-
- output(entry);
-
- queue.remove(entry);
-
- lonelyEntries.remove(entry);
-
- lonelyFinishedEntries.remove(entry);
- }
- }
- }
-
- private void processFinishedAsyncCollector() throws Exception {
- if (mode == AsyncDataStream.OutputMode.ORDERED) {
- orderedProcess();
- } else {
- unorderedProcess();
- }
- }
-
- private void clearAndNotify() {
- // clear all data
- queue.clear();
- entriesToMarkers.clear();
- markerToFinishedEntries.clear();
- lonelyEntries.clear();
-
- running = false;
-
- lock.notifyAll();
- }
-
- @Override
- public void run() {
- while (running) {
- synchronized (lock) {
-
- try {
- while (!workwork) {
- lock.wait();
- }
-
- processFinishedAsyncCollector();
-
- lock.notifyAll();
-
- workwork = false;
- }
- catch (InterruptedException e) {
- // The source of InterruptedException is from:
- // 1. lock.wait() statement in Emit
- // 2. collector waiting for vacant buffer
- // The action for this exception should try to clear all held data and
- // exit Emit thread.
-
- clearAndNotify();
- }
- catch (Exception e) {
- // For exceptions, not InterruptedException, it should be propagated
- // to main thread.
- error = e;
-
- clearAndNotify();
- }
- }
- }
- }
-
- public void stop() {
- running = false;
- }
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java
deleted file mode 100644
index de7f606aab105..0000000000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * The base class for entries in the {@link AsyncCollectorBuffer}
- *
- * @param Output data type
- */
-
-@Internal
-public interface StreamElementEntry {
- /**
- * Get result. Throw IOException while encountering an error.
- *
- * @return A List of result.
- * @throws IOException IOException wrapping errors from user codes.
- */
- List getResult() throws IOException;
-
- /**
- * Set the internal flag, marking the async operator has finished.
- */
- void markDone();
-
- /**
- * Get the flag indicating the async operator has finished or not.
- *
- * @return True for finished async operator.
- */
- boolean isDone();
-
- /**
- * Check inner element is StreamRecord or not.
- *
- * @return True if element is StreamRecord.
- */
- boolean isStreamRecord();
-
- /**
- * Check inner element is Watermark or not.
- *
- * @return True if element is Watermark.
- */
- boolean isWatermark();
-
- /**
- * Check inner element is LatencyMarker or not.
- *
- * @return True if element is LatencyMarker.
- */
- boolean isLatencyMarker();
-
- /**
- * Get inner stream element.
- *
- * @return Inner {@link StreamElement}.
- */
- StreamElement getStreamElement();
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java
deleted file mode 100644
index fb0dc3b0c0907..0000000000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * {@link AsyncCollectorBuffer} entry for {@link StreamRecord}
- *
- * @param Input data type
- * @param Output data type
- */
-public class StreamRecordEntry extends AbstractBufferEntry implements AsyncCollector {
- private List result;
- private Throwable error;
-
- private boolean isDone = false;
-
- private final AsyncCollectorBuffer buffer;
-
- public StreamRecordEntry(StreamRecord element, AsyncCollectorBuffer buffer) {
- super(element);
- this.buffer = Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null");
- }
-
- @Override
- public void collect(List result) {
- this.result = result;
-
- this.buffer.markCollectorCompleted(this);
- }
-
- @Override
- public void collect(Throwable error) {
- this.error = error;
-
- this.buffer.markCollectorCompleted(this);
- }
-
- public List getResult() throws IOException {
- if (error != null) {
- throw new IOException(error.getMessage());
- }
- return result;
- }
-
- public void markDone() {
- isDone = true;
- }
-
- public boolean isDone() {
- return isDone;
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
index b2a58d2fdb393..a072acaa4300d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
@@ -18,16 +18,16 @@
package org.apache.flink.streaming.api.functions.async.collector;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
-import java.util.List;
+import java.util.Collection;
/**
* {@link AsyncCollector} collects data / error in user codes while processing async i/o.
*
* @param Output type
*/
-@Internal
+@PublicEvolving
public interface AsyncCollector {
/**
* Set result.
@@ -35,14 +35,15 @@ public interface AsyncCollector {
* Note that it should be called for exactly one time in the user code.
* Calling this function for multiple times will cause data lose.
*
- * Put all results in a {@link List} and then issue {@link AsyncCollector#collect(List)}.
+ * Put all results in a {@link Collection} and then issue
+ * {@link AsyncCollector#collect(Collection)}.
*
* If the result is NULL, it will cause task fail. If collecting empty result set is allowable and
* should not cause task fail-over, then try to collect an empty list collection.
*
* @param result A list of results.
*/
- void collect(List result);
+ void collect(Collection result);
/**
* Set error
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
index 56fa14d89c0d5..dc80e81108121 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
@@ -63,6 +63,10 @@ public void setAbsoluteTimestamp(long timestamp) {
reuse.setTimestamp(timestamp);
}
+ public void eraseTimestamp() {
+ reuse.eraseTimestamp();
+ }
+
@Override
public void close() {
output.close();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 9166865e9600d..88fc833a82f9b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -19,110 +19,154 @@
package org.apache.flink.streaming.api.operators.async;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
-import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
- * The {@link AsyncWaitOperator} will accept input {@link StreamElement} from previous operators,
- * pass them into {@link AsyncFunction}, make a snapshot for the inputs in the {@link AsyncCollectorBuffer}
- * while checkpointing, and restore the {@link AsyncCollectorBuffer} from previous state.
+ * The {@link AsyncWaitOperator} allows to asynchronously process incoming stream records. For that
+ * the operator creates an {@link AsyncCollector} which is passed to an {@link AsyncFunction}.
+ * Within the async function, the user can complete the async collector arbitrarily. Once the async
+ * collector has been completed, the result is emitted by the operator's emitter to downstream
+ * operators.
*
- * Note that due to newly added working thread, named {@link AsyncCollectorBuffer.Emitter},
- * if {@link AsyncWaitOperator} is chained with other operators, {@link StreamTask} has to make sure that
- * the the order to open operators in the operator chain should be from the tail operator to the head operator,
- * and order to close operators in the operator chain should be from the head operator to the tail operator.
+ * The operator offers different output modes depending on the chosen
+ * {@link AsyncDataStream.OutputMode}. In order to give exactly once processing guarantees, the
+ * operator stores all currently in-flight {@link StreamElement} in it's operator state. Upon
+ * recovery the recorded set of stream elements is replayed.
+ *
+ * In case of chaining of this operator, it has to be made sure that the operators in the chain are
+ * opened tail to head. The reason for this is that an opened {@link AsyncWaitOperator} starts
+ * already emitting recovered {@link StreamElement} to downstream operators.
*
* @param Input type for the operator.
* @param Output type for the operator.
*/
@Internal
public class AsyncWaitOperator
- extends AbstractUdfStreamOperator>
- implements OneInputStreamOperator
-{
+ extends AbstractUdfStreamOperator>
+ implements OneInputStreamOperator, OperatorActions {
private static final long serialVersionUID = 1L;
- private final static String STATE_NAME = "_async_wait_operator_state_";
+ private static final String STATE_NAME = "_async_wait_operator_state_";
- /**
- * {@link TypeSerializer} for inputs while making snapshots.
- */
+ /** Capacity of the stream element queue */
+ private final int capacity;
+
+ /** Output mode for this operator */
+ private final AsyncDataStream.OutputMode outputMode;
+
+ /** Timeout for the async collectors */
+ private final long timeout;
+
+ private transient Object checkpointingLock;
+
+ /** {@link TypeSerializer} for inputs while making snapshots. */
private transient StreamElementSerializer inStreamElementSerializer;
- /**
- * input stream elements from the state
- */
+ /** Recovered input stream elements */
private transient ListState recoveredStreamElements;
- private transient TimestampedCollector collector;
+ /** Queue to store the currently in-flight stream elements into */
+ private transient StreamElementQueue queue;
- private transient AsyncCollectorBuffer buffer;
+	/** Pending stream element which could not yet be added to the queue */
+ private transient StreamElementQueueEntry> pendingStreamElementQueueEntry;
- /**
- * Checkpoint lock from {@link StreamTask#lock}
- */
- private transient Object checkpointLock;
+ private transient ExecutorService executor;
+
+ /** Emitter for the completed stream element queue entries */
+ private transient Emitter emitter;
- private final int bufferSize;
- private final AsyncDataStream.OutputMode mode;
+ /** Thread running the emitter */
+ private transient Thread emitterThread;
- public AsyncWaitOperator(AsyncFunction asyncFunction, int bufferSize, AsyncDataStream.OutputMode mode) {
+ public AsyncWaitOperator(
+ AsyncFunction asyncFunction,
+ int capacity,
+ AsyncDataStream.OutputMode outputMode) {
super(asyncFunction);
chainingStrategy = ChainingStrategy.ALWAYS;
- Preconditions.checkArgument(bufferSize > 0, "The number of concurrent async operation should be greater than 0.");
- this.bufferSize = bufferSize;
+ Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
+ this.capacity = capacity;
- this.mode = mode;
+ this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
+
+ this.timeout = -1L;
}
@Override
public void setup(StreamTask, ?> containingTask, StreamConfig config, Output> output) {
super.setup(containingTask, config, output);
- this.inStreamElementSerializer =
- new StreamElementSerializer(this.getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader()));
-
- this.collector = new TimestampedCollector<>(output);
-
- this.checkpointLock = containingTask.getCheckpointLock();
-
- this.buffer = new AsyncCollectorBuffer<>(bufferSize, mode, output, collector, this.checkpointLock, this);
+ this.checkpointingLock = getContainingTask().getCheckpointLock();
+
+ this.inStreamElementSerializer = new StreamElementSerializer<>(
+ getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader()));
+
+ // create the operators executor for the complete operations of the queue entries
+ this.executor = Executors.newSingleThreadExecutor();
+
+ switch (outputMode) {
+ case ORDERED:
+ queue = new OrderedStreamElementQueue(
+ capacity,
+ executor,
+ this);
+ break;
+ case UNORDERED:
+ queue = new UnorderedStreamElementQueue(
+ capacity,
+ executor,
+ this);
+ break;
+ default:
+ throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
+ }
}
@Override
public void open() throws Exception {
super.open();
- // process stream elements from state, since the Emit thread will start soon as all elements from
- // previous state are in the AsyncCollectorBuffer, we have to make sure that the order to open all
- // operators in the operator chain should be from the tail operator to the head operator.
- if (this.recoveredStreamElements != null) {
- for (StreamElement element : this.recoveredStreamElements.get()) {
+ // process stream elements from state, since the Emit thread will start as soon as all
+ // elements from previous state are in the StreamElementQueue, we have to make sure that the
+ // order to open all operators in the operator chain proceeds from the tail operator to the
+ // head operator.
+ if (recoveredStreamElements != null) {
+ for (StreamElement element : recoveredStreamElements.get()) {
if (element.isRecord()) {
processElement(element.asRecord());
}
@@ -133,30 +177,52 @@ else if (element.isLatencyMarker()) {
processLatencyMarker(element.asLatencyMarker());
}
else {
- throw new Exception("Unknown record type: "+element.getClass());
+ throw new IllegalStateException("Unknown record type " + element.getClass() +
+ " encountered while opening the operator.");
}
}
- this.recoveredStreamElements = null;
+ recoveredStreamElements = null;
}
- buffer.startEmitterThread();
+ // create the emitter
+ this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
+
+ // start the emitter thread
+ this.emitterThread = new Thread(emitter);
+ emitterThread.setDaemon(true);
+ emitterThread.start();
+
}
@Override
public void processElement(StreamRecord element) throws Exception {
- AsyncCollector collector = buffer.addStreamRecord(element);
+ final StreamRecordQueueEntry streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
+
+ if (timeout > 0L) {
+ // register a timeout for this AsyncStreamRecordBufferEntry
+ long timeoutTimestamp = timeout + System.currentTimeMillis();
+
+ getProcessingTimeService().registerTimer(
+ timeoutTimestamp,
+ new ProcessingTimeCallback() {
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ streamRecordBufferEntry.collect(
+ new TimeoutException("Async function call has timed out."));
+ }
+ });
+ }
- userFunction.asyncInvoke(element.getValue(), collector);
+ addAsyncBufferEntry(streamRecordBufferEntry);
+
+ userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
- buffer.addWatermark(mark);
- }
+ WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);
- @Override
- public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
- buffer.addLatencyMarker(latencyMarker);
+ addAsyncBufferEntry(watermarkBufferEntry);
}
@Override
@@ -167,45 +233,155 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
partitionableState.clear();
- Iterator iterator = buffer.getStreamElementsInBuffer();
- while (iterator.hasNext()) {
- partitionableState.add(iterator.next());
+ Collection> values = queue.values();
+
+ for (StreamElementQueueEntry> value : values) {
+ partitionableState.add(value.getStreamElement());
+ }
+
+ // add the pending stream element queue entry if the stream element queue is currently full
+ if (pendingStreamElementQueueEntry != null) {
+ partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
}
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
- recoveredStreamElements =
- context.getOperatorStateStore().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
+ recoveredStreamElements = context
+ .getOperatorStateStore()
+ .getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
}
@Override
public void close() throws Exception {
try {
- buffer.waitEmpty();
+ assert(Thread.holdsLock(checkpointingLock));
+
+ while (!queue.isEmpty()) {
+ // wait for the emitter thread to output the remaining elements
+				// for that it needs the checkpointing lock and thus we have to free it
+ checkpointingLock.wait();
+ }
}
finally {
- // make sure Emitter thread exits and close user function
- buffer.stopEmitterThread();
+ Exception exception = null;
+
+ try {
+ super.close();
+ } catch (InterruptedException interrupted) {
+ exception = interrupted;
+
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ try {
+ // terminate the emitter, the emitter thread and the executor
+ stopResources(true);
+ } catch (InterruptedException interrupted) {
+ exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
+
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
- super.close();
+ if (exception != null) {
+ LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
+ }
}
}
@Override
public void dispose() throws Exception {
- super.dispose();
+ Exception exception = null;
+
+ try {
+ super.dispose();
+ } catch (InterruptedException interrupted) {
+ exception = interrupted;
+
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ try {
+ stopResources(false);
+ } catch (InterruptedException interrupted) {
+ exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
+
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+ }
- buffer.stopEmitterThread();
+ /**
+ * Close the operator's resources. They include the emitter thread and the executor to run
+ * the queue's complete operation.
+ *
+ * @param waitForShutdown is true if the method should wait for the resources to be freed;
+ * otherwise false.
+ * @throws InterruptedException if current thread has been interrupted
+ */
+ private void stopResources(boolean waitForShutdown) throws InterruptedException {
+ emitter.stop();
+ emitterThread.interrupt();
+
+ executor.shutdown();
+
+ if (waitForShutdown) {
+ try {
+ if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executor.shutdownNow();
+
+ Thread.currentThread().interrupt();
+ }
+
+ emitterThread.join();
+ } else {
+ executor.shutdownNow();
+ }
}
- public void sendLatencyMarker(LatencyMarker marker) throws Exception {
- super.processLatencyMarker(marker);
+ /**
+ * Add the given stream element queue entry to the operator's stream element queue. This
+ * operation blocks until the element has been added.
+ *
+ * For that it tries to put the element into the queue and if not successful then it waits on
+ * the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output
+ * elements. The emitter is also responsible for notifying this method if the queue has capacity
+ * left again, by calling notifyAll on the checkpointing lock.
+ *
+ * @param streamElementQueueEntry to add to the operator's queue
+ * @param Type of the stream element queue entry's result
+ * @throws InterruptedException if the current thread has been interrupted
+ */
+ private void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
+ assert(Thread.holdsLock(checkpointingLock));
+
+ pendingStreamElementQueueEntry = streamElementQueueEntry;
+
+ while (!queue.tryPut(streamElementQueueEntry)) {
+ // we wait for the emitter to notify us if the queue has space left again
+ checkpointingLock.wait();
+ }
+
+ pendingStreamElementQueueEntry = null;
}
- @VisibleForTesting
- public AsyncCollectorBuffer getBuffer() {
- return buffer;
+ @Override
+ public void failOperator(Throwable throwable) {
+ getContainingTask().getEnvironment().failExternally(throwable);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
new file mode 100644
index 0000000000000..4b22aaa262762
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.async.queue.AsyncCollectionResult;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.AsyncResult;
+import org.apache.flink.streaming.api.operators.async.queue.AsyncWatermarkResult;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Runnable responsible for consuming elements from the given queue and outputting them to the
+ * given output/timestampedCollector.
+ *
+ * @param Type of the output elements
+ */
+public class Emitter implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);
+
+ /** Lock to hold before outputting */
+ private final Object checkpointLock;
+
+ /** Output for the watermark elements */
+ private final Output> output;
+
+ /** Queue to consume the async results from */
+ private final StreamElementQueue streamElementQueue;
+
+ private final OperatorActions operatorActions;
+
+ /** Output for stream records */
+ private final TimestampedCollector timestampedCollector;
+
+ private volatile boolean running;
+
+ public Emitter(
+ final Object checkpointLock,
+ final Output> output,
+ final StreamElementQueue streamElementQueue,
+ final OperatorActions operatorActions) {
+
+ this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
+ this.output = Preconditions.checkNotNull(output, "output");
+ this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "asyncCollectorBuffer");
+ this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
+
+ this.timestampedCollector = new TimestampedCollector<>(this.output);
+ this.running = true;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (running) {
+ LOG.debug("Wait for next completed async stream element result.");
+ AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
+
+ output(streamElementEntry);
+ }
+ } catch (InterruptedException e) {
+ if (running) {
+ operatorActions.failOperator(e);
+ } else {
+ // Thread got interrupted which means that it should shut down
+ LOG.debug("Emitter thread got interrupted. This indicates that the emitter should " +
+ "shut down.");
+ }
+ } catch (Throwable t) {
+ operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
+ "unexpected throwable.", t));
+ }
+ }
+
+ private void output(AsyncResult asyncResult) throws InterruptedException {
+ if (asyncResult.isWatermark()) {
+ synchronized (checkpointLock) {
+ // remove the peeked element from the async collector buffer so that it is no longer
+ // checkpointed
+ streamElementQueue.poll();
+
+ // notify the main thread that there is again space left in the async collector
+ // buffer
+ checkpointLock.notifyAll();
+
+ AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
+
+ LOG.debug("Output async watermark.");
+ output.emitWatermark(asyncWatermarkResult.getWatermark());
+ }
+ } else {
+ AsyncCollectionResult streamRecordResult = asyncResult.asResultCollection();
+
+ if (streamRecordResult.hasTimestamp()) {
+ timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
+ } else {
+ timestampedCollector.eraseTimestamp();
+ }
+
+ synchronized (checkpointLock) {
+ // remove the peeked element from the async collector buffer so that it is no longer
+ // checkpointed
+ streamElementQueue.poll();
+
+ // notify the main thread that there is again space left in the async collector
+ // buffer
+ checkpointLock.notifyAll();
+
+ LOG.debug("Output async stream element collection result.");
+
+ try {
+ Collection resultCollection = streamRecordResult.get();
+
+ for (OUT result : resultCollection) {
+ timestampedCollector.collect(result);
+ }
+ } catch (Exception e) {
+ operatorActions.failOperator(
+ new Exception("An async function call terminated with an exception. " +
+ "Failing the AsyncWaitOperator.", e));
+ }
+ }
+ }
+ }
+
+ public void stop() {
+ running = false;
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
similarity index 67%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
index 1705c2d3d5e90..5a2e43c0b3de5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
@@ -16,21 +16,19 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.functions.async.buffer;
+package org.apache.flink.streaming.api.operators.async;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.api.operators.StreamOperator;
/**
- * {@link AsyncCollectorBuffer} entry for {@link LatencyMarker}
- *
+ * Interface for {@link StreamOperator} actions.
*/
-public class LatencyMarkerEntry extends AbstractBufferEntry {
- public LatencyMarkerEntry(LatencyMarker marker) {
- super(marker);
- }
+public interface OperatorActions {
- @Override
- public boolean isDone() {
- return true;
- }
+ /**
+ * Fail the respective stream operator with the given throwable.
+ *
+ * @param throwable to fail the stream operator with
+ */
+ void failOperator(Throwable throwable);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
new file mode 100644
index 0000000000000..8088bf0e64504
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import java.util.Collection;
+
+/**
+ * {@link AsyncResult} sub class for asynchronous result collections.
+ *
+ * @param Type of the collection elements.
+ */
+public interface AsyncCollectionResult extends AsyncResult {
+
+ boolean hasTimestamp();
+
+ long getTimestamp();
+
+ /**
+ * Return the asynchronous result collection.
+ *
+ * @return the asynchronous result collection
+ * @throws Exception if the asynchronous result collection could not be completed
+ */
+ Collection get() throws Exception;
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
new file mode 100644
index 0000000000000..1a99928150176
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * Asynchronous result returned by the {@link StreamElementQueue}. The asynchronous result can
+ * either be a {@link Watermark} or a collection of new output elements produced by the
+ * {@link AsyncFunction}.
+ */
+public interface AsyncResult {
+
+ /**
+ * True if the async result is a {@link Watermark}; otherwise false.
+ *
+ * @return True if the async result is a {@link Watermark}; otherwise false.
+ */
+ boolean isWatermark();
+
+ /**
+ * True if the async result is a collection of output elements; otherwise false.
+ *
+ * @return True if the async result is a collection of output elements; otherwise false
+ */
+ boolean isResultCollection();
+
+ /**
+ * Return this async result as an async watermark result.
+ *
+ * @return this result as a {@link AsyncWatermarkResult}.
+ */
+ AsyncWatermarkResult asWatermark();
+
+ /**
+ * Return this async result as an async result collection.
+ *
+ * @param Type of the result collection's elements
+ * @return this result as a {@link AsyncCollectionResult}.
+ */
+ AsyncCollectionResult asResultCollection();
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
similarity index 73%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
index 8883a2d6073eb..c19b520d2cba4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
@@ -16,21 +16,18 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.functions.async.buffer;
+package org.apache.flink.streaming.api.operators.async.queue;
import org.apache.flink.streaming.api.watermark.Watermark;
/**
- * {@link AsyncCollectorBuffer} entry for {@link Watermark}
- *
+ * {@link AsyncResult} subclass for asynchronous result {@link Watermark}.
*/
-public class WatermarkEntry extends AbstractBufferEntry {
- public WatermarkEntry(Watermark watermark) {
- super(watermark);
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
+public interface AsyncWatermarkResult extends AsyncResult {
+ /**
+ * Get the resulting watermark.
+ *
+ * @return the asynchronous result watermark
+ */
+ Watermark getWatermark();
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
new file mode 100644
index 0000000000000..2bbcb6c1b826d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Ordered {@link StreamElementQueue} implementation. The ordered stream element queue emits
+ * asynchronous results in the order in which the {@link StreamElementQueueEntry} have been added
+ * to the queue. Thus, even if the completion order can be arbitrary, the output order strictly
+ * follows the insertion order (elements cannot overtake each other).
+ */
+public class OrderedStreamElementQueue implements StreamElementQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);
+
+ /** Capacity of this queue */
+ private final int capacity;
+
+ /** Executor to run the onCompletion callback */
+ private final Executor executor;
+
+ /** Operator actions to signal a failure to the operator */
+ private final OperatorActions operatorActions;
+
+ /** Lock and conditions for the blocking queue */
+ private final ReentrantLock lock;
+ private final Condition notFull;
+ private final Condition headIsCompleted;
+
+ /** Queue for the inserted StreamElementQueueEntries */
+ private final ArrayDeque> queue;
+
+ public OrderedStreamElementQueue(
+ int capacity,
+ Executor executor,
+ OperatorActions operatorActions) {
+
+ Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
+ this.capacity = capacity;
+
+ this.executor = Preconditions.checkNotNull(executor, "executor");
+
+ this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
+
+ this.lock = new ReentrantLock(false);
+ this.headIsCompleted = lock.newCondition();
+ this.notFull = lock.newCondition();
+
+ this.queue = new ArrayDeque<>(capacity);
+ }
+
+ @Override
+ public AsyncResult peekBlockingly() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (queue.isEmpty() || !queue.peek().isDone()) {
+ headIsCompleted.await();
+ }
+
+ LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
+ "({}/{}).", queue.size(), capacity);
+
+ return queue.peek();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public AsyncResult poll() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (queue.isEmpty() || !queue.peek().isDone()) {
+ headIsCompleted.await();
+ }
+
+ notFull.signalAll();
+
+ LOG.debug("Polled head element from ordered stream element queue. New filling degree " +
+ "({}/{}).", queue.size() - 1, capacity);
+
+ return queue.poll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Collection> values() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ StreamElementQueueEntry>[] array = new StreamElementQueueEntry[queue.size()];
+
+ array = queue.toArray(array);
+
+ return Arrays.asList(array);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return queue.isEmpty();
+ }
+
+ @Override
+ public int size() {
+ return queue.size();
+ }
+
+ @Override
+ public void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (queue.size() >= capacity) {
+ notFull.await();
+ }
+
+ addEntry(streamElementQueueEntry);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ if (queue.size() < capacity) {
+ addEntry(streamElementQueueEntry);
+
+ LOG.debug("Put element into ordered stream element queue. New filling degree " +
+ "({}/{}).", queue.size(), capacity);
+
+ return true;
+ } else {
+ LOG.debug("Failed to put element into ordered stream element queue because it " +
+ "was full ({}/{}).", queue.size(), capacity);
+
+ return false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Add the given {@link StreamElementQueueEntry} to the queue. Additionally, this method
+ * registers a onComplete callback which is triggered once the given queue entry is completed.
+ *
+ * @param streamElementQueueEntry to be inserted
+ * @param Type of the stream element queue entry's result
+ */
+ private void addEntry(StreamElementQueueEntry streamElementQueueEntry) {
+ assert(lock.isHeldByCurrentThread());
+
+ queue.addLast(streamElementQueueEntry);
+
+ streamElementQueueEntry.onComplete(new AcceptFunction>() {
+ @Override
+ public void accept(StreamElementQueueEntry value) {
+ try {
+ onCompleteHandler(value);
+ } catch (InterruptedException e) {
+ // we got interrupted. This indicates a shutdown of the executor
+ LOG.debug("AsyncBufferEntry could not be properly completed because the " +
+ "executor thread has been interrupted.", e);
+ } catch (Throwable t) {
+ operatorActions.failOperator(new Exception("Could not complete the " +
+ "stream element queue entry: " + value + '.', t));
+ }
+ }
+ }, executor);
+ }
+
+ /**
+ * Check if the completed {@link StreamElementQueueEntry} is the current head. If this is the
+ * case, then notify the consumer thread about a new consumable entry.
+ *
+ * @param streamElementQueueEntry which has been completed
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ private void onCompleteHandler(StreamElementQueueEntry> streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ if (!queue.isEmpty() && queue.peek().isDone()) {
+ LOG.debug("Signal ordered stream element queue has completed head element.");
+ headIsCompleted.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
new file mode 100644
index 0000000000000..1a2c4a859b291
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+
+import java.util.Collection;
+
+/**
+ * Interface for blocking stream element queues for the {@link AsyncWaitOperator}.
+ */
+public interface StreamElementQueue {
+
+ /**
+ * Put the given element in the queue if capacity is left. If not, then block until this is
+ * the case.
+ *
+ * @param streamElementQueueEntry to be put into the queue
+ * @param Type of the entries future value
+ * @throws InterruptedException if the calling thread has been interrupted while waiting to
+ * insert the given element
+ */
+ void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException;
+
+ /**
+ * Try to put the given element in the queue. This operation succeeds if the queue has capacity
+ * left and fails if the queue is full.
+ *
+ * @param streamElementQueueEntry to be inserted
+ * @param Type of the entries future value
+ * @return True if the entry could be inserted; otherwise false
+ * @throws InterruptedException if the calling thread has been interrupted while waiting to
+ * insert the given element
+ */
+ boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException;
+
+ /**
+ * Peek at the head of the queue and return the first completed {@link AsyncResult}. This
+ * operation is a blocking operation and only returns once a completed async result has been
+ * found.
+ *
+ * @return Completed {@link AsyncResult}
+ * @throws InterruptedException if the current thread has been interrupted while waiting for a
+ * completed async result.
+ */
+ AsyncResult peekBlockingly() throws InterruptedException;
+
+ /**
+ * Poll the first completed {@link AsyncResult} from the head of this queue. This operation is
+ * blocking and only returns once a completed async result has been found.
+ *
+ * @return Completed {@link AsyncResult} which has been removed from the queue
+ * @throws InterruptedException if the current thread has been interrupted while waiting for a
+ * completed async result.
+ */
+ AsyncResult poll() throws InterruptedException;
+
+ /**
+ * Return the collection of {@link StreamElementQueueEntry} currently contained in this queue.
+ *
+ * @return Collection of currently contained {@link StreamElementQueueEntry}.
+ * @throws InterruptedException if the current thread has been interrupted while retrieving the
+ * stream element queue entries of this queue.
+ */
+ Collection> values() throws InterruptedException;
+
+ /**
+ * True if the queue is empty; otherwise false.
+ *
+ * @return True if the queue is empty; otherwise false.
+ */
+ boolean isEmpty();
+
+ /**
+ * Return the size of the queue.
+ *
+ * @return The number of elements contained in this queue.
+ */
+ int size();
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
new file mode 100644
index 0000000000000..06ebf3c30ce76
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Entry class for the {@link StreamElementQueue}. The stream element queue entry stores the
+ * {@link StreamElement} for which the stream element queue entry has been instantiated.
+ * Furthermore, it allows to register callbacks for when the queue entry is completed.
+ *
+ * @param Type of the result
+ */
+public abstract class StreamElementQueueEntry implements AsyncResult {
+
+ /** Stream element */
+ private final StreamElement streamElement;
+
+ public StreamElementQueueEntry(StreamElement streamElement) {
+ this.streamElement = Preconditions.checkNotNull(streamElement);
+ }
+
+ public StreamElement getStreamElement() {
+ return streamElement;
+ }
+
+ /**
+ * True if the stream element queue entry has been completed; otherwise false.
+ *
+ * @return True if the stream element queue entry has been completed; otherwise false.
+ */
+ public boolean isDone() {
+ return getFuture().isDone();
+ }
+
+ /**
+ * Register the given complete function to be called once this queue entry has been completed.
+ *
+ * @param completeFunction to call when the queue entry has been completed
+ * @param executor to run the complete function
+ */
+ public void onComplete(
+ final AcceptFunction> completeFunction,
+ Executor executor) {
+ final StreamElementQueueEntry thisReference = this;
+
+ getFuture().thenAcceptAsync(new AcceptFunction() {
+ @Override
+ public void accept(T value) {
+ completeFunction.accept(thisReference);
+ }
+ }, executor);
+ }
+
+ protected abstract Future getFuture();
+
+ @Override
+ public final boolean isWatermark() {
+ return AsyncWatermarkResult.class.isAssignableFrom(getClass());
+ }
+
+ @Override
+ public final boolean isResultCollection() {
+ return AsyncCollectionResult.class.isAssignableFrom(getClass());
+ }
+
+ @Override
+ public final AsyncWatermarkResult asWatermark() {
+ return (AsyncWatermarkResult) this;
+ }
+
+ @Override
+ public final AsyncCollectionResult asResultCollection() {
+ return (AsyncCollectionResult) this;
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
new file mode 100644
index 0000000000000..f0e707e4bfbda
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
+
+/**
+ * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts
+ * as the {@link AsyncCollector} implementation which is given to the {@link AsyncFunction}. The
+ * async function completes this class with a collection of results.
+ *
+ * @param Type of the asynchronous collection result
+ */
+public class StreamRecordQueueEntry extends StreamElementQueueEntry>
+ implements AsyncCollectionResult, AsyncCollector {
+
+ /** Timestamp information */
+ private final boolean hasTimestamp;
+ private final long timestamp;
+
+ /** Future containing the collection result */
+ private final CompletableFuture> resultFuture;
+
+ public StreamRecordQueueEntry(StreamRecord> streamRecord) {
+ super(streamRecord);
+
+ hasTimestamp = streamRecord.hasTimestamp();
+ timestamp = streamRecord.getTimestamp();
+
+ resultFuture = new FlinkCompletableFuture<>();
+ }
+
+ @Override
+ public boolean hasTimestamp() {
+ return hasTimestamp;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public Collection get() throws Exception {
+ return resultFuture.get();
+ }
+
+ @Override
+ protected Future> getFuture() {
+ return resultFuture;
+ }
+
+ @Override
+ public void collect(Collection result) {
+ resultFuture.complete(result);
+ }
+
+ @Override
+ public void collect(Throwable error) {
+ resultFuture.completeExceptionally(error);
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
new file mode 100644
index 0000000000000..603d8cc60c2a5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue
+ * emits asynchronous results as soon as they are completed. Additionally it maintains the
+ * watermark-stream record order. This means that no stream record can be overtaken by a watermark
+ * and no watermark can overtake a stream record. However, stream records falling in the same
+ * segment between two watermarks can overtake each other (their emission order is not guaranteed).
+ */
+public class UnorderedStreamElementQueue implements StreamElementQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
+
+ /** Capacity of this queue */
+ private final int capacity;
+
+ /** Executor to run the onComplete callbacks */
+ private final Executor executor;
+
+ /** OperatorActions to signal the owning operator a failure */
+ private final OperatorActions operatorActions;
+
+ /** Queue of uncompleted stream element queue entries segmented by watermarks */
+ private final ArrayDeque>> uncompletedQueue;
+
+ /** Queue of completed stream element queue entries */
+ private final ArrayDeque> completedQueue;
+
+ /** First (chronologically oldest) uncompleted set of stream element queue entries */
+ private Set> firstSet;
+
+ // Last (chronologically youngest) uncompleted set of stream element queue entries. New
+ // stream element queue entries are inserted into this set.
+ private Set> lastSet;
+ private volatile int numberEntries;
+
+ /** Locks and conditions for the blocking queue */
+ private final ReentrantLock lock;
+ private final Condition notFull;
+ private final Condition hasCompletedEntries;
+
+ public UnorderedStreamElementQueue(
+ int capacity,
+ Executor executor,
+ OperatorActions operatorActions) {
+
+ Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
+ this.capacity = capacity;
+
+ this.executor = Preconditions.checkNotNull(executor, "executor");
+
+ this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
+
+ this.uncompletedQueue = new ArrayDeque<>(capacity);
+ this.completedQueue = new ArrayDeque<>(capacity);
+
+ this.firstSet = new HashSet<>(capacity);
+ this.lastSet = firstSet;
+
+ this.numberEntries = 0;
+
+ this.lock = new ReentrantLock();
+ this.notFull = lock.newCondition();
+ this.hasCompletedEntries = lock.newCondition();
+ }
+
+ @Override
+ public void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (numberEntries >= capacity) {
+ notFull.await();
+ }
+
+ addEntry(streamElementQueueEntry);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ if (numberEntries < capacity) {
+ addEntry(streamElementQueueEntry);
+
+ LOG.debug("Put element into ordered stream element queue. New filling degree " +
+ "({}/{}).", numberEntries, capacity);
+
+ return true;
+ } else {
+ LOG.debug("Failed to put element into ordered stream element queue because it " +
+ "was full ({}/{}).", numberEntries, capacity);
+
+ return false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public AsyncResult peekBlockingly() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (completedQueue.isEmpty()) {
+ hasCompletedEntries.await();
+ }
+
+ LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
+ "({}/{}).", numberEntries, capacity);
+
+ return completedQueue.peek();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public AsyncResult poll() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (completedQueue.isEmpty()) {
+ hasCompletedEntries.await();
+ }
+
+ numberEntries--;
+ notFull.signalAll();
+
+ LOG.debug("Polled element from unordered stream element queue. New filling degree " +
+ "({}/{}).", numberEntries, capacity);
+
+ return completedQueue.poll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Collection> values() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ StreamElementQueueEntry>[] array = new StreamElementQueueEntry[numberEntries];
+
+ array = completedQueue.toArray(array);
+
+ int counter = completedQueue.size();
+
+ for (StreamElementQueueEntry> entry: firstSet) {
+ array[counter] = entry;
+ counter++;
+ }
+
+ for (Set> asyncBufferEntries : uncompletedQueue) {
+
+ for (StreamElementQueueEntry> streamElementQueueEntry : asyncBufferEntries) {
+ array[counter] = streamElementQueueEntry;
+ counter++;
+ }
+ }
+
+ return Arrays.asList(array);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return numberEntries == 0;
+ }
+
+ @Override
+ public int size() {
+ return numberEntries;
+ }
+
+ /**
+ * Callback for onComplete events for the given stream element queue entry. Whenever a queue
+ * entry is completed, it is checked whether this entry belongs to the first set. If this is the
+ * case, then the element is added to the completed entries queue from where it can be consumed.
+ * If the first set becomes empty, then the next set is polled from the uncompleted entries
+ * queue. Completed entries from this new set are then added to the completed entries queue.
+ *
+ * @param streamElementQueueEntry which has been completed
+ * @throws InterruptedException if the current thread has been interrupted while performing the
+ * on complete callback.
+ */
+ public void onCompleteHandler(StreamElementQueueEntry> streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ if (firstSet.remove(streamElementQueueEntry)) {
+ completedQueue.offer(streamElementQueueEntry);
+
+ while (firstSet.isEmpty() && firstSet != lastSet) {
+ firstSet = uncompletedQueue.poll();
+
+ Iterator> it = firstSet.iterator();
+
+ while (it.hasNext()) {
+ StreamElementQueueEntry> bufferEntry = it.next();
+
+ if (bufferEntry.isDone()) {
+ completedQueue.offer(bufferEntry);
+ it.remove();
+ }
+ }
+ }
+
+ LOG.debug("Signal unordered stream element queue has completed entries.");
+ hasCompletedEntries.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Add the given stream element queue entry to the current last set if it is not a watermark.
+ * If it is a watermark, then stop adding to the current last set, insert the watermark into its
+ * own set and add a new last set.
+ *
+ * @param streamElementQueueEntry to be inserted
+ * @param Type of the stream element queue entry's result
+ */
+ private void addEntry(StreamElementQueueEntry streamElementQueueEntry) {
+ assert(lock.isHeldByCurrentThread());
+
+ if (streamElementQueueEntry.isWatermark()) {
+ lastSet = new HashSet<>(capacity);
+
+ if (firstSet.isEmpty()) {
+ firstSet.add(streamElementQueueEntry);
+ } else {
+ Set> watermarkSet = new HashSet<>(1);
+ watermarkSet.add(streamElementQueueEntry);
+ uncompletedQueue.offer(watermarkSet);
+ }
+ uncompletedQueue.offer(lastSet);
+ } else {
+ lastSet.add(streamElementQueueEntry);
+ }
+
+ streamElementQueueEntry.onComplete(new AcceptFunction>() {
+ @Override
+ public void accept(StreamElementQueueEntry value) {
+ try {
+ onCompleteHandler(value);
+ } catch (InterruptedException e) {
+ // The accept executor thread got interrupted. This is probably caused by
+ // the shutdown of the executor.
+ LOG.debug("AsyncBufferEntry could not be properly completed because the " +
+ "executor thread has been interrupted.", e);
+ } catch (Throwable t) {
+ operatorActions.failOperator(new Exception("Could not complete the " +
+ "stream element queue entry: " + value + '.', t));
+ }
+ }
+ }, executor);
+
+ numberEntries++;
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
new file mode 100644
index 0000000000000..6fe4f440e003d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * {@link StreamElementQueueEntry} implementation for the {@link Watermark}.
+ */
+public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {
+
+	private final Future<Watermark> future;
+
+ public WatermarkQueueEntry(Watermark watermark) {
+ super(watermark);
+
+ this.future = FlinkCompletableFuture.completed(watermark);
+ }
+
+ @Override
+ public Watermark getWatermark() {
+ return (Watermark) getStreamElement();
+ }
+
+ @Override
+	protected Future<Watermark> getFuture() {
+ return future;
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 680cc291608af..7771064e2e2ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -52,7 +52,6 @@
import java.util.Map;
import java.util.Random;
-
/**
* The {@code OperatorChain} contains all operators that are executed as one chain within a single
* {@link StreamTask}.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 0fb22b887bd12..bd9215ab093ff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -326,11 +326,6 @@ public Long getValue() {
LOG.error("Could not shut down async checkpoint threads", t);
}
- // release the output resources. this method should never fail.
- if (operatorChain != null) {
- operatorChain.releaseOutputs();
- }
-
// we must! perform this cleanup
try {
cleanup();
@@ -344,6 +339,11 @@ public Long getValue() {
if (!disposed) {
disposeAllOperators();
}
+
+ // release the output resources. this method should never fail.
+ if (operatorChain != null) {
+ operatorChain.releaseOutputs();
+ }
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index b8788c6a94daa..12ac69352382e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -18,147 +18,252 @@
package org.apache.flink.streaming.api.functions.async;
-import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.junit.Assert;
import org.junit.Test;
-import static org.mockito.Matchers.anyString;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * Test case for {@link RichAsyncFunction}
+ * Test cases for {@link RichAsyncFunction}
*/
public class RichAsyncFunctionTest {
- private RichAsyncFunction initFunction() {
- RichAsyncFunction function = new RichAsyncFunction() {
+ /**
+ * Test the set of iteration runtime context methods in the context of a
+ * {@link RichAsyncFunction}.
+ */
+ @Test
+ public void testIterationRuntimeContext() throws Exception {
+		RichAsyncFunction<Integer, Integer> function = new RichAsyncFunction<Integer, Integer>() {
+ private static final long serialVersionUID = -2023923961609455894L;
+
@Override
- public void asyncInvoke(String input, AsyncCollector collector) throws Exception {
- getRuntimeContext().getState(mock(ValueStateDescriptor.class));
+			public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+ // no op
}
};
- return function;
+ int superstepNumber = 42;
+
+ IterationRuntimeContext mockedIterationRuntimeContext = mock(IterationRuntimeContext.class);
+ when(mockedIterationRuntimeContext.getSuperstepNumber()).thenReturn(superstepNumber);
+ function.setRuntimeContext(mockedIterationRuntimeContext);
+
+ IterationRuntimeContext iterationRuntimeContext = function.getIterationRuntimeContext();
+
+ assertEquals(superstepNumber, iterationRuntimeContext.getSuperstepNumber());
+
+ try {
+ iterationRuntimeContext.getIterationAggregator("foobar");
+ fail("Expected getIterationAggregator to fail with unsupported operation exception");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ iterationRuntimeContext.getPreviousIterationAggregate("foobar");
+ fail("Expected getPreviousIterationAggregator to fail with unsupported operation exception");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
}
+ /**
+ * Test the set of runtime context methods in the context of a {@link RichAsyncFunction}.
+ */
@Test
- public void testIterationRuntimeContext() throws Exception {
- // test runtime context is not set
- RichAsyncFunction function = new RichAsyncFunction() {
+ public void testRuntimeContext() throws Exception {
+		RichAsyncFunction<Integer, Integer> function = new RichAsyncFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 1707630162838967972L;
+
@Override
- public void asyncInvoke(String input, AsyncCollector collector) throws Exception {
- getIterationRuntimeContext().getIterationAggregator("test");
+			public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+ // no op
}
};
+ final String taskName = "foobarTask";
+ final MetricGroup metricGroup = mock(MetricGroup.class);
+ final int numberOfParallelSubtasks = 42;
+ final int indexOfSubtask = 43;
+ final int attemptNumber = 1337;
+ final String taskNameWithSubtask = "barfoo";
+ final ExecutionConfig executionConfig = mock(ExecutionConfig.class);
+ final ClassLoader userCodeClassLoader = mock(ClassLoader.class);
+
+ RuntimeContext mockedRuntimeContext = mock(RuntimeContext.class);
+
+ when(mockedRuntimeContext.getTaskName()).thenReturn(taskName);
+ when(mockedRuntimeContext.getMetricGroup()).thenReturn(metricGroup);
+ when(mockedRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(numberOfParallelSubtasks);
+ when(mockedRuntimeContext.getIndexOfThisSubtask()).thenReturn(indexOfSubtask);
+ when(mockedRuntimeContext.getAttemptNumber()).thenReturn(attemptNumber);
+ when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask);
+ when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig);
+ when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
+
+ function.setRuntimeContext(mockedRuntimeContext);
+
+ RuntimeContext runtimeContext = function.getRuntimeContext();
+
+ assertEquals(taskName, runtimeContext.getTaskName());
+ assertEquals(metricGroup, runtimeContext.getMetricGroup());
+ assertEquals(numberOfParallelSubtasks, runtimeContext.getNumberOfParallelSubtasks());
+ assertEquals(indexOfSubtask, runtimeContext.getIndexOfThisSubtask());
+ assertEquals(attemptNumber, runtimeContext.getAttemptNumber());
+ assertEquals(taskNameWithSubtask, runtimeContext.getTaskNameWithSubtasks());
+ assertEquals(executionConfig, runtimeContext.getExecutionConfig());
+ assertEquals(userCodeClassLoader, runtimeContext.getUserCodeClassLoader());
+
try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
- }
- catch (Exception e) {
- Assert.assertEquals("The runtime context has not been initialized.", e.getMessage());
+ runtimeContext.getDistributedCache();
+ fail("Expected getDistributedCached to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- // test get agg from iteration runtime context
- function.setRuntimeContext(mock(IterationRuntimeContext.class));
-
try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
+ runtimeContext.getState(new ValueStateDescriptor<>("foobar", Integer.class, 42));
+ fail("Expected getState to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- catch (Exception e) {
- Assert.assertEquals("Get iteration aggregator is not supported in rich async function", e.getMessage());
+
+ try {
+ runtimeContext.getListState(new ListStateDescriptor<>("foobar", Integer.class));
+ fail("Expected getListState to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- // get state from iteration runtime context
- function = new RichAsyncFunction() {
- @Override
- public void asyncInvoke(String input, AsyncCollector collector) throws Exception {
- getIterationRuntimeContext().getState(mock(ValueStateDescriptor.class));
- }
- };
+ try {
+			runtimeContext.getReducingState(new ReducingStateDescriptor<>("foobar", new ReduceFunction<Integer>() {
+ private static final long serialVersionUID = 2136425961884441050L;
- function.setRuntimeContext(mock(RuntimeContext.class));
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1;
+ }
+ }, Integer.class));
+ fail("Expected getReducingState to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
- }
- catch (Exception e) {
- Assert.assertEquals("State is not supported in rich async function", e.getMessage());
- }
+			runtimeContext.addAccumulator("foobar", new Accumulator<Integer, Integer>() {
+ private static final long serialVersionUID = -4673320336846482358L;
- // test getting a counter from iteration runtime context
- function = new RichAsyncFunction() {
- @Override
- public void asyncInvoke(String input, AsyncCollector collector) throws Exception {
- getIterationRuntimeContext().getIntCounter("test").add(6);
- }
- };
+ @Override
+ public void add(Integer value) {
+ // no op
+ }
- IterationRuntimeContext context = mock(IterationRuntimeContext.class);
- IntCounter counter = new IntCounter(0);
- when(context.getIntCounter(anyString())).thenReturn(counter);
+ @Override
+ public Integer getLocalValue() {
+ return null;
+ }
- function.setRuntimeContext(context);
+ @Override
+ public void resetLocal() {
- function.asyncInvoke("test", mock(AsyncCollector.class));
+ }
- Assert.assertTrue(6 == counter.getLocalValue());
- }
+ @Override
+				public void merge(Accumulator<Integer, Integer> other) {
- @Test
- public void testRuntimeContext() throws Exception {
- // test run time context is not set
- RichAsyncFunction function = new RichAsyncFunction() {
- @Override
- public void asyncInvoke(String input, AsyncCollector collector) throws Exception {
- getRuntimeContext().getState(mock(ValueStateDescriptor.class));
- }
- };
+ }
- try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
- }
- catch (Exception e) {
- Assert.assertEquals("The runtime context has not been initialized.", e.getMessage());
+ @Override
+				public Accumulator<Integer, Integer> clone() {
+ return null;
+ }
+ });
+ fail("Expected addAccumulator to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- // test get state
- function = new RichAsyncFunction() {
- @Override
- public void asyncInvoke(String input, AsyncCollector collector) throws Exception {
- getRuntimeContext().getState(mock(ValueStateDescriptor.class));
- }
- };
+ try {
+ runtimeContext.getAccumulator("foobar");
+ fail("Expected getAccumulator to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- function.setRuntimeContext(mock(RuntimeContext.class));
+ try {
+ runtimeContext.getAllAccumulators();
+ fail("Expected getAllAccumulators to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
+ runtimeContext.getIntCounter("foobar");
+ fail("Expected getIntCounter to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- catch (Exception e) {
- Assert.assertEquals("State is not supported in rich async function", e.getMessage());
+
+ try {
+ runtimeContext.getLongCounter("foobar");
+ fail("Expected getLongCounter to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- // test getting a counter from runtime context
- function = new RichAsyncFunction() {
- @Override
- public void asyncInvoke(String input, AsyncCollector collector) throws Exception {
- getIterationRuntimeContext().getIntCounter("test").add(6);
- }
- };
+ try {
+ runtimeContext.getDoubleCounter("foobar");
+ fail("Expected getDoubleCounter to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- IterationRuntimeContext context = mock(IterationRuntimeContext.class);
- IntCounter counter = new IntCounter(0);
- when(context.getIntCounter(anyString())).thenReturn(counter);
+ try {
+ runtimeContext.getHistogram("foobar");
+ fail("Expected getHistogram to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- function.setRuntimeContext(context);
+ try {
+ runtimeContext.hasBroadcastVariable("foobar");
+ fail("Expected hasBroadcastVariable to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- function.asyncInvoke("test", mock(AsyncCollector.class));
+ try {
+ runtimeContext.getBroadcastVariable("foobar");
+ fail("Expected getBroadcastVariable to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- Assert.assertTrue(6 == counter.getLocalValue());
+ try {
+			runtimeContext.getBroadcastVariableWithInitializer("foobar", new BroadcastVariableInitializer<Object, Object>() {
+ @Override
+				public Object initializeBroadcastVariable(Iterable<Object> data) {
+ return null;
+ }
+ });
+ fail("Expected getBroadcastVariableWithInitializer to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
deleted file mode 100644
index d118d8029c300..0000000000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
+++ /dev/null
@@ -1,656 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.async;
-
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.streaming.api.datastream.AsyncDataStream;
-import org.apache.flink.streaming.api.functions.async.buffer.StreamElementEntry;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer;
-import org.apache.flink.streaming.api.functions.async.AsyncFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link AsyncCollectorBuffer}. These test that:
- *
- *
- * Add a new item into the buffer
- * Ordered mode processing
- * Unordered mode processing
- * Error handling
- *
- */
-public class AsyncCollectorBufferTest {
- private final static ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
-
- private final Random RANDOM = new Random();
-
- private AsyncFunction function;
-
- private AsyncWaitOperator operator;
-
- private AsyncCollectorBuffer buffer;
-
- private Output> output;
-
- private Object lock = new Object();
-
- public AsyncCollectorBuffer getBuffer(int bufferSize, AsyncDataStream.OutputMode mode) throws Exception {
- function = new AsyncFunction() {
- @Override
- public void asyncInvoke(Integer input, AsyncCollector