Skip to content

Commit

Permalink
[FLINK-17376] Remove deprecated state access methods
Browse files Browse the repository at this point in the history
  • Loading branch information
mghildiy authored and aljoscha committed Jun 3, 2020
1 parent 4ff270e commit 276332e
Show file tree
Hide file tree
Showing 19 changed files with 10 additions and 257 deletions.
10 changes: 1 addition & 9 deletions docs/dev/stream/state/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ added to the state. Contrary to `ReducingState`, the aggregate type may be diffe
of elements that are added to the state. The interface is the same as for `ListState` but elements
added using `add(IN)` are aggregated using a specified `AggregateFunction`.

* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values
added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
of elements that are added to the state. The interface is similar to `ListState` but elements
added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.

* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
Expand All @@ -129,8 +124,6 @@ You can also use `isEmpty()` to check whether this map contains any key-value ma
All types of state also have a method `clear()` that clears the state for the currently
active key, i.e. the key of the input element.

<span class="label label-danger">Attention</span> `FoldingState` and `FoldingStateDescriptor` have been deprecated in Flink 1.4 and will be completely removed in the future. Please use `AggregatingState` and `AggregatingStateDescriptor` instead.

It is important to keep in mind that these state objects are only used for interfacing
with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
The second thing to keep in mind is that the value you get from the state
Expand All @@ -142,7 +135,7 @@ To get a state handle, you have to create a `StateDescriptor`. This holds the na
that you can reference them), the type of the values that the state holds, and possibly
a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`,
a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`.
a `ReducingStateDescriptor`, or a `MapStateDescriptor`.

State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
Please see [here]({% link dev/user_defined_functions.md %}#rich-functions) for
Expand All @@ -153,7 +146,6 @@ is available in a `RichFunction` has these methods for accessing state:
* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
* `ListState<T> getListState(ListStateDescriptor<T>)`
* `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`

This is an example `FlatMapFunction` that shows how all of the parts fit together:
Expand Down
9 changes: 1 addition & 8 deletions docs/dev/stream/state/state.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,17 @@ keyed state 接口提供不同类型状态的访问接口,这些状态都作
* `AggregatingState<IN, OUT>`: 保留一个单值,表示添加到状态的所有值的聚合。和 `ReducingState` 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。
接口与 `ListState` 类似,但使用 `add(IN)` 添加的元素会用指定的 `AggregateFunction` 进行聚合。

* `FoldingState<T, ACC>`: 保留一个单值,表示添加到状态的所有值的聚合。 与 `ReducingState` 相反,聚合类型可能与添加到状态的元素类型不同。
接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。

* `MapState<UK, UV>`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map<UK,UV>)` 添加映射。
使用 `get(UK)` 检索特定 key。 使用 `entries()``keys()``values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。

所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

<span class="label label-danger">注意</span> `FoldingState``FoldingStateDescriptor` 从 Flink 1.4 开始就已经被启用,将会在未来被删除。
作为替代请使用 `AggregatingState``AggregatingStateDescriptor`

请牢记,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。
另外需要牢记的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。

你必须创建一个 `StateDescriptor`,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们),
状态所持有值的类型,并且可能包含用户指定的函数,例如`ReduceFunction`。 根据不同的状态类型,可以创建`ValueStateDescriptor``ListStateDescriptor`
`ReducingStateDescriptor``FoldingStateDescriptor``MapStateDescriptor`
`ReducingStateDescriptor``MapStateDescriptor`

状态通过 `RuntimeContext` 进行访问,因此只能在 *rich functions* 中使用。请参阅[这里]({% link dev/user_defined_functions.zh.md %}#rich-functions)获取相关信息,
但是我们很快也会看到一个例子。`RichFunction``RuntimeContext` 提供如下方法:
Expand All @@ -132,7 +126,6 @@ keyed state 接口提供不同类型状态的访问接口,这些状态都作
* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
* `ListState<T> getListState(ListStateDescriptor<T>)`
* `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`

下面是一个 `FlatMapFunction` 的例子,展示了如何将这些部分组合起来:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1388,19 +1388,6 @@ public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor
return (ListState<S>) mockRestoredUnionListState;
}

@Override
public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
// return empty state for the legacy 1.2 Kafka consumer state
return new TestingListState<>();
}

// ------------------------------------------------------------------------

@Override
public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
throw new UnsupportedOperationException();
}

@Override
public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -97,7 +98,7 @@ FunctionInitializationContext getMockContext() throws Exception {
OperatorStateStore mockStore = Mockito.mock(OperatorStateStore.class);
FunctionInitializationContext mockContext = Mockito.mock(FunctionInitializationContext.class);
Mockito.when(mockContext.getOperatorStateStore()).thenReturn(mockStore);
Mockito.when(mockStore.getSerializableListState(any(String.class))).thenReturn(null);
Mockito.when(mockStore.getListState(any(ListStateDescriptor.class))).thenReturn(null);
return mockContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
Expand Down Expand Up @@ -414,50 +412,6 @@ public interface RuntimeContext {
@PublicEvolving
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

/**
* Gets a handle to the system's key/value folding state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* aggregates values with different types.
*
* <p>This state is only accessible if the function is executed on a KeyedStream.
*
* <pre>{@code
* DataStream<MyType> stream = ...;
* KeyedStream<MyType> keyedStream = stream.keyBy("id");
*
* keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
*
* private FoldingState<MyType, Long> state;
*
* public void open(Configuration cfg) {
* state = getRuntimeContext().getFoldingState(
* new FoldingStateDescriptor<>("sum", 0L, (a, b) -> a.count() + b, Long.class));
* }
*
* public Tuple2<MyType, Long> map(MyType value) {
* state.add(value);
* return new Tuple2<>(value, state.get());
* }
* });
*
* }</pre>
*
* @param stateProperties The descriptor defining the properties of the stats.
*
* @param <T> Type of the values folded in the other state
* @param <ACC> Type of the value in the state
*
* @return The partitioned state object.
*
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*
* @deprecated will be removed in a future version in favor of {@link AggregatingState}
*/
@PublicEvolving
@Deprecated
<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);

/**
* Gets a handle to the system's key/value map state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
Expand Down Expand Up @@ -226,14 +224,6 @@ public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingS
"This state is only accessible by functions executed on a KeyedStream");
}

@Override
@PublicEvolving
@Deprecated
public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}

@Override
@PublicEvolving
public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;
import java.util.Set;

/**
Expand Down Expand Up @@ -111,41 +110,4 @@ public interface OperatorStateStore {
* @return set of names for all registered broadcast states.
*/
Set<String> getRegisteredBroadcastStateNames();

// -------------------------------------------------------------------------------------------
// Deprecated methods
// -------------------------------------------------------------------------------------------

/**
* Creates (or restores) a list state. Each state is registered under a unique name.
* The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
*
* <p>The items in the list are repartitionable by the system in case of changed operator parallelism.
*
* @param stateDescriptor The descriptor for this state, providing a name and serializer.
* @param <S> The generic type of the state
*
* @return A list for all state partitions.
*
* @deprecated since 1.3.0. This was deprecated as part of a refinement to the function names.
* Please use {@link #getListState(ListStateDescriptor)} instead.
*/
@Deprecated
<S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;

/**
* Creates a state of the given name that uses Java serialization to persist the state. The items in the list
* are repartitionable by the system in case of changed operator parallelism.
*
* <p>This is a simple convenience method. For more flexibility on how state serialization
* should happen, use the {@link #getListState(ListStateDescriptor)} method.
*
* @param stateName The name of state to create
* @return A list state using Java serialization to serialize state objects.
*
* @deprecated since 1.3.0. Using Java serialization for persisting state is not encouraged.
* Please use {@link #getListState(ListStateDescriptor)} instead.
*/
@Deprecated
<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
Expand Down Expand Up @@ -201,11 +199,6 @@ public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
throw new UnsupportedOperationException("State is not supported.");
}

@Override
public <T, ACC> FoldingState<T, ACC> getFoldingState(final FoldingStateDescriptor<T, ACC> stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
}

@Override
public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
Expand Down Expand Up @@ -178,17 +176,6 @@ public void testCepRuntimeContext() {
// expected
}

try {
runtimeContext.getFoldingState(new FoldingStateDescriptor<>(
"foobar",
0,
mock(FoldFunction.class),
Integer.class));
fail("Expected getFoldingState to fail with unsupported operation exception.");
} catch (UnsupportedOperationException e) {
// expected
}

try {
runtimeContext.getMapState(new MapStateDescriptor<>("foobar", Integer.class, String.class));
fail("Expected getMapState to fail with unsupported operation exception.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
Expand Down Expand Up @@ -229,17 +227,6 @@ public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingS
return keyedStateStore.getAggregatingState(stateProperties);
}

@Override
@Deprecated
public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
if (!stateRegistrationAllowed) {
throw new RuntimeException(REGISTRATION_EXCEPTION_MSG);
}

registeredDescriptors.add(stateProperties);
return keyedStateStore.getFoldingState(stateProperties);
}

@Override
public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
if (!stateRegistrationAllowed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,31 +219,6 @@ public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor
return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}

// -------------------------------------------------------------------------------------------
// Deprecated state access methods
// -------------------------------------------------------------------------------------------

/**
* @deprecated This was deprecated as part of a refinement to the function names.
* Please use {@link #getListState(ListStateDescriptor)} instead.
*/
@Deprecated
@Override
public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
return getListState(stateDescriptor);
}

/**
* @deprecated Using Java serialization for persisting state is not encouraged.
* Please use {@link #getListState(ListStateDescriptor)} instead.
*/
@SuppressWarnings("unchecked")
@Deprecated
@Override
public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
return (ListState<T>) getListState(new ListStateDescriptor<>(stateName, deprecatedDefaultJavaSerializer));
}

// -------------------------------------------------------------------------------------------
// Snapshot
// -------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ public void testSnapshotAsyncClose() throws Exception {
ListStateDescriptor<MutableType> stateDescriptor1 =
new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());

ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1);

listState1.add(MutableType.of(42));
listState1.add(MutableType.of(4711));
Expand Down Expand Up @@ -841,7 +841,7 @@ public void testSnapshotAsyncCancel() throws Exception {
ListStateDescriptor<MutableType> stateDescriptor1 =
new ListStateDescriptor<>("test1", new JavaSerializer<MutableType>());

ListState<MutableType> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
ListState<MutableType> listState1 = operatorStateBackend.getListState(stateDescriptor1);

listState1.add(MutableType.of(42));
listState1.add(MutableType.of(4711));
Expand Down
Loading

0 comments on commit 276332e

Please sign in to comment.