Skip to content

Commit

Permalink
PCollectionCustomCoderTest updates to fix test to actually function.
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonkuster committed Nov 2, 2018
1 parent 0f9ef7c commit 728ba71
Showing 1 changed file with 113 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,46 @@
*/
package org.apache.beam.sdk.coders;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/** Tests for coder exception handling in runners. */
@RunWith(JUnit4.class)
public class PCollectionCustomCoderTest {
private static final Logger LOG = LoggerFactory.getLogger(PCollectionCustomCoderTest.class);
/**
* A custom test coder that can throw various exceptions during:
*
Expand All @@ -58,7 +70,8 @@ public class PCollectionCustomCoderTest {
static final String NULL_POINTER_EXCEPTION = "java.lang.NullPointerException";
static final String EXCEPTION_MESSAGE = "Super Unique Message!!!";

@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public final transient ExpectedException thrown = ExpectedException.none();
@Rule public final transient TestPipeline pipeline = TestPipeline.create();

/** Wrapper of StringUtf8Coder with customizable exception-throwing. */
public static class CustomTestCoder extends CustomCoder<String> {
Expand Down Expand Up @@ -148,104 +161,164 @@ private void throwIfPresent(String exceptionClassName) throws IOException {
@Test
@Category(NeedsRunner.class)
public void testDecodingIOException() throws Exception {
thrown.expect(Exception.class);
thrown.expectCause(instanceOf(IOException.class));
Pipeline p =
runPipelineWith(new CustomTestCoder(IO_EXCEPTION, null, null, null, EXCEPTION_MESSAGE));
pipelineWith(new CustomTestCoder(IO_EXCEPTION, null, null, null, EXCEPTION_MESSAGE));

thrown.expect(new ExceptionMatcher("java.io.IOException: Super Unique Message!!!"));
p.run().waitUntilFinish();
}

@Test
@Category(NeedsRunner.class)
public void testDecodingNPException() throws Exception {
thrown.expect(RuntimeException.class);
thrown.expectMessage("java.lang.NullPointerException: Super Unique Message!!!");

Pipeline p =
runPipelineWith(
pipelineWith(
new CustomTestCoder(NULL_POINTER_EXCEPTION, null, null, null, EXCEPTION_MESSAGE));
thrown.expect(new ExceptionMatcher("java.lang.NullPointerException: Super Unique Message!!!"));

p.run().waitUntilFinish();
}

@Test
@Category(NeedsRunner.class)
public void testEncodingIOException() throws Exception {
thrown.expect(Exception.class);
thrown.expectCause(instanceOf(IOException.class));

Pipeline p =
runPipelineWith(new CustomTestCoder(null, IO_EXCEPTION, null, null, EXCEPTION_MESSAGE));
pipelineWith(new CustomTestCoder(null, IO_EXCEPTION, null, null, EXCEPTION_MESSAGE));
thrown.expect(new ExceptionMatcher("java.io.IOException: Super Unique Message!!!"));

p.run().waitUntilFinish();
}

@Test
@Category(NeedsRunner.class)
public void testEncodingNPException() throws Exception {
thrown.expect(RuntimeException.class);
thrown.expectMessage("java.lang.NullPointerException: Super Unique Message!!!");
Pipeline p =
runPipelineWith(
pipelineWith(
new CustomTestCoder(null, NULL_POINTER_EXCEPTION, null, null, EXCEPTION_MESSAGE));
thrown.expect(new ExceptionMatcher("java.lang.NullPointerException: Super Unique Message!!!"));
p.run().waitUntilFinish();
}

@Test
@Category(NeedsRunner.class)
public void testSerializationIOException() throws Exception {
thrown.expect(Exception.class);
thrown.expectCause(instanceOf(IOException.class));
Pipeline p =
runPipelineWith(new CustomTestCoder(null, null, IO_EXCEPTION, null, EXCEPTION_MESSAGE));
pipelineWith(new CustomTestCoder(null, null, IO_EXCEPTION, null, EXCEPTION_MESSAGE));
thrown.expect(new ExceptionMatcher("java.io.IOException: Super Unique Message!!!"));
p.run().waitUntilFinish();
}

@Test
@Category(NeedsRunner.class)
public void testSerializationNPException() throws Exception {
thrown.expect(RuntimeException.class);
thrown.expectMessage("java.lang.NullPointerException: Super Unique Message!!!");

Pipeline p =
runPipelineWith(
pipelineWith(
new CustomTestCoder(null, null, NULL_POINTER_EXCEPTION, null, EXCEPTION_MESSAGE));
thrown.expect(new ExceptionMatcher("java.lang.NullPointerException: Super Unique Message!!!"));

p.run().waitUntilFinish();
}

@Test
@Category(NeedsRunner.class)
public void testDeserializationIOException() throws Exception {
thrown.expect(Exception.class);
thrown.expectCause(instanceOf(IOException.class));
Pipeline p =
runPipelineWith(new CustomTestCoder(null, null, null, IO_EXCEPTION, EXCEPTION_MESSAGE));
pipelineWith(new CustomTestCoder(null, null, null, IO_EXCEPTION, EXCEPTION_MESSAGE));
thrown.expect(new ExceptionMatcher("java.io.IOException: Super Unique Message!!!"));
p.run().waitUntilFinish();
}

@Test
@Category(NeedsRunner.class)
public void testDeserializationNPException() throws Exception {
thrown.expect(RuntimeException.class);
thrown.expectMessage("java.lang.NullPointerException: Super Unique Message!!!");

Pipeline p =
runPipelineWith(
pipelineWith(
new CustomTestCoder(null, null, null, NULL_POINTER_EXCEPTION, EXCEPTION_MESSAGE));
thrown.expect(new ExceptionMatcher("java.lang.NullPointerException: Super Unique Message!!!"));
p.run().waitUntilFinish();
}

@Test
@Category(NeedsRunner.class)
public void testNoException() throws Exception {
Pipeline p = runPipelineWith(new CustomTestCoder(null, null, null, null, null));
Pipeline p = pipelineWith(new CustomTestCoder(null, null, null, null, null));
p.run().waitUntilFinish();
}

public static Pipeline runPipelineWith(CustomTestCoder coder) throws Exception {
PipelineOptions options = PipelineOptionsFactory.fromArgs().as(PipelineOptions.class);

public Pipeline pipelineWith(CustomTestCoder coder) throws Exception {
List<String> pipelineContents =
Arrays.asList("String", "Testing", "Custom", "Coder", "In", "Beam");

// Create input.
Pipeline pipeline = TestPipeline.create(options);
PCollection<String> customCoderPC =
pipeline.begin().apply("ReadStrings", Create.of(pipelineContents)).setCoder(coder);
PAssert.that(customCoderPC).containsInAnyOrder(pipelineContents);
pipeline.run().waitUntilFinish();
pipeline.begin()
.apply("ReadStrings", Create.of(pipelineContents))
.apply(Reshuffle.viaRandomKey());
customCoderPC.setCoder(coder);
PCollection<String> fixedCoderPC =
customCoderPC.apply("Identity", ParDo.of(new IdentityDoFn()));
fixedCoderPC.setCoder(StringUtf8Coder.of());
ContentReader r = ContentReader.elementsEqual(pipelineContents);
// PAssert.that relies on
PAssert.that(fixedCoderPC).satisfies(r);

return pipeline;
}

static class IdentityDoFn extends DoFn<String, String> {
@ProcessElement
public void process(ProcessContext c) {
c.output(c.element());
}
}

static class ContentReader implements SerializableFunction<Iterable<String>, Void> {
private final String[] expected;

public static ContentReader elementsEqual(Iterable<String> expected) {
return new ContentReader(expected);
}

private ContentReader(Iterable<String> expected) {
ArrayList<String> ret = new ArrayList<>();
for (String t : expected) {
ret.add(t);
}
this.expected = ret.toArray(new String[ret.size()]);
}

@Override
public Void apply(Iterable<String> contents) {
assertThat(contents, containsInAnyOrder(expected));
return null;
}
}

static class ExceptionMatcher extends BaseMatcher<Object> {
private String expectedError;

public ExceptionMatcher(String expected) {
this.expectedError = expected;
}

@Override
public boolean matches(Object result) {
Throwable cause = ((Throwable) result).getCause();
while (null != cause) {
String causeString = cause.toString();
if (causeString.contains(expectedError)) {
return true;
}
cause = cause.getCause();
}

return false;
}

@Override
public void describeTo(Description descr) {
descr.appendText("exception with text matching: " + expectedError);
}
}
}

0 comments on commit 728ba71

Please sign in to comment.