Skip to content

Commit

Permalink
[FLINK-8493] [flip6] Integrate queryable state with Flip-6
Browse files Browse the repository at this point in the history
Adapt KvStateRegistry to accept multiple KvStateRegistryListeners. Introduce
the KvStateLocationOracle to retrieve the KvStateLocation. Adapt the KvStateClientProxy
to accept multiple KvStateLocationOracles to retrieve the KvStateLocations for
different jobs. Registered the KvStateRegistryListener and KvStateLocationOracle
in TaskExecutor upon establishing a connection to the JobMaster.

This closes apache#5339.
  • Loading branch information
tillrohrmann committed Feb 6, 2018
1 parent 6316087 commit cef6741
Show file tree
Hide file tree
Showing 22 changed files with 763 additions and 299 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class RestOptions {
*/
public static final ConfigOption<Integer> REST_PORT =
key("rest.port")
.defaultValue(9067)
.defaultValue(9065)
.withDescription("The port that the server listens on / the client connects to.");

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ public class WebOptions {
/**
* Timeout for asynchronous operations by the WebRuntimeMonitor in milliseconds.
*/
public static final ConfigOption<Long> TIMEOUT = ConfigOptions
.key("web.timeout")
public static final ConfigOption<Long> TIMEOUT =
key("web.timeout")
.defaultValue(10L * 1000L);

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@
package org.apache.flink.queryablestate.exceptions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.FlinkException;

/**
* Exception to fail Future if the Task Manager on which the
* {@code Client Proxy} is running on, does not know the active
* Job Manager.
* {@code Client Proxy} is running on, does not know the location
* of a requested state.
*/
@Internal
public class UnknownJobManagerException extends Exception {
public class UnknownLocationException extends FlinkException {

private static final long serialVersionUID = 9092442511708951209L;

public UnknownJobManagerException() {
super("Unknown JobManager. Either the JobManager has not registered yet or has lost leadership.");
public UnknownLocationException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateKeyGroupLocationException;
import org.apache.flink.queryablestate.exceptions.UnknownLocationException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
Expand All @@ -33,18 +34,17 @@
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;

import akka.dispatch.OnComplete;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,12 +53,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

/**
* This handler acts as an internal (to the Flink cluster) client that receives
* the requests from external clients, executes them by contacting the Job Manager (if necessary) and
Expand Down Expand Up @@ -205,32 +201,32 @@ private CompletableFuture<KvStateLocation> getKvStateLookupInfo(
return cachedFuture;
}

LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);

final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
lookupCache.put(cacheKey, location);
return proxy.getJobManagerFuture().thenComposeAsync(
jobManagerGateway -> {
final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
.onComplete(new OnComplete<KvStateLocation>() {

@Override
public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
if (failure != null) {
if (failure instanceof FlinkJobNotFoundException) {
// if the jobId was wrong, remove the entry from the cache.
lookupCache.remove(cacheKey);
}
location.completeExceptionally(failure);
} else {
location.complete(loc);
}
}
}, Executors.directExecutionContext());
return location;
}, queryExecutor);
final KvStateLocationOracle kvStateLocationOracle = proxy.getKvStateLocationOracle(jobId);

if (kvStateLocationOracle != null) {
LOG.debug("Retrieving location for state={} of job={} from the key-value state location oracle.", jobId, queryableStateName);
final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
lookupCache.put(cacheKey, location);

kvStateLocationOracle
.requestKvStateLocation(jobId, queryableStateName)
.whenComplete(
(KvStateLocation kvStateLocation, Throwable throwable) -> {
if (throwable != null) {
if (ExceptionUtils.stripCompletionException(throwable) instanceof FlinkJobNotFoundException) {
// if the jobId was wrong, remove the entry from the cache.
lookupCache.remove(cacheKey);
}
location.completeExceptionally(throwable);
} else {
location.complete(kvStateLocation);
}
});

return location;
} else {
return FutureUtils.completedExceptionally(new UnknownLocationException("Could not contact the state location oracle to retrieve the state location."));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,24 @@
package org.apache.flink.queryablestate.client.proxy;

import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.exceptions.UnknownJobManagerException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerBase;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -43,18 +45,13 @@
@Internal
public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy {

private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER =
FutureUtils.completedExceptionally(new UnknownJobManagerException());

/** Number of threads used to process incoming requests. */
private final int queryExecutorThreads;

/** Statistics collector. */
private final KvStateRequestStats stats;

private final Object leaderLock = new Object();

private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
private final ConcurrentHashMap<JobID, KvStateLocationOracle> kvStateLocationOracles;

/**
* Creates the Queryable State Client Proxy.
Expand Down Expand Up @@ -83,6 +80,8 @@ public KvStateClientProxyImpl(
Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
this.queryExecutorThreads = numQueryThreads;
this.stats = Preconditions.checkNotNull(stats);

this.kvStateLocationOracles = new ConcurrentHashMap<>(4);
}

@Override
Expand All @@ -106,20 +105,25 @@ public void shutdown() {
}

@Override
public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception {
synchronized (leaderLock) {
if (leadingJobManager == null) {
jobManagerFuture = UNKNOWN_JOB_MANAGER;
} else {
jobManagerFuture = leadingJobManager;
}
public void updateKvStateLocationOracle(JobID jobId, @Nullable KvStateLocationOracle kvStateLocationOracle) {
if (kvStateLocationOracle == null) {
kvStateLocationOracles.remove(jobId);
} else {
kvStateLocationOracles.put(jobId, kvStateLocationOracle);
}
}

@Nullable
@Override
public CompletableFuture<ActorGateway> getJobManagerFuture() {
synchronized (leaderLock) {
return jobManagerFuture;
public KvStateLocationOracle getKvStateLocationOracle(JobID jobId) {
final KvStateLocationOracle legacyKvStateLocationOracle = kvStateLocationOracles.get(HighAvailabilityServices.DEFAULT_JOB_ID);

// we give preference to the oracle registered under the default job id
// to make it work with the pre Flip-6 code paths
if (legacyKvStateLocationOracle != null) {
return legacyKvStateLocationOracle;
} else {
return kvStateLocationOracles.get(jobId);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.queryablestate.client.proxy;

import org.apache.flink.api.common.JobID;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.util.TestLogger;

import org.junit.Before;
import org.junit.Test;

import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;

/**
* Tests for the {@link KvStateClientProxyImpl}.
*/
public class KvStateClientProxyImplTest extends TestLogger {

private KvStateClientProxyImpl kvStateClientProxy;

@Before
public void setup() {
kvStateClientProxy = new KvStateClientProxyImpl(
InetAddress.getLoopbackAddress(),
Collections.singleton(0).iterator(),
1,
1,
new DisabledKvStateRequestStats());
}

/**
* Tests that we can set and retrieve the {@link KvStateLocationOracle}.
*/
@Test
public void testKvStateLocationOracle() {
final JobID jobId1 = new JobID();
final TestingKvStateLocationOracle kvStateLocationOracle1 = new TestingKvStateLocationOracle();
kvStateClientProxy.updateKvStateLocationOracle(jobId1, kvStateLocationOracle1);
final JobID jobId2 = new JobID();
final TestingKvStateLocationOracle kvStateLocationOracle2 = new TestingKvStateLocationOracle();
kvStateClientProxy.updateKvStateLocationOracle(jobId2, kvStateLocationOracle2);

assertThat(kvStateClientProxy.getKvStateLocationOracle(new JobID()), nullValue());

assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1), equalTo(kvStateLocationOracle1));
assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId2), equalTo(kvStateLocationOracle2));

kvStateClientProxy.updateKvStateLocationOracle(jobId1, null);
assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId1), nullValue());
}

/**
* Tests that {@link KvStateLocationOracle} registered under {@link HighAvailabilityServices#DEFAULT_JOB_ID}
* will be used for all requests.
*/
@Test
public void testPreFlip6CodePathPreference() {
final TestingKvStateLocationOracle kvStateLocationOracle = new TestingKvStateLocationOracle();
kvStateClientProxy.updateKvStateLocationOracle(HighAvailabilityServices.DEFAULT_JOB_ID, kvStateLocationOracle);
final JobID jobId = new JobID();
kvStateClientProxy.updateKvStateLocationOracle(jobId, new TestingKvStateLocationOracle());

assertThat(kvStateClientProxy.getKvStateLocationOracle(jobId), equalTo(kvStateLocationOracle));
}

/**
* Testing implementation of {@link KvStateLocationOracle}.
*/
private static final class TestingKvStateLocationOracle implements KvStateLocationOracle {

@Override
public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String registrationName) {
return null;
}
}

}
Loading

0 comments on commit cef6741

Please sign in to comment.