Skip to content

Commit

Permalink
[FLINK-8062][QS] Make getKvState() with namespace private.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Nov 17, 2017
1 parent a0838de commit ff7e3cf
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ public <K, S extends State, V> CompletableFuture<S> getKvState(
* @param stateDescriptor The {@link StateDescriptor} of the state we want to query.
* @return Future holding the immutable {@link State} object containing the result.
*/
@PublicEvolving
public <K, N, S extends State, V> CompletableFuture<S> getKvState(
private <K, N, S extends State, V> CompletableFuture<S> getKvState(
final JobID jobId,
final String queryableStateName,
final K key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
Expand Down Expand Up @@ -491,9 +490,7 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
jobId,
"wrong-hankuna", // this is the wrong name.
0,
VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
VoidNamespaceTypeInfo.INSTANCE,
valueState);

try {
Expand Down Expand Up @@ -572,9 +569,7 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
jobId,
queryableState.getQueryableStateName(),
0,
VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
VoidNamespaceTypeInfo.INSTANCE,
valueState);

cluster.submitJobDetached(jobGraph);
Expand Down Expand Up @@ -1486,7 +1481,7 @@ private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(

if (!resultFuture.isDone()) {
Thread.sleep(100L);
CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
expected.whenCompleteAsync((result, throwable) -> {
if (throwable != null) {
if (
Expand Down

0 comments on commit ff7e3cf

Please sign in to comment.