[FLINK-10531][e2e] Fix unstable TTL end-to-end test.
This closes apache#7036.
kl0u committed Nov 16, 2018
1 parent f994ca9 commit 52c2ad0
Showing 16 changed files with 276 additions and 105 deletions.
@@ -23,6 +23,7 @@
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

@@ -74,6 +75,8 @@ public static void main(String[] args) throws Exception {

setupEnvironment(env, pt);

final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);

int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
long sleepAfterElements = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue());
@@ -90,11 +93,27 @@ public static void main(String[] args) throws Exception {
.addSource(new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime))
.name("TtlStateUpdateSource")
.keyBy(TtlStateUpdate::getKey)
.flatMap(new TtlVerifyUpdateFunction(ttlConfig, reportStatAfterUpdatesNum))
.flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, reportStatAfterUpdatesNum))
.name("TtlVerifyUpdateFunction")
.addSink(new PrintSinkFunction<>())
.name("PrintFailedVerifications");

env.execute("State TTL test job");
}

/**
* Sets the state backend to a new {@link StubStateBackend} which wraps the currently
* configured state backend and uses a custom {@link MonotonicTTLTimeProvider}.
*
* @param env The {@link StreamExecutionEnvironment} of the job.
* @return The {@link MonotonicTTLTimeProvider} used by the new state backend.
*/
private static MonotonicTTLTimeProvider setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) {
final MonotonicTTLTimeProvider ttlTimeProvider = new MonotonicTTLTimeProvider();

final StateBackend configuredBackend = env.getStateBackend();
final StateBackend stubBackend = new StubStateBackend(configuredBackend, ttlTimeProvider);
env.setStateBackend(stubBackend);

return ttlTimeProvider;
}
}
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.tests;

import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.Serializable;

/**
* A stub implementation of a {@link TtlTimeProvider} which guarantees that
* processing time increases monotonically.
*/
@NotThreadSafe
final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {

private static final long serialVersionUID = 1L;

/*
* The following variables are static because the whole TTLTimeProvider will go
* through serialization and, eventually, the state backend and the task executing
* the TtlVerifyUpdateFunction will have different instances of it.
*
* If these were not static, then the TtlVerifyUpdateFunction would e.g. freeze
* the time, but the backend would not be notified about it, resulting in inconsistent
* state.
*
* If the number of task slots per TM changes, then we may also need to add synchronization.
*/

private static boolean timeIsFrozen = false;

private static long lastReturnedProcessingTime = Long.MIN_VALUE;

@Override
public long currentTimestamp() {
if (timeIsFrozen && lastReturnedProcessingTime != Long.MIN_VALUE) {
return lastReturnedProcessingTime;
}

timeIsFrozen = true;

final long currentProcessingTime = System.currentTimeMillis();
if (currentProcessingTime < lastReturnedProcessingTime) {
return lastReturnedProcessingTime;
}

lastReturnedProcessingTime = currentProcessingTime;
return lastReturnedProcessingTime;
}

long unfreezeTime() {
timeIsFrozen = false;
return lastReturnedProcessingTime;
}
}
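
A note on how the provider is meant to be used (illustrative sketch only, not part of this commit): the first call to currentTimestamp() freezes the clock, every subsequent call returns the same value, and unfreezeTime() releases the freeze while reporting the value that was handed out. This is what lets the state backend and the verification function observe exactly one timestamp per update.

// Illustrative sketch, not part of the commit.
final MonotonicTTLTimeProvider provider = new MonotonicTTLTimeProvider();

final long before = provider.currentTimestamp(); // first call freezes the clock
// ... perform a TTL state update here; the backend sees the same frozen timestamp ...
final long after = provider.unfreezeTime();      // releases the freeze, returns the frozen value

// Both sides of the update observed one and the same timestamp.
assert before == after;
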
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A stub implementation of the {@link StateBackend} that allows the use of
* a custom {@link TtlTimeProvider}.
*/
final class StubStateBackend implements StateBackend {

private static final long serialVersionUID = 1L;

private final TtlTimeProvider ttlTimeProvider;

private final StateBackend backend;

StubStateBackend(final StateBackend wrappedBackend, final TtlTimeProvider ttlTimeProvider) {
this.backend = checkNotNull(wrappedBackend);
this.ttlTimeProvider = checkNotNull(ttlTimeProvider);
}

@Override
public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
return backend.resolveCheckpoint(externalPointer);
}

@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
return backend.createCheckpointStorage(jobId);
}

@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider,
MetricGroup metricGroup) throws Exception {

return backend.createKeyedStateBackend(
env,
jobID,
operatorIdentifier,
keySerializer,
numberOfKeyGroups,
keyGroupRange,
kvStateRegistry,
this.ttlTimeProvider,
metricGroup
);
}

@Override
public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
return backend.createOperatorStateBackend(env, operatorIdentifier);
}
}
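
StubStateBackend is a plain delegating wrapper: every call is forwarded to the originally configured backend, and only the TtlTimeProvider handed to createKeyedStateBackend is replaced. A minimal usage sketch follows; it assumes a MemoryStateBackend as the wrapped backend and an env variable holding the job's StreamExecutionEnvironment, but any configured backend works the same way.

// Usage sketch only; MemoryStateBackend is just an example of a wrapped backend.
final StateBackend configured = new org.apache.flink.runtime.state.memory.MemoryStateBackend();
final MonotonicTTLTimeProvider ttlTimeProvider = new MonotonicTTLTimeProvider();

// Checkpoint storage and operator state behave exactly as with the wrapped backend;
// only keyed state created through this stub uses the custom TTL time provider.
final StateBackend stub = new StubStateBackend(configured, ttlTimeProvider);
env.setStateBackend(stub);
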
@@ -25,6 +25,9 @@

/** Randomly generated keyed state updates per state type. */
class TtlStateUpdate implements Serializable {

private static final long serialVersionUID = 1L;

private final int key;

@Nonnull
@@ -33,6 +33,9 @@
* and waits for {@code sleepTime} to continue generation.
*/
class TtlStateUpdateSource extends RichParallelSourceFunction<TtlStateUpdate> {

private static final long serialVersionUID = 1L;

private final int maxKey;
private final long sleepAfterElements;
private final long sleepTime;
@@ -33,20 +33,21 @@
import org.apache.flink.streaming.tests.verify.TtlVerificationContext;
import org.apache.flink.streaming.tests.verify.ValueWithTs;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Update state with TTL for each verifier.
*
@@ -62,21 +63,26 @@
* - verifies last update against previous updates
* - emits verification context in case of failure
*/
class TtlVerifyUpdateFunction
extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction {
class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);

@Nonnull
private final StateTtlConfig ttlConfig;
private final long ttl;
private final MonotonicTTLTimeProvider ttlTimeProvider;
private final UpdateStat stat;

private transient Map<String, State> states;
private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId;

TtlVerifyUpdateFunction(@Nonnull StateTtlConfig ttlConfig, long reportStatAfterUpdatesNum) {
TtlVerifyUpdateFunction(
@Nonnull StateTtlConfig ttlConfig,
MonotonicTTLTimeProvider ttlTimeProvider,
long reportStatAfterUpdatesNum) {
this.ttlConfig = ttlConfig;
this.ttl = ttlConfig.getTtl().toMilliseconds();
this.ttlTimeProvider = checkNotNull(ttlTimeProvider);
this.stat = new UpdateStat(reportStatAfterUpdatesNum);
}

@@ -91,17 +97,13 @@ public void flatMap(TtlStateUpdate updates, Collector<String> out) throws Except
}

private TtlVerificationContext<?, ?> generateUpdateAndVerificationContext(
TtlStateUpdate updates, TtlStateVerifier<?, ?> verifier) throws Exception {
TtlStateUpdate updates,
TtlStateVerifier<?, ?> verifier) throws Exception {

List<ValueWithTs<?>> prevUpdates = getPrevUpdates(verifier.getId());
Object update = updates.getUpdate(verifier.getId());
TtlUpdateContext<?, ?> updateContext = performUpdate(verifier, update);
boolean clashes = updateClashesWithPrevUpdates(updateContext.getUpdateWithTs(), prevUpdates);
if (clashes) {
resetState(verifier.getId());
prevUpdates = Collections.emptyList();
updateContext = performUpdate(verifier, update);
}
stat.update(clashes, prevUpdates.size());
stat.update(prevUpdates.size());
prevUpdatesByVerifierId.get(verifier.getId()).add(updateContext.getUpdateWithTs());
return new TtlVerificationContext<>(updates.getKey(), verifier.getId(), prevUpdates, updateContext);
}
@@ -113,33 +115,22 @@ private List<ValueWithTs<?>> getPrevUpdates(String verifierId) throws Exception
}

private TtlUpdateContext<?, ?> performUpdate(
TtlStateVerifier<?, ?> verifier, Object update) throws Exception {
TtlStateVerifier<?, ?> verifier,
Object update) throws Exception {

final long timestampBeforeUpdate = ttlTimeProvider.currentTimestamp();
State state = states.get(verifier.getId());
long timestampBeforeUpdate = System.currentTimeMillis();
Object valueBeforeUpdate = verifier.get(state);
verifier.update(state, update);
Object updatedValue = verifier.get(state);
return new TtlUpdateContext<>(timestampBeforeUpdate,
valueBeforeUpdate, update, updatedValue, System.currentTimeMillis());
}
final long timestampAfterUpdate = ttlTimeProvider.unfreezeTime();

private boolean updateClashesWithPrevUpdates(ValueWithTs<?> update, List<ValueWithTs<?>> prevUpdates) {
return tooSlow(update) ||
(!prevUpdates.isEmpty() && prevUpdates.stream().anyMatch(pu -> updatesClash(pu, update)));
}

private boolean tooSlow(ValueWithTs<?> update) {
return update.getTimestampAfterUpdate() - update.getTimestampBeforeUpdate() >= ttl;
}
checkState(
timestampAfterUpdate == timestampBeforeUpdate,
"Timestamps before and after the update do not match."
);

private boolean updatesClash(ValueWithTs<?> prevUpdate, ValueWithTs<?> nextUpdate) {
return prevUpdate.getTimestampAfterUpdate() + ttl >= nextUpdate.getTimestampBeforeUpdate() &&
prevUpdate.getTimestampBeforeUpdate() + ttl <= nextUpdate.getTimestampAfterUpdate();
}

private void resetState(String verifierId) {
states.get(verifierId).clear();
prevUpdatesByVerifierId.get(verifierId).clear();
return new TtlUpdateContext<>(valueBeforeUpdate, update, updatedValue, timestampAfterUpdate);
}

@Override
@@ -153,7 +144,7 @@ public void initializeState(FunctionInitializationContext context) {
.collect(Collectors.toMap(TtlStateVerifier::getId, v -> v.createState(context, ttlConfig)));
prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream()
.collect(Collectors.toMap(TtlStateVerifier::getId, v -> {
Preconditions.checkNotNull(v);
checkNotNull(v);
TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer());
ListStateDescriptor<ValueWithTs<?>> stateDesc = new ListStateDescriptor<>(
"TtlPrevValueState_" + v.getId(), typeSerializer);
@@ -165,22 +156,17 @@ public void initializeState(FunctionInitializationContext context) {
private static class UpdateStat implements Serializable {
final long reportStatAfterUpdatesNum;
long updates = 0;
long clashes = 0;
long prevUpdatesNum = 0;

UpdateStat(long reportStatAfterUpdatesNum) {
this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum;
}

void update(boolean clash, long prevUpdatesSize) {
void update(long prevUpdatesSize) {
updates++;
if (clash) {
clashes++;
}
prevUpdatesNum += prevUpdatesSize;
if (updates % reportStatAfterUpdatesNum == 0) {
LOG.info(String.format("Avg update chain length: %d, clash stat: %d/%d",
prevUpdatesNum / updates, clashes, updates));
LOG.info(String.format("Avg update chain length: %d", prevUpdatesNum / updates));
}
}
}