Skip to content

Commit

Permalink
[hotfix] [tests] Speed up queryable state IT tests by removing sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Dec 14, 2017
1 parent 0ef7fdd commit 917fbcb
Showing 1 changed file with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
public abstract class AbstractQueryableStateTestBase extends TestLogger {

private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
public static final long RETRY_TIMEOUT = 50L;

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
Expand Down Expand Up @@ -196,11 +197,12 @@ public Integer getKey(Tuple2<Integer, Long> value) {

final AtomicLongArray counts = new AtomicLongArray(numKeys);

final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys);

boolean allNonZero = false;
while (!allNonZero && deadline.hasTimeLeft()) {
allNonZero = true;

final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys);
futures.clear();

for (int i = 0; i < numKeys; i++) {
final int key = i;
Expand Down Expand Up @@ -712,7 +714,7 @@ public Integer getKey(Tuple2<Integer, Long> value) {
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -785,7 +787,7 @@ public Integer getKey(Tuple2<Integer, Long> value) {
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -877,7 +879,7 @@ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<O
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -973,7 +975,7 @@ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<O
results.put(key, res);
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -1050,7 +1052,7 @@ public Integer getKey(Tuple2<Integer, Long> value) {
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -1129,6 +1131,7 @@ private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple
private final int numKeys;
private final ThreadLocalRandom random = ThreadLocalRandom.current();
private volatile boolean isRunning = true;
private int counter = 0;

TestKeyRangeSource(int numKeys) {
this.numKeys = numKeys;
Expand All @@ -1151,9 +1154,13 @@ public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
synchronized (ctx.getCheckpointLock()) {
record.f0 = random.nextInt(numKeys);
ctx.collect(record);
counter++;
}

if (counter % 50 == 0) {
// mild slow down
Thread.sleep(1L);
}
// mild slow down
Thread.sleep(1L);
}
}

Expand Down Expand Up @@ -1327,7 +1334,7 @@ private static <K, S extends State, V> CompletableFuture<S> getKvState(
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
final boolean failForUnknownKeyOrNamespace,
final ScheduledExecutor executor) throws InterruptedException {
final ScheduledExecutor executor) {

final CompletableFuture<S> resultFuture = new CompletableFuture<>();
getKvStateIgnoringCertainExceptions(
Expand All @@ -1346,10 +1353,9 @@ private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
final boolean failForUnknownKeyOrNamespace,
final ScheduledExecutor executor) throws InterruptedException {
final ScheduledExecutor executor) {

if (!resultFuture.isDone()) {
Thread.sleep(100L);
CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
expected.whenCompleteAsync((result, throwable) -> {
if (throwable != null) {
Expand All @@ -1360,13 +1366,9 @@ private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
) {
resultFuture.completeExceptionally(throwable.getCause());
} else if (deadline.hasTimeLeft()) {
try {
getKvStateIgnoringCertainExceptions(
deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
stateDescriptor, failForUnknownKeyOrNamespace, executor);
} catch (InterruptedException e) {
e.printStackTrace();
}
getKvStateIgnoringCertainExceptions(
deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
stateDescriptor, failForUnknownKeyOrNamespace, executor);
}
} else {
resultFuture.complete(result);
Expand Down Expand Up @@ -1410,7 +1412,7 @@ private void executeValueQuery(
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down

0 comments on commit 917fbcb

Please sign in to comment.