[FLINK-4391] Add timeout parameter for asynchronous I/O
The timeout defines how long an asynchronous I/O operation may take. If the operation
takes longer than the timeout, it fails with a TimeoutException.

Annotate classes with the @Internal annotation.
tillrohrmann committed Dec 20, 2016
1 parent ad603d5 commit 6c5a871
Showing 18 changed files with 277 additions and 48 deletions.
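As a quick orientation before the diff, here is a minimal, self-contained sketch of how the new timeout argument is passed through the changed API. Only the orderedWait(input, function, timeout, timeUnit, capacity) signature is taken from this commit; the UpperCaseAsyncFunction, the job wiring, and the asyncInvoke(IN, AsyncCollector<OUT>) signature (and the import paths outside the diff) are assumptions made for illustration.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class AsyncTimeoutExample {

	// Hypothetical async function; the asyncInvoke(IN, AsyncCollector<OUT>)
	// signature is assumed for this Flink version and is not part of the diff below.
	private static class UpperCaseAsyncFunction implements AsyncFunction<String, String> {
		@Override
		public void asyncInvoke(String input, AsyncCollector<String> collector) {
			// complete the request on a separate thread
			CompletableFuture.runAsync(
				() -> collector.collect(Collections.singletonList(input.toUpperCase())));
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> input = env.fromElements("a", "b", "c");

		// fail any async call that takes longer than 10 seconds
		long timeout = 10_000L;

		// signature introduced by this commit: timeout and TimeUnit before the capacity
		DataStream<String> result = AsyncDataStream.orderedWait(
			input, new UpperCaseAsyncFunction(), timeout, TimeUnit.MILLISECONDS, 20);

		result.print();
		env.execute("Async I/O with timeout");
	}
}

The same timeout and TimeUnit arguments apply to unorderedWait; per the diff below, omitting the capacity falls back to a default queue capacity of 100.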
@@ -206,7 +206,8 @@ private static void printUsage() {
"[--maxCount <max number of input from source, -1 for infinite input>] " +
"[--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] " +
"[--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] " +
"[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]");
"[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]" +
"[--timeout <Timeout for the asynchronous operations>]");
}

public static void main(String[] args) throws Exception {
@@ -226,6 +227,7 @@ public static void main(String[] args) throws Exception {
final int taskNum;
final String timeType;
final long shutdownWaitTS;
final long timeout;

try {
// check the configuration for the job
@@ -238,6 +240,7 @@ public static void main(String[] args) throws Exception {
taskNum = params.getInt("waitOperatorParallelism", 1);
timeType = params.get("eventType", "EventTime");
shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
timeout = params.getLong("timeout", 10000L);
} catch (Exception e) {
printUsage();

@@ -292,10 +295,20 @@ else if (INGESTION_TIME.equals(timeType)) {
// add async operator to streaming job
DataStream<String> result;
if (ORDERED.equals(mode)) {
result = AsyncDataStream.orderedWait(inputStream, function, 20).setParallelism(taskNum);
result = AsyncDataStream.orderedWait(
inputStream,
function,
timeout,
TimeUnit.MILLISECONDS,
20).setParallelism(taskNum);
}
else {
result = AsyncDataStream.unorderedWait(inputStream, function, 20).setParallelism(taskNum);
result = AsyncDataStream.unorderedWait(
inputStream,
function,
timeout,
TimeUnit.MILLISECONDS,
20).setParallelism(taskNum);
}

// add a reduce to get the sum of each keys.
@@ -24,14 +24,16 @@
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;

import java.util.concurrent.TimeUnit;

/**
* A helper class to apply {@link AsyncFunction} to a data stream.
* <p>
* <pre>{@code
* DataStream<String> input = ...
* AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
*
* AsyncDataStream.orderedWait(input, asyncFunc, 100);
* AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100);
* }
* </pre>
*/
@@ -40,13 +42,14 @@
public class AsyncDataStream {
public enum OutputMode { ORDERED, UNORDERED }

private static final int DEFAULT_BUFFER_SIZE = 100;
private static final int DEFAULT_QUEUE_CAPACITY = 100;

/**
* Add an AsyncWaitOperator.
*
* @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
* @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
* @param timeout for the asynchronous operation to complete
* @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
* @param mode Processing mode for {@link AsyncWaitOperator}.
* @param <IN> Input type.
@@ -56,6 +59,7 @@ public enum OutputMode { ORDERED, UNORDERED }
private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
long timeout,
int bufSize,
OutputMode mode) {

@@ -64,8 +68,11 @@ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
true, in.getType(), Utils.getCallLocationName(), true);

// create transform
AsyncWaitOperator<IN, OUT> operator =
new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func), bufSize, mode);
AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
in.getExecutionEnvironment().clean(func),
timeout,
bufSize,
mode);

return in.transform("async wait operator", outTypeInfo, operator);
}
@@ -75,61 +82,89 @@ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
*
* @param in Input {@link DataStream}
* @param func {@link AsyncFunction}
* @param bufSize The max number of async i/o operation that can be triggered
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the given timeout
* @param capacity The max number of async i/o operation that can be triggered
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}.
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
int bufSize) {
return addOperator(in, func, bufSize, OutputMode.UNORDERED);
long timeout,
TimeUnit timeUnit,
int capacity) {
return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
}

/**
* Add an AsyncWaitOperator. The order of output stream records may be reordered.
* @param in Input {@link DataStream}
* @param func {@link AsyncFunction}
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the given timeout
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}.
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
DataStream<IN> in,
AsyncFunction<IN, OUT> func) {
return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.UNORDERED);
AsyncFunction<IN, OUT> func,
long timeout,
TimeUnit timeUnit) {
return addOperator(
in,
func,
timeUnit.toMillis(timeout),
DEFAULT_QUEUE_CAPACITY,
OutputMode.UNORDERED);
}

/**
* Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as input ones.
* Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as
* input ones.
*
* @param in Input {@link DataStream}
* @param func {@link AsyncFunction}
* @param bufSize The max number of async i/o operation that can be triggered
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the given timeout
* @param capacity The max number of async i/o operation that can be triggered
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}.
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
int bufSize) {
return addOperator(in, func, bufSize, OutputMode.ORDERED);
long timeout,
TimeUnit timeUnit,
int capacity) {
return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
}

/**
* Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as input ones.
* Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as
* input ones.
*
* @param in Input {@link DataStream}
* @param func {@link AsyncFunction}
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the given timeout
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}.
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
DataStream<IN> in,
AsyncFunction<IN, OUT> func) {
return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.ORDERED);
AsyncFunction<IN, OUT> func,
long timeout,
TimeUnit timeUnit) {
return addOperator(
in,
func,
timeUnit.toMillis(timeout),
DEFAULT_QUEUE_CAPACITY,
OutputMode.ORDERED);
}
}
@@ -114,6 +114,7 @@ public class AsyncWaitOperator<IN, OUT>

public AsyncWaitOperator(
AsyncFunction<IN, OUT> asyncFunction,
long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode) {
super(asyncFunction);
@@ -124,7 +125,7 @@ public AsyncWaitOperator(

this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

this.timeout = -1L;
this.timeout = timeout;
}

@Override
@@ -200,7 +201,7 @@ public void processElement(StreamRecord<IN> element) throws Exception {

if (timeout > 0L) {
// register a timeout for this AsyncStreamRecordBufferEntry
long timeoutTimestamp = timeout + System.currentTimeMillis();
long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

getProcessingTimeService().registerTimer(
timeoutTimestamp,
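The hunk above registers a processing-time timer at timeout + current processing time; the timer callback itself lies outside the shown lines. Purely as an illustration of the pattern — not Flink's implementation — a self-contained sketch using a plain ScheduledExecutorService:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutPatternSketch {
	public static void main(String[] args) throws Exception {
		ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
		CompletableFuture<String> pendingResult = new CompletableFuture<>();
		long timeoutMillis = 100L;

		// register the timeout: fail the pending entry if it has not completed in time
		timerService.schedule(
			() -> pendingResult.completeExceptionally(
				new TimeoutException("Async function call has timed out.")),
			timeoutMillis,
			TimeUnit.MILLISECONDS);

		// the asynchronous operation never completes in this sketch, so the timer fires
		try {
			pendingResult.get();
		} catch (ExecutionException e) {
			System.out.println("Failed with: " + e.getCause());
		} finally {
			timerService.shutdownNow();
		}
	}
}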
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators.async;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.async.queue.AsyncCollectionResult;
@@ -37,6 +38,7 @@
*
* @param <OUT> Type of the output elements
*/
@Internal
public class Emitter<OUT> implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);
@@ -18,11 +18,13 @@

package org.apache.flink.streaming.api.operators.async;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.StreamOperator;

/**
* Interface for {@link StreamOperator} actions.
*/
@Internal
public interface OperatorActions {

/**
@@ -18,13 +18,16 @@

package org.apache.flink.streaming.api.operators.async.queue;

import org.apache.flink.annotation.Internal;

import java.util.Collection;

/**
* {@link AsyncResult} sub class for asynchronous result collections.
*
* @param <T> Type of the collection elements.
*/
@Internal
public interface AsyncCollectionResult<T> extends AsyncResult {

boolean hasTimestamp();
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators.async.queue;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

@@ -26,6 +27,7 @@
* either be a {@link Watermark} or a collection of new output elements produced by the
* {@link AsyncFunction}.
*/
@Internal
public interface AsyncResult {

/**
@@ -18,11 +18,13 @@

package org.apache.flink.streaming.api.operators.async.queue;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
* {@link AsyncResult} subclass for asynchronous result {@link Watermark}.
*/
@Internal
public interface AsyncWatermarkResult extends AsyncResult {
/**
* Get the resulting watermark.
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators.async.queue;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.util.Preconditions;
@@ -37,6 +38,7 @@
* to the queue. Thus, even if the completion order can be arbitrary, the output order strictly
* follows the insertion order (element cannot overtake each other).
*/
@Internal
public class OrderedStreamElementQueue implements StreamElementQueue {

private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);
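The class comment above promises that output order strictly follows insertion order even when completions arrive out of order. A self-contained sketch of that invariant (Flink's queue uses its own entry types and blocking logic; this only shows the ordering rule):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class OrderedEmissionSketch {
	public static void main(String[] args) {
		Queue<CompletableFuture<String>> queue = new ArrayDeque<>();
		CompletableFuture<String> first = new CompletableFuture<>();
		CompletableFuture<String> second = new CompletableFuture<>();
		queue.add(first);
		queue.add(second);

		second.complete("second");                 // completes out of order
		System.out.println(queue.peek().isDone()); // false: head still pending, nothing is emitted

		first.complete("first");
		// now emission proceeds strictly in insertion order
		while (!queue.isEmpty() && queue.peek().isDone()) {
			System.out.println(queue.poll().join()); // prints "first", then "second"
		}
	}
}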
@@ -18,13 +18,15 @@

package org.apache.flink.streaming.api.operators.async.queue;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;

import java.util.Collection;

/**
* Interface for blocking stream element queues for the {@link AsyncWaitOperator}.
*/
@Internal
public interface StreamElementQueue {

/**
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators.async.queue;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -32,6 +33,7 @@
*
* @param <T> Type of the result
*/
@Internal
public abstract class StreamElementQueueEntry<T> implements AsyncResult {

/** Stream element */
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators.async.queue;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
@@ -34,6 +35,7 @@
*
* @param <OUT> Type of the asynchronous collection result
*/
@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
implements AsyncCollectionResult<OUT>, AsyncCollector<OUT> {

@@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.operators.async.queue;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.util.Preconditions;
@@ -41,6 +42,7 @@
* and no watermark can overtake a stream record. However, stream records falling in the same
* segment between two watermarks can overtake each other (their emission order is not guaranteed).
*/
@Internal
public class UnorderedStreamElementQueue implements StreamElementQueue {

private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
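The comment above describes watermarks as segment boundaries: records inside a segment may overtake each other, but nothing crosses a watermark. A small self-contained sketch of that rule, with the segment bookkeeping purely illustrative and no relation to Flink's actual data structures:

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class UnorderedSegmentSketch {
	// emit every completed entry of the current segment, in completion order
	private static void emitCompleted(Set<CompletableFuture<String>> segment) {
		segment.removeIf(entry -> {
			if (entry.isDone()) {
				System.out.println(entry.join());
				return true;
			}
			return false;
		});
	}

	public static void main(String[] args) {
		Set<CompletableFuture<String>> segment = new HashSet<>();
		CompletableFuture<String> a = new CompletableFuture<>();
		CompletableFuture<String> b = new CompletableFuture<>();
		segment.add(a);
		segment.add(b);

		b.complete("b");        // completes first and may be emitted immediately ...
		emitCompleted(segment); // ... overtaking "a" within the same segment

		a.complete("a");
		emitCompleted(segment);

		// only once the segment is drained may the watermark that closes it be emitted
		if (segment.isEmpty()) {
			System.out.println("emit watermark");
		}
	}
}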