diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java index baeb5913bc324..96ce98b294d8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java @@ -36,7 +36,7 @@ public interface PriorityQueueSetFactory { * @return the queue with the specified unique name. */ @Nonnull - + & Keyed> KeyGroupedInternalPriorityQueue create( @Nonnull String stateName, @Nonnull TypeSerializer byteOrderedElementSerializer); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 0b42a323dbc84..8e6c356681bc5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -157,13 +157,13 @@ public HeapKeyedStateBackend( @SuppressWarnings("unchecked") @Nonnull @Override - public + public & Keyed> KeyGroupedInternalPriorityQueue create( @Nonnull String stateName, @Nonnull TypeSerializer byteOrderedElementSerializer) { - final HeapPriorityQueueSnapshotRestoreWrapper existingState = - registeredPQStates.get(stateName); + final HeapPriorityQueueSnapshotRestoreWrapper existingState = + (HeapPriorityQueueSnapshotRestoreWrapper) registeredPQStates.get(stateName); if (existingState != null) { // TODO we implement the simple way of supporting the current functionality, mimicking @@ -197,7 +197,7 @@ KeyGroupedInternalPriorityQueue create( } @Nonnull - private + private & Keyed> KeyGroupedInternalPriorityQueue createInternal( RegisteredPriorityQueueStateBackendMetaInfo metaInfo) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java index 8074c1a4fdc05..6646d5fe340de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java @@ -50,7 +50,7 @@ public HeapPriorityQueueSetFactory( @Nonnull @Override - public + public & Keyed> HeapPriorityQueueSet create( @Nonnull String stateName, @Nonnull TypeSerializer byteOrderedElementSerializer) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java index d3d3757a2c917..c9463658019b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java @@ -278,7 +278,7 @@ private static void copyEntry( @Nonnull @Override - public + public & Keyed> KeyGroupedInternalPriorityQueue create( @Nonnull String stateName, @Nonnull TypeSerializer byteOrderedElementSerializer) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index d6c7cff688601..8aab8b8ead60c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -455,7 +455,7 @@ public void dispose() { @Nonnull @Override - public + public & Keyed> KeyGroupedInternalPriorityQueue create( @Nonnull String stateName, @Nonnull TypeSerializer byteOrderedElementSerializer) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java index 717c2b8f08b58..fb063a7af59d6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java @@ -98,7 +98,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { @Nonnull @Override - public + public & Keyed> KeyGroupedInternalPriorityQueue create( @Nonnull String stateName, @Nonnull TypeSerializer byteOrderedElementSerializer) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java index 0bf5a5a3db001..128abb66e33b0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java @@ -244,8 +244,8 @@ private IS createState( @Nonnull @Override - @SuppressWarnings({"rawtypes", "unchecked"}) - public + @SuppressWarnings({"unchecked"}) + public & Keyed> KeyGroupedInternalPriorityQueue create( @Nonnull String stateName, @Nonnull TypeSerializer byteOrderedElementSerializer) {