Skip to content

Commit

Permalink
[FLINK-9887][state] Integrate priority queue state with existing seri…
Browse files Browse the repository at this point in the history
…alizer upgrade mechanism

This closes apache#6467.
  • Loading branch information
StefanRRichter committed Aug 2, 2018
1 parent 1fe5e4b commit 9d273a3
Show file tree
Hide file tree
Showing 23 changed files with 1,011 additions and 380 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
public class StateMigrationException extends FlinkException {
private static final long serialVersionUID = 8268516412747670839L;

public static final String MIGRATION_NOT_SUPPORTED_MSG = "State migration is currently not supported.";

public StateMigrationException(String message) {
super(message);
}
Expand All @@ -35,4 +37,8 @@ public StateMigrationException(Throwable cause) {
public StateMigrationException(String message, Throwable cause) {
super(message, cause);
}

public static StateMigrationException notSupported() {
return new StateMigrationException(MIGRATION_NOT_SUPPORTED_MSG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ StreamCompressionDecorator getKeyGroupCompressionDecorator() {
* Returns the total number of state entries across all keys/namespaces.
*/
@VisibleForTesting
public abstract int numStateEntries();
public abstract int numKeyValueStateEntries();

// TODO remove this once heap-based timers are working with RocksDB incremental snapshots!
public boolean requiresLegacySynchronousTimerSnapshots() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K,
// the new serializer; we're deliberately failing here for now to have equal functionality with
// the RocksDB backend to avoid confusion for users.

throw new StateMigrationException("State migration isn't supported, yet.");
throw StateMigrationException.notSupported();
}
}

Expand Down Expand Up @@ -781,7 +781,7 @@ private <S> ListState<S> getListState(
// the new serializer; we're deliberately failing here for now to have equal functionality with
// the RocksDB backend to avoid confusion for users.

throw new StateMigrationException("State migration isn't supported, yet.");
throw StateMigrationException.notSupported();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ public static <N, S> RegisteredKeyValueStateBackendMetaInfo<N, S> resolveKvState
TypeSerializer<N> newNamespaceSerializer,
StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {

Preconditions.checkState(restoredStateMetaInfoSnapshot.getBackendStateType()
== StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
"Incompatible state types. " +
"Was [" + restoredStateMetaInfoSnapshot.getBackendStateType() + "], " +
"registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");

Preconditions.checkState(
Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
"Incompatible state names. " +
Expand All @@ -160,7 +166,7 @@ public static <N, S> RegisteredKeyValueStateBackendMetaInfo<N, S> resolveKvState

Preconditions.checkState(
newStateDescriptor.getType() == restoredType,
"Incompatible state types. " +
"Incompatible key/value state types. " +
"Was [" + restoredType + "], " +
"registered with [" + newStateDescriptor.getType() + "].");
}
Expand All @@ -184,7 +190,7 @@ public static <N, S> RegisteredKeyValueStateBackendMetaInfo<N, S> resolveKvState

if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
// TODO state migration currently isn't possible.
throw new StateMigrationException("State migration isn't supported, yet.");
throw StateMigrationException.notSupported();
} else {
return new RegisteredKeyValueStateBackendMetaInfo<>(
newStateDescriptor.getType(),
Expand Down
Loading

0 comments on commit 9d273a3

Please sign in to comment.