Skip to content

Commit

Permalink
[BEAM-7193] ParDoLifecycleTest: remove duplicated inner class
Browse files Browse the repository at this point in the history
  • Loading branch information
adude3141 committed Apr 30, 2019
1 parent 21f6320 commit 4474978
Showing 1 changed file with 15 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,62 +49,6 @@ public class ParDoLifecycleTest implements Serializable {

@Rule public final transient TestPipeline p = TestPipeline.create();

private static class CallSequenceEnforcingDoFn<T> extends DoFn<T, T> {
private boolean setupCalled = false;
private int startBundleCalls = 0;
private int finishBundleCalls = 0;
private boolean teardownCalled = false;

@Setup
public void setup() {
assertThat("setup should not be called twice", setupCalled, is(false));
assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0));
assertThat("setup should be called before finishBundle", finishBundleCalls, equalTo(0));
assertThat("setup should be called before teardown", teardownCalled, is(false));
setupCalled = true;
}

@StartBundle
public void startBundle() {
assertThat("setup should have been called", setupCalled, is(true));
assertThat(
"Even number of startBundle and finishBundle calls in startBundle",
startBundleCalls,
equalTo(finishBundleCalls));
assertThat("teardown should not have been called", teardownCalled, is(false));
startBundleCalls++;
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
assertThat(
"there should be one startBundle call with no call to finishBundle",
startBundleCalls,
equalTo(finishBundleCalls + 1));
assertThat("teardown should not have been called", teardownCalled, is(false));
}

@FinishBundle
public void finishBundle() {
assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
assertThat(
"there should be one bundle that has been started but not finished",
startBundleCalls,
equalTo(finishBundleCalls + 1));
assertThat("teardown should not have been called", teardownCalled, is(false));
finishBundleCalls++;
}

@Teardown
public void teardown() {
assertThat(setupCalled, is(true));
assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
assertThat(teardownCalled, is(false));
teardownCalled = true;
}
}

@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class})
public void testFnCallSequence() {
Expand Down Expand Up @@ -201,7 +145,7 @@ public void after() {
}

private static class CallSequenceEnforcingStatefulFn<K, V>
extends CallSequenceEnforcingDoFn<KV<K, V>> {
extends CallSequenceEnforcingFn<KV<K, V>> {
private static final String STATE_ID = "foo";

@StateId(STATE_ID)
Expand All @@ -211,161 +155,115 @@ private static class CallSequenceEnforcingStatefulFn<K, V>
@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class})
public void testTeardownCalledAfterExceptionInStartBundle() {
ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
ExceptionThrowingFn fn = new ExceptionThrowingFn(MethodForException.START_BUNDLE);
p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
try {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
assertThat(
"Function should have been torn down after exception",
ExceptionThrowingOldFn.teardownCalled.get(),
ExceptionThrowingFn.teardownCalled.get(),
is(true));
}
}

@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class})
public void testTeardownCalledAfterExceptionInProcessElement() {
ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
ExceptionThrowingFn fn = new ExceptionThrowingFn(MethodForException.PROCESS_ELEMENT);
p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
try {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
assertThat(
"Function should have been torn down after exception",
ExceptionThrowingOldFn.teardownCalled.get(),
ExceptionThrowingFn.teardownCalled.get(),
is(true));
}
}

@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class})
public void testTeardownCalledAfterExceptionInFinishBundle() {
ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
ExceptionThrowingFn fn = new ExceptionThrowingFn(MethodForException.FINISH_BUNDLE);
p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
try {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
assertThat(
"Function should have been torn down after exception",
ExceptionThrowingOldFn.teardownCalled.get(),
ExceptionThrowingFn.teardownCalled.get(),
is(true));
}
}

@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class})
public void testWithContextTeardownCalledAfterExceptionInSetup() {
ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP);
ExceptionThrowingFn fn = new ExceptionThrowingFn(MethodForException.SETUP);
p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
try {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
assertThat(
"Function should have been torn down after exception",
ExceptionThrowingOldFn.teardownCalled.get(),
ExceptionThrowingFn.teardownCalled.get(),
is(true));
}
}

@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class})
public void testWithContextTeardownCalledAfterExceptionInStartBundle() {
ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE);
ExceptionThrowingFn fn = new ExceptionThrowingFn(MethodForException.START_BUNDLE);
p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
try {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
assertThat(
"Function should have been torn down after exception",
ExceptionThrowingOldFn.teardownCalled.get(),
ExceptionThrowingFn.teardownCalled.get(),
is(true));
}
}

@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class})
public void testWithContextTeardownCalledAfterExceptionInProcessElement() {
ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT);
ExceptionThrowingFn fn = new ExceptionThrowingFn(MethodForException.PROCESS_ELEMENT);
p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
try {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
assertThat(
"Function should have been torn down after exception",
ExceptionThrowingOldFn.teardownCalled.get(),
ExceptionThrowingFn.teardownCalled.get(),
is(true));
}
}

@Test
@Category({ValidatesRunner.class, UsesParDoLifecycle.class})
public void testWithContextTeardownCalledAfterExceptionInFinishBundle() {
ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE);
ExceptionThrowingFn fn = new ExceptionThrowingFn(MethodForException.FINISH_BUNDLE);
p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn));
try {
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
assertThat(
"Function should have been torn down after exception",
ExceptionThrowingOldFn.teardownCalled.get(),
ExceptionThrowingFn.teardownCalled.get(),
is(true));
}
}

private static class ExceptionThrowingOldFn extends DoFn<Object, Object> {
static AtomicBoolean teardownCalled = new AtomicBoolean(false);

private final MethodForException toThrow;
private boolean thrown;

private ExceptionThrowingOldFn(MethodForException toThrow) {
this.toThrow = toThrow;
}

@Setup
public void setup() throws Exception {
throwIfNecessary(MethodForException.SETUP);
}

@StartBundle
public void startBundle() throws Exception {
throwIfNecessary(MethodForException.START_BUNDLE);
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
throwIfNecessary(MethodForException.PROCESS_ELEMENT);
}

@FinishBundle
public void finishBundle() throws Exception {
throwIfNecessary(MethodForException.FINISH_BUNDLE);
}

private void throwIfNecessary(MethodForException method) throws Exception {
if (toThrow == method && !thrown) {
thrown = true;
throw new Exception("Hasn't yet thrown");
}
}

@Teardown
public void teardown() {
if (!thrown) {
fail("Excepted to have a processing method throw an exception");
}
teardownCalled.set(true);
}
}

private static class ExceptionThrowingFn extends DoFn<Object, Object> {
static AtomicBoolean teardownCalled = new AtomicBoolean(false);

Expand Down

0 comments on commit 4474978

Please sign in to comment.