Skip to content

Commit

Permalink
[BEAM-9565] Fix threading issue with WatermarkEstimatorsTest
Browse files Browse the repository at this point in the history
The issue was that the thread was calling an update method that wasn't going through the wrapper object which was providing synchronization.
  • Loading branch information
lukecwik committed Mar 22, 2020
1 parent 0351b49 commit a28d800
Showing 1 changed file with 13 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand All @@ -35,46 +34,40 @@
/** Tests for {@link WatermarkEstimators}. */
@RunWith(JUnit4.class)
public class WatermarkEstimatorsTest {

@Test
public void testThreadSafeWatermarkEstimator() throws Exception {
Instant[] reference = new Instant[] {GlobalWindow.TIMESTAMP_MIN_VALUE};
WatermarkEstimator<Instant> watermarkEstimator =
new WatermarkEstimator<Instant>() {
private Instant currentWatermark = GlobalWindow.TIMESTAMP_MIN_VALUE;

@Override
public Instant currentWatermark() {
return reference[0];
currentWatermark = reference[0];
return currentWatermark;
}

@Override
public Instant getState() {
return reference[0];
return currentWatermark;
}
};
testWatermarkEstimatorSnapshotsStateWithCompetingThread(
watermarkEstimator, (instant) -> reference[0] = instant);
}

@Test
public void testThreadSafeManualWatermarkEstimator() throws Exception {
ManualWatermarkEstimator<Instant> watermarkEstimator =
new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual(
GlobalWindow.TIMESTAMP_MIN_VALUE);
testWatermarkEstimatorSnapshotsStateWithCompetingThread(
watermarkEstimator, watermarkEstimator::setWatermark);
WatermarkEstimators.threadSafe(watermarkEstimator), (instant) -> reference[0] = instant);
}

@Test
public void testThreadSafeTimestampObservingWatermarkEstimator() throws Exception {
TimestampObservingWatermarkEstimator<Instant> watermarkEstimator =
new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators
.MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE);
WatermarkEstimators.WatermarkAndStateObserver<Instant> threadsafeWatermarkEstimator =
WatermarkEstimators.threadSafe(
new org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators
.MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE));
testWatermarkEstimatorSnapshotsStateWithCompetingThread(
watermarkEstimator, watermarkEstimator::observeTimestamp);
threadsafeWatermarkEstimator,
((TimestampObservingWatermarkEstimator) threadsafeWatermarkEstimator)::observeTimestamp);
}

public <WatermarkEstimatorT extends WatermarkEstimator<Instant>>
public <WatermarkEstimatorT extends WatermarkEstimators.WatermarkAndStateObserver<Instant>>
void testWatermarkEstimatorSnapshotsStateWithCompetingThread(
WatermarkEstimatorT watermarkEstimator, Consumer<Instant> watermarkUpdater)
throws Exception {
Expand All @@ -89,14 +82,11 @@ void testWatermarkEstimatorSnapshotsStateWithCompetingThread(
});
t.start();

WatermarkEstimators.WatermarkAndStateObserver<Instant> threadsafeWatermarkEstimator =
WatermarkEstimators.threadSafe(watermarkEstimator);

// Ensure the thread has started before we start fetching values.
countDownLatch.await();
Instant currentMinimum = GlobalWindow.TIMESTAMP_MIN_VALUE;
for (int i = 0; i < 100; ++i) {
KV<Instant, Instant> value = threadsafeWatermarkEstimator.getWatermarkAndState();
KV<Instant, Instant> value = watermarkEstimator.getWatermarkAndState();
// The watermark estimators that we use ensure that state == current watermark so test that
// they are equal here
assertEquals(value.getKey(), value.getValue());
Expand Down

0 comments on commit a28d800

Please sign in to comment.