Skip to content

Commit

Permalink
[FLINK-9571] Repace StateBinder with internal backend-specific state …
Browse files Browse the repository at this point in the history
…factories

This closes apache#6173.
  • Loading branch information
azagrebin authored and StefanRRichter committed Jun 18, 2018
1 parent 0e9b066 commit 0bdde83
Show file tree
Hide file tree
Showing 44 changed files with 510 additions and 814 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,6 @@ public AggregatingStateDescriptor(
this.aggFunction = checkNotNull(aggFunction);
}

// ------------------------------------------------------------------------

@Override
public AggregatingState<IN, OUT> bind(StateBinder stateBinder) throws Exception {
return stateBinder.createAggregatingState(this);
}

/**
* Returns the aggregate function to be used for the state.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,6 @@ public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction<T, ACC
}
}

// ------------------------------------------------------------------------

@Override
public FoldingState<T, ACC> bind(StateBinder stateBinder) throws Exception {
return stateBinder.createFoldingState(this);
}

/**
* Returns the fold function to be used for the folding state.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
super(name, new ListSerializer<>(typeSerializer), null);
}

// ------------------------------------------------------------------------

@Override
public ListState<T> bind(StateBinder stateBinder) throws Exception {
return stateBinder.createListState(this);
}

/**
* Gets the serializer for the elements contained in the list.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass)
super(name, new MapTypeInfo<>(keyClass, valueClass), null);
}

@Override
public MapState<UK, UV> bind(StateBinder stateBinder) throws Exception {
return stateBinder.createMapState(this);
}

@Override
public Type getType() {
return Type.MAP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,6 @@ public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Ty
this.reduceFunction = checkNotNull(reduceFunction);
}

// ------------------------------------------------------------------------

@Override
public ReducingState<T> bind(StateBinder stateBinder) throws Exception {
return stateBinder.createReducingState(this);
}

/**
* Returns the reduce function to be used for the reducing state.
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@

/**
* Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
* {@link State} in stateful operations. This contains the name and can create an actual state
* object given a {@link StateBinder} using {@link #bind(StateBinder)}.
* {@link State} in stateful operations.
*
* <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
*
Expand Down Expand Up @@ -231,13 +230,6 @@ public boolean isQueryable() {
return queryableStateName != null;
}

/**
* Creates a new {@link State} on the given {@link StateBinder}.
*
* @param stateBinder The {@code StateBackend} on which to create the {@link State}.
*/
public abstract S bind(StateBinder stateBinder) throws Exception;

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,6 @@ public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
super(name, typeSerializer, null);
}

// ------------------------------------------------------------------------

@Override
public ValueState<T> bind(StateBinder stateBinder) throws Exception {
return stateBinder.createValueState(this);
}

@Override
public Type getType() {
return Type.VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,6 @@ private static class TestStateDescriptor<T> extends StateDescriptor<State, T> {
super(name, type, null);
}

@Override
public State bind(StateBinder stateBinder) throws Exception {
throw new UnsupportedOperationException();
}

@Override
public Type getType() {
return Type.VALUE;
Expand All @@ -247,11 +242,6 @@ private static class OtherTestStateDescriptor<T> extends StateDescriptor<State,
super(name, type, null);
}

@Override
public State bind(StateBinder stateBinder) throws Exception {
throw new UnsupportedOperationException();
}

@Override
public Type getType() {
return Type.VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,25 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
import org.apache.flink.queryablestate.client.state.ImmutableFoldingState;
import org.apache.flink.queryablestate.client.state.ImmutableListState;
import org.apache.flink.queryablestate.client.state.ImmutableMapState;
import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
import org.apache.flink.queryablestate.client.state.ImmutableValueState;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
Expand All @@ -44,7 +56,10 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Client for querying Flink's managed state.
Expand All @@ -67,6 +82,20 @@ public class QueryableStateClient {

private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);

private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
Stream.of(
Tuple2.of(ValueStateDescriptor.class, (StateFactory) ImmutableValueState::createState),
Tuple2.of(ListStateDescriptor.class, (StateFactory) ImmutableListState::createState),
Tuple2.of(MapStateDescriptor.class, (StateFactory) ImmutableMapState::createState),
Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) ImmutableAggregatingState::createState),
Tuple2.of(ReducingStateDescriptor.class, (StateFactory) ImmutableReducingState::createState),
Tuple2.of(FoldingStateDescriptor.class, (StateFactory) ImmutableFoldingState::createState)
).collect(Collectors.toMap(t -> t.f0, t -> t.f1));

private interface StateFactory {
<T, S extends State> S createState(StateDescriptor<S, T> stateDesc, byte[] serializedState) throws Exception;
}

/** The client that forwards the requests to the proxy. */
private final Client<KvStateRequest, KvStateResponse> client;

Expand Down Expand Up @@ -241,14 +270,24 @@ private <K, N, S extends State, V> CompletableFuture<S> getKvState(
return FutureUtils.getFailedFuture(e);
}

return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
stateResponse -> {
try {
return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
});
return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
.thenApply(stateResponse -> createState(stateResponse, stateDescriptor));
}

private <T, S extends State> S createState(
KvStateResponse stateResponse,
StateDescriptor<S, T> stateDescriptor) {
StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDescriptor.getClass(), this.getClass());
throw new FlinkRuntimeException(message);
}
try {
return stateFactory.createState(stateDescriptor, stateResponse.getContent());
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.apache.flink.queryablestate.client.state;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;

Expand All @@ -33,7 +34,6 @@
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link AggregatingStateDescriptor}.
*/
@PublicEvolving
public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> {

private final OUT value;
Expand All @@ -57,15 +57,15 @@ public void clear() {
throw MODIFICATION_ATTEMPT_ERROR;
}

public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState(
final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor,
final byte[] serializedValue) throws IOException {

@SuppressWarnings("unchecked")
public static <OUT, ACC, S extends State> S createState(
StateDescriptor<S, ACC> stateDescriptor,
byte[] serializedState) throws IOException {
final ACC accumulator = KvStateSerializer.deserializeValue(
serializedValue,
stateDescriptor.getSerializer());

final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator);
return new ImmutableAggregatingState<>(state);
serializedState,
stateDescriptor.getSerializer());
final OUT state = ((AggregatingStateDescriptor<?, ACC, OUT>) stateDescriptor).
getAggregateFunction().getResult(accumulator);
return (S) new ImmutableAggregatingState<>(state);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.apache.flink.queryablestate.client.state;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.util.Preconditions;

Expand All @@ -33,7 +34,6 @@
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link FoldingStateDescriptor}.
*/
@PublicEvolving
@Deprecated
public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> {

Expand All @@ -58,13 +58,13 @@ public void clear() {
throw MODIFICATION_ATTEMPT_ERROR;
}

public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
final FoldingStateDescriptor<IN, ACC> stateDescriptor,
final byte[] serializedState) throws IOException {

@SuppressWarnings("unchecked")
public static <ACC, S extends State> S createState(
StateDescriptor<S, ACC> stateDescriptor,
byte[] serializedState) throws IOException {
final ACC state = KvStateSerializer.deserializeValue(
serializedState,
stateDescriptor.getSerializer());
return new ImmutableFoldingState<>(state);
serializedState,
stateDescriptor.getSerializer());
return (S) new ImmutableFoldingState<>(state);
}
}
Loading

0 comments on commit 0bdde83

Please sign in to comment.