Skip to content

Commit

Permalink
Merge pull request apache#11190 from lukecwik/beam9565
Browse files Browse the repository at this point in the history
[BEAM-9565] Fix threading issue with WatermarkEstimatorsTest
  • Loading branch information
lukecwik committed Mar 23, 2020
2 parents 3384572 + a28d800 commit 7310ec2
Showing 1 changed file with 13 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand All @@ -35,46 +34,40 @@
/** Tests for {@link WatermarkEstimators}. */
@RunWith(JUnit4.class)
public class WatermarkEstimatorsTest {

@Test
public void testThreadSafeWatermarkEstimator() throws Exception {
Instant[] reference = new Instant[] {GlobalWindow.TIMESTAMP_MIN_VALUE};
WatermarkEstimator<Instant> watermarkEstimator =
new WatermarkEstimator<Instant>() {
private Instant currentWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE;

@Override
public Instant currentWatermark() {
return reference[0];
currentWatermark = reference[0];
return currentWatermark;
}

@Override
public Instant getState() {
return reference[0];
return currentWatermark;
}
};
testWatermarkEstimatorSnapshotsStateWithCompetingThread(
watermarkEstimator, (instant) -> reference[0] = instant);
}

@Test
public void testThreadSafeManualWatermarkEstimator() throws Exception {
ManualWatermarkEstimator<Instant> watermarkEstimator =
new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual(
GlobalWindow.TIMESTAMP_MIN_VALUE);
testWatermarkEstimatorSnapshotsStateWithCompetingThread(
watermarkEstimator, watermarkEstimator::setWatermark);
WatermarkEstimators.threadSafe(watermarkEstimator), (instant) -> reference[0] = instant);
}

@Test
public void testThreadSafeTimestampObservingWatermarkEstimator() throws Exception {
TimestampObservingWatermarkEstimator<Instant> watermarkEstimator =
new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators
.MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE);
WatermarkEstimators.WatermarkAndStateObserver<Instant> threadsafeWatermarkEstimator =
WatermarkEstimators.threadSafe(
new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators
.MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE));
testWatermarkEstimatorSnapshotsStateWithCompetingThread(
watermarkEstimator, watermarkEstimator::observeTimestamp);
threadsafeWatermarkEstimator,
((TimestampObservingWatermarkEstimator) threadsafeWatermarkEstimator)::observeTimestamp);
}

public <WatermarkEstimatorT extends WatermarkEstimator<Instant>>
public <WatermarkEstimatorT extends WatermarkEstimators.WatermarkAndStateObserver<Instant>>
void testWatermarkEstimatorSnapshotsStateWithCompetingThread(
WatermarkEstimatorT watermarkEstimator, Consumer<Instant> watermarkUpdater)
throws Exception {
Expand All @@ -89,14 +82,11 @@ void testWatermarkEstimatorSnapshotsStateWithCompetingThread(
});
t.start();

WatermarkEstimators.WatermarkAndStateObserver<Instant> threadsafeWatermarkEstimator =
WatermarkEstimators.threadSafe(watermarkEstimator);

// Ensure the thread has started before we start fetching values.
countDownLatch.await();
Instant currentMinimum = GlobalWindow.TIMESTAMP_MIN_VALUE;
for (int i = 0; i < 100; ++i) {
KV<Instant, Instant> value = threadsafeWatermarkEstimator.getWatermarkAndState();
KV<Instant, Instant> value = watermarkEstimator.getWatermarkAndState();
// The watermark estimators that we use ensure that state == current watermark so test that
// they are equal here
assertEquals(value.getKey(), value.getValue());
Expand Down

0 comments on commit 7310ec2

Please sign in to comment.