[FLINK-4391] Polish asynchronous I/O operations
Polish AsyncFunction

Move AsyncCollectorBuffer to operators package

Rework AsyncWaitOperator and AsyncStreamElementQueue implementation

Rename AsyncCollectorQueue into StreamElementQueue

Reworked StreamingOperatorsITCase and RichAsyncFunctionTest

Refactor AsyncWaitOperatorTest

Add StreamElementQueueTests

Add EmitterTest case

Add comments
tillrohrmann committed Dec 20, 2016
1 parent f528307 commit ad603d5
Showing 34 changed files with 2,804 additions and 2,126 deletions.
ExceptionUtils.java
@@ -26,10 +26,13 @@

import org.apache.flink.annotation.Internal;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;

import static org.apache.flink.util.Preconditions.checkNotNull;

@Internal
public final class ExceptionUtils {
public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
@@ -59,7 +62,56 @@ public static String stringifyException(final Throwable e) {
return e.getClass().getName() + " (error while printing stack trace)";
}
}


/**
* Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
* to a prior exception, or returns the new exception, if no prior exception exists.
*
* <pre>{@code
*
* public void closeAllThings() throws Exception {
* Exception ex = null;
* try {
* component.shutdown();
* } catch (Exception e) {
* ex = firstOrSuppressed(e, ex);
* }
* try {
* anotherComponent.stop();
* } catch (Exception e) {
* ex = firstOrSuppressed(e, ex);
* }
* try {
* lastComponent.shutdown();
* } catch (Exception e) {
* ex = firstOrSuppressed(e, ex);
* }
*
* if (ex != null) {
* throw ex;
* }
* }
* }</pre>
*
* @param newException The newly occurred exception
* @param previous The previously occurred exception, possibly null.
*
* @return The new exception, if no previous exception exists, or the previous exception with the
* new exception in the list of suppressed exceptions.
*/
public static <T extends Throwable> T firstOrSuppressed(T newException, @Nullable T previous) {
checkNotNull(newException, "newException");

if (previous == null) {
return newException;
} else {
previous.addSuppressed(newException);
return previous;
}
}
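
A minimal sketch of how firstOrSuppressed is meant to be used at a call site; the closeAll helper and the resource list are hypothetical, not part of this commit:

	// hypothetical helper illustrating the intended call pattern
	static void closeAll(Iterable<AutoCloseable> resources) throws Exception {
		Exception error = null;

		for (AutoCloseable resource : resources) {
			try {
				resource.close();
			} catch (Exception e) {
				// keep the first failure; later failures are attached as suppressed exceptions
				error = ExceptionUtils.firstOrSuppressed(e, error);
			}
		}

		if (error != null) {
			// error.getSuppressed() now contains every failure after the first one
			throw error;
		}
	}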



/**
* Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
* throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions
AsyncIOExample.java
@@ -33,6 +33,8 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
@@ -43,10 +45,17 @@
import java.util.concurrent.TimeUnit;

/**
* Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}
* Example to illustrates how to use {@link AsyncFunction}
*/
public class AsyncIOExample {

private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);

private static final String EXACTLY_ONCE_MODE = "exactly_once";
private static final String EVENT_TIME = "EventTime";
private static final String INGESTION_TIME = "IngestionTime";
private static final String ORDERED = "ordered";

/**
* A checkpointed source.
*/
@@ -103,26 +112,28 @@ public void cancel() {
* async client.
*/
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
transient static ExecutorService executorService;
transient static Random random;
private static final long serialVersionUID = 2098635244857937717L;

private static ExecutorService executorService;
private static Random random;

private int counter;

/**
* The result of multiplying sleepFactor with a random float is used to pause
* the working thread in the thread pool, simulating a time consuming async operation.
*/
final long sleepFactor;
private final long sleepFactor;

/**
* The ratio to generate an exception to simulate an async error. For example, the error
* may be a TimeoutException while visiting HBase.
*/
final float failRatio;
private final float failRatio;

final long shutdownWaitTS;
private final long shutdownWaitTS;

public SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
this.sleepFactor = sleepFactor;
this.failRatio = failRatio;
this.shutdownWaitTS = shutdownWaitTS;
@@ -155,7 +166,9 @@ public void close() throws Exception {
executorService.shutdown();

try {
executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS);
if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
@@ -169,14 +182,15 @@ public void asyncInvoke(final Integer input, final AsyncCollector<String> collec
@Override
public void run() {
// wait for while to simulate async operation here
int sleep = (int) (random.nextFloat() * sleepFactor);
long sleep = (long) (random.nextFloat() * sleepFactor);
try {
Thread.sleep(sleep);
List<String> ret = Collections.singletonList("key-" + (input % 10));

if (random.nextFloat() < failRatio) {
collector.collect(new Exception("wahahahaha..."));
} else {
collector.collect(ret);
collector.collect(
Collections.singletonList("key-" + (input % 10)));
}
} catch (InterruptedException e) {
collector.collect(new ArrayList<String>(0));
@@ -200,47 +214,71 @@ public static void main(String[] args) throws Exception {
// obtain execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

printUsage();

// parse parameters
final ParameterTool params = ParameterTool.fromArgs(args);

// check the configuration for the job
final String statePath = params.getRequired("fsStatePath");
final String cpMode = params.get("checkpointMode", "exactly_once");
final int maxCount = params.getInt("maxCount", 100000);
final int sleepFactor = params.getInt("sleepFactor", 100);
final float failRatio = params.getFloat("failRatio", 0.001f);
final String mode = params.get("waitMode", "ordered");
final int taskNum = params.getInt("waitOperatorParallelism", 1);
final String timeType = params.get("eventType", "EventTime");
final int shutdownWaitTS = params.getInt("shutdownWaitTS", 20000);

System.out.println("Job configuration\n"
+"\tFS state path="+statePath+"\n"
+"\tCheckpoint mode="+cpMode+"\n"
+"\tMax count of input from source="+maxCount+"\n"
+"\tSleep factor="+sleepFactor+"\n"
+"\tFail ratio="+failRatio+"\n"
+"\tWaiting mode="+mode+"\n"
+"\tParallelism for async wait operator="+taskNum+"\n"
+"\tEvent type="+timeType+"\n"
+"\tShutdown wait timestamp="+shutdownWaitTS);

// setup state and checkpoint mode
env.setStateBackend(new FsStateBackend(statePath));
if (cpMode.equals("exactly_once")) {
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
final String statePath;
final String cpMode;
final int maxCount;
final long sleepFactor;
final float failRatio;
final String mode;
final int taskNum;
final String timeType;
final long shutdownWaitTS;

try {
// check the configuration for the job
statePath = params.get("fsStatePath", null);
cpMode = params.get("checkpointMode", "exactly_once");
maxCount = params.getInt("maxCount", 100000);
sleepFactor = params.getLong("sleepFactor", 100);
failRatio = params.getFloat("failRatio", 0.001f);
mode = params.get("waitMode", "ordered");
taskNum = params.getInt("waitOperatorParallelism", 1);
timeType = params.get("eventType", "EventTime");
shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
} catch (Exception e) {
printUsage();

throw e;
}

StringBuilder configStringBuilder = new StringBuilder();

final String lineSeparator = System.getProperty("line.separator");

configStringBuilder
.append("Job configuration").append(lineSeparator)
.append("FS state path=").append(statePath).append(lineSeparator)
.append("Checkpoint mode=").append(cpMode).append(lineSeparator)
.append("Max count of input from source=").append(maxCount).append(lineSeparator)
.append("Sleep factor=").append(sleepFactor).append(lineSeparator)
.append("Fail ratio=").append(failRatio).append(lineSeparator)
.append("Waiting mode=").append(mode).append(lineSeparator)
.append("Parallelism for async wait operator=").append(taskNum).append(lineSeparator)
.append("Event type=").append(timeType).append(lineSeparator)
.append("Shutdown wait timestamp=").append(shutdownWaitTS);

LOG.info(configStringBuilder.toString());

if (statePath != null) {
// setup state and checkpoint mode
env.setStateBackend(new FsStateBackend(statePath));
}

if (EXACTLY_ONCE_MODE.equals(cpMode)) {
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
}
else {
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
}

// enable watermark or not
if (timeType.equals("EventTime")) {
if (EVENT_TIME.equals(timeType)) {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
}
else if (timeType.equals("IngestionTime")) {
else if (INGESTION_TIME.equals(timeType)) {
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
}

@@ -253,7 +291,7 @@ else if (timeType.equals("IngestionTime")) {

// add async operator to streaming job
DataStream<String> result;
if (mode.equals("ordered")) {
if (ORDERED.equals(mode)) {
result = AsyncDataStream.orderedWait(inputStream, function, 20).setParallelism(taskNum);
}
else {
@@ -262,14 +300,16 @@ else if (timeType.equals("IngestionTime")) {

// add a reduce to get the sum of each keys.
result.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
private static final long serialVersionUID = -938116068682344455L;

@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
out.collect(new Tuple2<>(value, 1));
}
}).keyBy(0).sum(1).print();

// execute the program
env.execute("Async I/O Example");
env.execute("Async IO Example");
}
}

AsyncFunction.java
@@ -29,14 +29,15 @@
* <p>
* For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
* the result can be collected by calling {@link AsyncCollector#collect}. For each async
* operations, their contexts are buffered in the operator immediately after invoking
* #asyncInvoke, leading to no blocking for each stream input as long as internal buffer is not full.
* operation, its context is stored in the operator immediately after invoking
* #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
* <p>
* {@link AsyncCollector} can be passed into callbacks or futures provided by async client to
* fetch result data. Any error can also be propagate to the operator by {@link AsyncCollector#collect(Throwable)}.
* {@link AsyncCollector} can be passed into callbacks or futures to collect the result data.
* An error can also be propagate to the async IO operator by
* {@link AsyncCollector#collect(Throwable)}.
*
* <p>
* Typical usage for callback:
* Callback example usage:
* <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> {
* @Override
@@ -46,11 +47,10 @@
* hbase.asyncGet(get, cb);
* }
* }
* }
* </pre>
*
* <p>
* Typical usage for {@link com.google.common.util.concurrent.ListenableFuture}
* Future example usage:
* <pre>{@code
* public class HBaseAsyncFunc implements AsyncFunction<String, String> {
* @Override
@@ -68,7 +68,6 @@
* });
* }
* }
* }
* </pre>
*
* @param <IN> The type of the input elements.
@@ -80,9 +79,10 @@ public interface AsyncFunction<IN, OUT> extends Function, Serializable {
/**
* Trigger async operation for each stream input.
*
* @param input Stream Input
* @param collector AsyncCollector
* @exception Exception will make task fail and trigger fail-over process.
* @param input element coming from an upstream task
* @param collector to collect the result data
* @exception Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process.
*/
void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}
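
For illustration, a hedged sketch of an AsyncFunction implementation that runs blocking lookups on a shared thread pool and reports results or errors through the AsyncCollector, as the javadoc above describes. The class name, the thread pool, and the blockingLookup call are assumptions for the example, not part of this commit:

	import org.apache.flink.streaming.api.functions.async.AsyncFunction;
	import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

	import java.util.Collections;
	import java.util.concurrent.ExecutorService;
	import java.util.concurrent.Executors;

	public class AsyncDatabaseLookup implements AsyncFunction<String, String> {

		private static final long serialVersionUID = 1L;

		// shared pool standing in for a real async client; static so that the
		// serialized function instance does not have to carry the executor
		private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4);

		@Override
		public void asyncInvoke(final String key, final AsyncCollector<String> collector) throws Exception {
			EXECUTOR.submit(new Runnable() {
				@Override
				public void run() {
					try {
						// placeholder for the actual I/O call (e.g. a key-value store lookup)
						String value = blockingLookup(key);
						collector.collect(Collections.singletonList(value));
					} catch (Exception e) {
						// hand the failure back to the async wait operator
						collector.collect(e);
					}
				}
			});
		}

		private static String blockingLookup(String key) {
			return "value-" + key;
		}
	}

Such a function could then be wired into a job with AsyncDataStream.orderedWait or unorderedWait, as shown in the AsyncIOExample changes above.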

