Skip to content

Commit

Permalink
Merge pull request apache#7736: [BEAM-5638] Exception handling for Java MapElements and FlatMapElements
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Mar 21, 2019
2 parents 133c56d + b12dafd commit 261b401
Show file tree
Hide file tree
Showing 7 changed files with 877 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,8 @@ enum Kind {

/** PCollection Schema support in Beam. */
SCHEMAS,

/** Experimental APIs related to exception handling in PTransforms. */
WITH_EXCEPTIONS,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.transforms.Contextful.Fn;
import org.apache.beam.sdk.transforms.WithFailures.ExceptionElement;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

Expand Down Expand Up @@ -182,4 +186,194 @@ public void populateDisplayData(DisplayData.Builder builder) {
builder.include("fn", (HasDisplayData) originalFnForDisplayData);
}
}

/**
 * Begins configuring exception handling for this transform by fixing the element type of the
 * failure collection.
 *
 * <p>The returned {@link FlatMapWithFailures} carries the given type descriptor but no exception
 * handler; one must still be supplied through {@link
 * FlatMapWithFailures#exceptionsVia(ProcessFunction)} before the transform can be expanded.
 *
 * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
 * WithFailures.Result}.
 *
 * @param failureTypeDescriptor type descriptor to use for the failure collection
 * @return a {@link FlatMapWithFailures} sharing this transform's function and input/output types
 */
@Experimental(Experimental.Kind.WITH_EXCEPTIONS)
public <NewFailureT> FlatMapWithFailures<InputT, OutputT, NewFailureT> exceptionsInto(
    TypeDescriptor<NewFailureT> failureTypeDescriptor) {
  // The handler is deliberately left unset here; it is provided later via exceptionsVia().
  final ProcessFunction<ExceptionElement<InputT>, NewFailureT> handlerNotYetSet = null;
  return new FlatMapWithFailures<>(
      fn,
      originalFnForDisplayData,
      inputType,
      outputType,
      handlerNotYetSet,
      failureTypeDescriptor);
}

/**
 * Returns a new {@link FlatMapWithFailures} transform that catches exceptions raised while
 * mapping elements. The raised exception and the input element being processed are handed to
 * the given {@code exceptionHandler}, and whatever it returns is emitted to a failure
 * collection.
 *
 * <p>The failure type descriptor is read from the handler itself through {@link
 * InferableFunction#getOutputTypeDescriptor()}, so a preceding call to {@link
 * #exceptionsInto(TypeDescriptor)} may not be necessary.
 *
 * <p>See {@link WithFailures} documentation for usage patterns of the returned {@link
 * WithFailures.Result}.
 *
 * <p>Example usage:
 *
 * <pre>{@code
 * Result<PCollection<String>, String> result = words.apply(
 *     FlatMapElements
 *         .into(TypeDescriptors.strings())
 *         // Could throw ArrayIndexOutOfBoundsException
 *         .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
 *         .exceptionsVia(new WithFailures.ExceptionAsMapHandler<String>() {}));
 * PCollection<String> output = result.output();
 * PCollection<String> failures = result.failures();
 * }</pre>
 *
 * @param exceptionHandler function converting a failed element and its exception into a failure
 *     record; also the source of the failure type descriptor
 * @return a {@link FlatMapWithFailures} sharing this transform's function and input/output types
 */
@Experimental(Experimental.Kind.WITH_EXCEPTIONS)
public <FailureT> FlatMapWithFailures<InputT, OutputT, FailureT> exceptionsVia(
    InferableFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
  // The handler doubles as the source of type information for the failure collection.
  final TypeDescriptor<FailureT> inferredFailureType = exceptionHandler.getOutputTypeDescriptor();
  return new FlatMapWithFailures<>(
      fn, originalFnForDisplayData, inputType, outputType, exceptionHandler, inferredFailureType);
}

/**
 * A {@code PTransform} that adds exception handling to {@link FlatMapElements}.
 *
 * <p>Each input element is passed to the wrapped function and every element of the returned
 * iterable is emitted to the main output. If the function throws an {@link Exception}, the input
 * element and the exception are wrapped in an {@link ExceptionElement}, passed to the configured
 * exception handler, and the handler's result is emitted to a separate failure output instead.
 *
 * @param <InputT> element type of the input collection
 * @param <OutputT> element type of the main output collection
 * @param <FailureT> element type of the failure collection
 */
@Experimental(Experimental.Kind.WITH_EXCEPTIONS)
public static class FlatMapWithFailures<InputT, OutputT, FailureT>
    extends PTransform<PCollection<InputT>, WithFailures.Result<PCollection<OutputT>, FailureT>> {

  // The type descriptors and the display-data object are transient: they are not carried along
  // when the transform is serialized and are only meant for use during pipeline construction.
  private final transient TypeDescriptor<InputT> inputType;
  private final transient TypeDescriptor<OutputT> outputType;
  @Nullable private final transient TypeDescriptor<FailureT> failureType;
  private final transient Object originalFnForDisplayData;
  // Null until the mapping function has been supplied on the originating transform.
  @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
  // Null until exceptionsVia() has been called.
  @Nullable private final ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler;

  /**
   * Creates the transform from its constituent parts.
   *
   * @param fn the mapping function, or null if it has not been specified yet
   * @param originalFnForDisplayData the user-supplied function object reported in display data
   * @param inputType type descriptor of the input elements
   * @param outputType type descriptor of the main output elements
   * @param exceptionHandler converts failures into failure records, or null if not yet specified
   * @param failureType type descriptor of the failure records, or null if unknown
   */
  FlatMapWithFailures(
      @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
      Object originalFnForDisplayData,
      TypeDescriptor<InputT> inputType,
      TypeDescriptor<OutputT> outputType,
      @Nullable ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler,
      @Nullable TypeDescriptor<FailureT> failureType) {
    this.fn = fn;
    this.originalFnForDisplayData = originalFnForDisplayData;
    this.inputType = inputType;
    this.outputType = outputType;
    this.exceptionHandler = exceptionHandler;
    this.failureType = failureType;
  }

  /**
   * Returns a new {@link FlatMapWithFailures} transform that catches exceptions raised while
   * mapping elements, passing the raised exception instance and the input element being processed
   * through the given {@code exceptionHandler} and emitting the result to a failure collection.
   *
   * <p>The failure type descriptor already held by this transform is kept unchanged.
   *
   * <p>Example usage:
   *
   * <pre>{@code
   * Result<PCollection<String>, String> result = words.apply(
   *     FlatMapElements
   *         .into(TypeDescriptors.strings())
   *         // Could throw ArrayIndexOutOfBoundsException
   *         .via((String line) -> Arrays.asList(Arrays.copyOfRange(line.split(" "), 1, 5)))
   *         .exceptionsInto(TypeDescriptors.strings())
   *         .exceptionsVia((ExceptionElement<String> ee) -> ee.exception().getMessage()));
   * PCollection<String> output = result.output();
   * PCollection<String> failures = result.failures();
   * }</pre>
   *
   * @param exceptionHandler function converting a failed element and its exception into a failure
   *     record
   * @return a copy of this transform that uses {@code exceptionHandler}
   */
  public FlatMapWithFailures<InputT, OutputT, FailureT> exceptionsVia(
      ProcessFunction<ExceptionElement<InputT>, FailureT> exceptionHandler) {
    return new FlatMapWithFailures<>(
        fn, originalFnForDisplayData, inputType, outputType, exceptionHandler, failureType);
  }

  /**
   * Applies the mapping as a multi-output {@code ParDo} and packages the main and failure outputs
   * into a {@link WithFailures.Result}.
   *
   * @throws IllegalArgumentException if the mapping function or the exception handler has not
   *     been specified
   */
  @Override
  public WithFailures.Result<PCollection<OutputT>, FailureT> expand(PCollection<InputT> input) {
    // Both parts are nullable while the transform is being built; fail with a clear message
    // rather than a NullPointerException further down.
    checkArgument(fn != null, ".via() is required");
    checkArgument(exceptionHandler != null, ".exceptionsVia() is required");
    MapFn doFn = new MapFn();
    PCollectionTuple tuple =
        input.apply(
            FlatMapWithFailures.class.getSimpleName(),
            ParDo.of(doFn)
                .withOutputTags(doFn.outputTag, TupleTagList.of(doFn.failureTag))
                .withSideInputs(this.fn.getRequirements().getSideInputs()));
    return WithFailures.Result.of(tuple, doFn.outputTag, doFn.failureTag);
  }

  /**
   * Registers the class of the mapping function and, when one has been set, the class of the
   * exception handler, including nested display data from either if they provide it.
   */
  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder.add(DisplayData.item("class", originalFnForDisplayData.getClass()));
    if (originalFnForDisplayData instanceof HasDisplayData) {
      builder.include("fn", (HasDisplayData) originalFnForDisplayData);
    }
    // The handler is absent on a transform obtained from exceptionsInto() until exceptionsVia()
    // is called; skip its entries in that case instead of dereferencing null.
    if (exceptionHandler != null) {
      builder.add(DisplayData.item("exceptionHandler.class", exceptionHandler.getClass()));
      if (exceptionHandler instanceof HasDisplayData) {
        builder.include("exceptionHandler", (HasDisplayData) exceptionHandler);
      }
    }
  }

  /** A concrete TupleTag that allows coder inference based on failureType. */
  private class FailureTag extends TupleTag<FailureT> {
    @Override
    public TypeDescriptor<FailureT> getTypeDescriptor() {
      return failureType;
    }
  }

  /** A DoFn implementation that handles exceptions and outputs a secondary failure collection. */
  private class MapFn extends DoFn<InputT, OutputT> {

    final TupleTag<OutputT> outputTag = new TupleTag<OutputT>() {};
    final TupleTag<FailureT> failureTag = new FailureTag();

    /**
     * Maps one element, emitting either its results to {@code outputTag} or a failure record to
     * {@code failureTag}.
     */
    @ProcessElement
    public void processElement(@Element InputT element, MultiOutputReceiver r, ProcessContext c)
        throws Exception {
      Iterable<OutputT> res;
      try {
        res = fn.getClosure().apply(element, Fn.Context.wrapProcessContext(c));
      } catch (Exception e) {
        ExceptionElement<InputT> exceptionElement = ExceptionElement.of(element, e);
        r.get(failureTag).output(exceptionHandler.apply(exceptionElement));
        return;
      }
      // We make sure our outputs occur outside the try block, since runners may implement
      // fusion by having output() directly call the body of another DoFn, potentially catching
      // exceptions unrelated to this transform.
      for (OutputT output : res) {
        r.get(outputTag).output(output);
      }
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      builder.delegate(FlatMapWithFailures.this);
    }

    @Override
    public TypeDescriptor<InputT> getInputTypeDescriptor() {
      return inputType;
    }

    @Override
    public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
      checkState(
          outputType != null,
          "%s output type descriptor was null; "
              + "this probably means that getOutputTypeDescriptor() was called after "
              + "serialization/deserialization, but it is only available prior to "
              + "serialization, for constructing a pipeline and inferring coders",
          FlatMapWithFailures.class.getSimpleName());
      return outputType;
    }
  }
}
}
Loading

0 comments on commit 261b401

Please sign in to comment.