Skip to content

Commit

Permalink
[FLINK-9572] Extend InternalAppendingState with internal stored state…
Browse files Browse the repository at this point in the history
… access

This closes apache#6156.
  • Loading branch information
azagrebin authored and StefanRRichter committed Jun 14, 2018
1 parent 975f9b1 commit 09fbf23
Show file tree
Hide file tree
Showing 19 changed files with 315 additions and 214 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.heap;

import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalAppendingState;

/**
* Base class for {@link AppendingState} ({@link InternalAppendingState}) that is stored on the heap.
*
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <IN> The type of the input elements.
* @param <SV> The type of the values in the state.
* @param <OUT> The type of the output elements.
*/
abstract class AbstractHeapAppendingState<K, N, IN, SV, OUT>
extends AbstractHeapState<K, N, SV>
implements InternalAppendingState<K, N, IN, SV, OUT> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
*
* @param stateTable The state table for which this state is associated to.
* @param keySerializer The serializer for the keys.
* @param valueSerializer The serializer for the state.
* @param namespaceSerializer The serializer for the namespace.
* @param defaultValue The default value for the state.
*/
AbstractHeapAppendingState(
StateTable<K, N, SV> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<SV> valueSerializer,
TypeSerializer<N> namespaceSerializer,
SV defaultValue) {
super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
}

@Override
public SV getInternal() {
return stateTable.get(currentNamespace);
}

@Override
public void updateInternal(SV valueToStore) {
stateTable.put(currentNamespace, valueToStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.state.heap;

import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalMergingState;
Expand All @@ -34,11 +33,10 @@
* @param <IN> The type of the input elements.
* @param <SV> The type of the values in the state.
* @param <OUT> The type of the output elements.
* @param <S> The type of State
*/
public abstract class AbstractHeapMergingState<K, N, IN, SV, OUT, S extends State>
extends AbstractHeapState<K, N, SV, S>
implements InternalMergingState<K, N, IN, SV, OUT> {
abstract class AbstractHeapMergingState<K, N, IN, SV, OUT>
extends AbstractHeapAppendingState<K, N, IN, SV, OUT>
implements InternalMergingState<K, N, IN, SV, OUT> {

/**
* The merge transformation function that implements the merge logic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <SV> The type of the values in the state.
* @param <S> The type of State
*/
public abstract class AbstractHeapState<K, N, SV, S extends State> implements InternalKvState<K, N, SV> {
public abstract class AbstractHeapState<K, N, SV> implements InternalKvState<K, N, SV> {

/** Map containing the actual key/value pairs. */
protected final StateTable<K, N, SV> stateTable;
Expand All @@ -60,7 +59,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State> implements In
* @param namespaceSerializer The serializer for the namespace.
* @param defaultValue The default value for the state.
*/
protected AbstractHeapState(
AbstractHeapState(
StateTable<K, N, SV> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<SV> valueSerializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
* @param <ACC> The type of the value stored in the state (the accumulator type).
* @param <OUT> The type of the value returned from the state.
*/
public class HeapAggregatingState<K, N, IN, ACC, OUT>
extends AbstractHeapMergingState<K, N, IN, ACC, OUT, AggregatingState<IN, OUT>>
implements InternalAggregatingState<K, N, IN, ACC, OUT> {
class HeapAggregatingState<K, N, IN, ACC, OUT>
extends AbstractHeapMergingState<K, N, IN, ACC, OUT>
implements InternalAggregatingState<K, N, IN, ACC, OUT> {

private final AggregateTransformation<IN, ACC, OUT> aggregateTransformation;

Expand All @@ -53,13 +53,13 @@ public class HeapAggregatingState<K, N, IN, ACC, OUT>
* @param defaultValue The default value for the state.
* @param aggregateFunction The aggregating function used for aggregating state.
*/
public HeapAggregatingState(
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<ACC> valueSerializer,
TypeSerializer<N> namespaceSerializer,
ACC defaultValue,
AggregateFunction<IN, ACC, OUT> aggregateFunction) {
HeapAggregatingState(
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<ACC> valueSerializer,
TypeSerializer<N> namespaceSerializer,
ACC defaultValue,
AggregateFunction<IN, ACC, OUT> aggregateFunction) {

super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
this.aggregateTransformation = new AggregateTransformation<>(aggregateFunction);
Expand All @@ -86,8 +86,7 @@ public TypeSerializer<ACC> getValueSerializer() {

@Override
public OUT get() {

ACC accumulator = stateTable.get(currentNamespace);
ACC accumulator = getInternal();
return accumulator != null ? aggregateTransformation.aggFunction.getResult(accumulator) : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
* @deprecated will be removed in a future version
*/
@Deprecated
public class HeapFoldingState<K, N, T, ACC>
extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>>
implements InternalFoldingState<K, N, T, ACC> {
class HeapFoldingState<K, N, T, ACC>
extends AbstractHeapAppendingState<K, N, T, ACC, ACC>
implements InternalFoldingState<K, N, T, ACC> {

/** The function used to fold the state */
/** The function used to fold the state. */
private final FoldTransformation foldTransformation;

/**
Expand All @@ -55,13 +55,13 @@ public class HeapFoldingState<K, N, T, ACC>
* @param defaultValue The default value for the state.
* @param foldFunction The fold function used for folding state.
*/
public HeapFoldingState(
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<ACC> valueSerializer,
TypeSerializer<N> namespaceSerializer,
ACC defaultValue,
FoldFunction<T, ACC> foldFunction) {
HeapFoldingState(
StateTable<K, N, ACC> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<ACC> valueSerializer,
TypeSerializer<N> namespaceSerializer,
ACC defaultValue,
FoldFunction<T, ACC> foldFunction) {
super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
this.foldTransformation = new FoldTransformation(foldFunction);
}
Expand All @@ -87,7 +87,7 @@ public TypeSerializer<ACC> getValueSerializer() {

@Override
public ACC get() {
return stateTable.get(currentNamespace);
return getInternal();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
* @param <N> The type of the namespace.
* @param <V> The type of the value.
*/
public class HeapListState<K, N, V>
extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>, ListState<V>>
implements InternalListState<K, N, V> {
class HeapListState<K, N, V>
extends AbstractHeapMergingState<K, N, V, List<V>, Iterable<V>>
implements InternalListState<K, N, V> {

/**
* Creates a new key/value state for the given hash map of key/value pairs.
Expand All @@ -51,12 +51,12 @@ public class HeapListState<K, N, V>
* @param namespaceSerializer The serializer for the namespace.
* @param defaultValue The default value for the state.
*/
public HeapListState(
StateTable<K, N, List<V>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<List<V>> valueSerializer,
TypeSerializer<N> namespaceSerializer,
List<V> defaultValue) {
HeapListState(
StateTable<K, N, List<V>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<List<V>> valueSerializer,
TypeSerializer<N> namespaceSerializer,
List<V> defaultValue) {
super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
}

Expand All @@ -81,7 +81,7 @@ public TypeSerializer<List<V>> getValueSerializer() {

@Override
public Iterable<V> get() {
return stateTable.get(currentNamespace);
return getInternal();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
* @param <UK> The type of the keys in the state.
* @param <UV> The type of the values in the state.
*/
public class HeapMapState<K, N, UK, UV>
extends AbstractHeapState<K, N, Map<UK, UV>, MapState<UK, UV>>
implements InternalMapState<K, N, UK, UV> {
class HeapMapState<K, N, UK, UV>
extends AbstractHeapState<K, N, Map<UK, UV>>
implements InternalMapState<K, N, UK, UV> {

/**
* Creates a new key/value state for the given hash map of key/value pairs.
Expand All @@ -51,12 +51,12 @@ public class HeapMapState<K, N, UK, UV>
* @param namespaceSerializer The serializer for the namespace.
* @param defaultValue The default value for the state.
*/
public HeapMapState(
StateTable<K, N, Map<UK, UV>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<Map<UK, UV>> valueSerializer,
TypeSerializer<N> namespaceSerializer,
Map<UK, UV> defaultValue) {
HeapMapState(
StateTable<K, N, Map<UK, UV>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<Map<UK, UV>> valueSerializer,
TypeSerializer<N> namespaceSerializer,
Map<UK, UV> defaultValue) {
super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);

Preconditions.checkState(valueSerializer instanceof MapSerializer, "Unexpected serializer type.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
* @param <N> The type of the namespace.
* @param <V> The type of the value.
*/
public class HeapReducingState<K, N, V>
extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>>
implements InternalReducingState<K, N, V> {
class HeapReducingState<K, N, V>
extends AbstractHeapMergingState<K, N, V, V, V>
implements InternalReducingState<K, N, V> {

private final ReduceTransformation<V> reduceTransformation;

Expand Down Expand Up @@ -83,7 +83,7 @@ public TypeSerializer<V> getValueSerializer() {

@Override
public V get() {
return stateTable.get(currentNamespace);
return getInternal();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
* @param <N> The type of the namespace.
* @param <V> The type of the value.
*/
public class HeapValueState<K, N, V>
extends AbstractHeapState<K, N, V, ValueState<V>>
implements InternalValueState<K, N, V> {
class HeapValueState<K, N, V>
extends AbstractHeapState<K, N, V>
implements InternalValueState<K, N, V> {

/**
* Creates a new key/value state for the given hash map of key/value pairs.
Expand All @@ -42,12 +42,12 @@ public class HeapValueState<K, N, V>
* @param namespaceSerializer The serializer for the namespace.
* @param defaultValue The default value for the state.
*/
public HeapValueState(
StateTable<K, N, V> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer,
TypeSerializer<N> namespaceSerializer,
V defaultValue) {
HeapValueState(
StateTable<K, N, V> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer,
TypeSerializer<N> namespaceSerializer,
V defaultValue) {
super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

/**
* The peer to the {@link AppendingState} in the internal state type hierarchy.
*
*
* <p>See {@link InternalKvState} for a description of the internal state hierarchy.
*
* @param <K> The type of key the state is associated to
Expand All @@ -31,4 +31,22 @@
* @param <SV> The type of elements in the state
* @param <OUT> The type of the resulting element in the state
*/
public interface InternalAppendingState<K, N, IN, SV, OUT> extends InternalKvState<K, N, SV>, AppendingState<IN, OUT> {}
public interface InternalAppendingState<K, N, IN, SV, OUT> extends InternalKvState<K, N, SV>, AppendingState<IN, OUT> {
/**
* Get internally stored value.
*
* @return internally stored value.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
SV getInternal() throws Exception;

/**
* Update internally stored value.
*
* @param valueToStore new value to store.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void updateInternal(SV valueToStore) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3293,7 +3293,7 @@ protected void testConcurrentMapIfQueryable() throws Exception {
backend.setCurrentKey(1);
state.update(121818273);

StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? ,?>) kvState).getStateTable();
StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ?>) kvState).getStateTable();
checkConcurrentStateTable(stateTable, numberOfKeyGroups);

}
Expand All @@ -3315,7 +3315,7 @@ protected void testConcurrentMapIfQueryable() throws Exception {
backend.setCurrentKey(1);
state.add(121818273);

StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? , ?>) kvState).getStateTable();
StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ?>) kvState).getStateTable();
checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}

Expand All @@ -3342,7 +3342,7 @@ public Integer reduce(Integer value1, Integer value2) throws Exception {
backend.setCurrentKey(1);
state.add(121818273);

StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? ,?>) kvState).getStateTable();
StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ?>) kvState).getStateTable();
checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}

Expand All @@ -3369,7 +3369,7 @@ public Integer fold(Integer accumulator, Integer value) throws Exception {
backend.setCurrentKey(1);
state.add(121818273);

StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ? ,?>) kvState).getStateTable();
StateTable<?, ?, ?> stateTable = ((AbstractHeapState<?, ?, ?>) kvState).getStateTable();
checkConcurrentStateTable(stateTable, numberOfKeyGroups);
}

Expand Down
Loading

0 comments on commit 09fbf23

Please sign in to comment.