Skip to content

Commit

Permalink
[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queri…
Browse files Browse the repository at this point in the history
…es with unknown jobIds
  • Loading branch information
kl0u committed Nov 17, 2017
1 parent 5e059e9 commit 2fe078f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,6 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {

/**
* Tests that duplicate query registrations fail the job at the JobManager.
*
* <b>NOTE: </b> This test is only in the non-HA variant of the tests because
* in the HA mode we use the actual JM code which does not recognize the
* {@code NotifyWhenJobStatus} message.
*/
@Test
public void testDuplicateRegistrationFailsJob() throws Exception {
Expand Down Expand Up @@ -435,10 +431,10 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {

/**
* Tests that the correct exception is thrown if the query
* contains a wrong queryable state name.
* contains a wrong jobId or wrong queryable state name.
*/
@Test
public void testWrongQueryableStateName() throws Exception {
public void testWrongJobIdAndWrongQueryableStateName() throws Exception {
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();

Expand Down Expand Up @@ -486,15 +482,35 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
assertEquals(JobStatus.RUNNING, jobStatus.state());

CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
final JobID wrongJobId = new JobID();

CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture = client.getKvState(
wrongJobId, // this is the wrong job id
"hankuna",
0,
BasicTypeInfo.INT_TYPE_INFO,
valueState);

try {
unknownJobFuture.get();
fail(); // by now the job must have failed.
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof RuntimeException);
Assert.assertTrue(e.getCause().getMessage().contains(
"FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")"));
} catch (Exception ignored) {
fail("Unexpected type of exception.");
}

CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName = client.getKvState(
jobId,
"wrong-hankuna", // this is the wrong name.
0,
BasicTypeInfo.INT_TYPE_INFO,
valueState);

try {
future.get();
unknownQSName.get();
fail(); // by now the job must have failed.
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof RuntimeException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
import org.apache.flink.runtime.messages.accumulators._
import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
import org.apache.flink.runtime.messages.{Acknowledge, FlinkJobNotFoundException, StackTrace}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
Expand Down Expand Up @@ -1503,7 +1503,7 @@ class JobManager(
}

case None =>
sender() ! Status.Failure(new IllegalStateException(s"Job ${msg.getJobId} not found"))
sender() ! Status.Failure(new FlinkJobNotFoundException(msg.getJobId))
}

// TaskManager KvState registration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
Expand Down Expand Up @@ -672,7 +673,7 @@ public void testKvStateMessages() throws Exception {
try {
Await.result(lookupFuture, deadline.timeLeft());
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
} catch (FlinkJobNotFoundException ignored) {
// Expected
}

Expand Down Expand Up @@ -735,7 +736,7 @@ public void testKvStateMessages() throws Exception {
try {
Await.result(lookupFuture, deadline.timeLeft());
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
} catch (FlinkJobNotFoundException ignored) {
// Expected
}

Expand Down

0 comments on commit 2fe078f

Please sign in to comment.