Skip to content

Commit

Permalink
Partial Revert "[FLINK-15669] [sql-client] Fix SQL client can't cance…
Browse files Browse the repository at this point in the history
…l flink job"

This partially reverts commit 0650e4b.
Only the tests are reverted because the fix is still good.  The added
tests don't work because of a race condition: if the query finishes
before the test cancels it, the job status will be FINISHED, and not
CANCELED, as the test expects.
  • Loading branch information
aljoscha committed Apr 26, 2020
1 parent ba54a8d commit cbaea29
Showing 1 changed file with 0 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@

package org.apache.flink.table.client.gateway.local;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
Expand Down Expand Up @@ -86,7 +84,6 @@
import static org.apache.flink.table.client.gateway.local.ExecutionContextTest.CATALOGS_ENVIRONMENT_FILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -486,51 +483,6 @@ public void testStreamQueryExecutionChangelog() throws Exception {
}
}

@Test(timeout = 90_000L)
public void testStreamQueryCancel() throws Exception {
	// Verifies that a running streaming query can be cancelled through the executor.
	// NOTE: the job may legitimately FINISH before the cancel request lands (the test
	// data is finite), so this test accepts either CANCELED or FINISHED as the final
	// state instead of insisting on CANCELED — that insistence was the original race.
	final URL url = getClass().getClassLoader().getResource("test-data.csv");
	Objects.requireNonNull(url);
	final Map<String, String> replaceVars = new HashMap<>();
	replaceVars.put("$VAR_PLANNER", planner);
	replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
	replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
	replaceVars.put("$VAR_RESULT_MODE", "changelog");
	replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
	replaceVars.put("$VAR_MAX_ROWS", "100");

	final LocalExecutor executor = createModifiedExecutor(clusterClient, replaceVars);
	final SessionContext session = new SessionContext("test-session", new Environment());
	String sessionId = executor.openSession(session);
	assertEquals("test-session", sessionId);

	try {
		final ResultDescriptor desc = executor.executeQuery(sessionId, "SELECT * FROM TestView1");
		final JobID jobId = JobID.fromHexString(desc.getResultId());

		// changelog mode streams results, it does not materialize them
		assertFalse(desc.isMaterialized());

		JobStatus jobStatus1 = getJobStatus(executor, sessionId, jobId);

		// the job must not already be cancelled before we ask for cancellation
		assertNotEquals(JobStatus.CANCELED, jobStatus1);

		executor.cancelQuery(sessionId, desc.getResultId());

		// wait up to 30 seconds for the job to reach a globally terminal state
		JobStatus jobStatus2 = null;
		for (int i = 0; i < 300; ++i) {
			jobStatus2 = getJobStatus(executor, sessionId, jobId);
			if (jobStatus2.isGloballyTerminalState()) {
				break;
			}
			Thread.sleep(100);
		}
		// either the cancel won the race (CANCELED) or the finite query completed
		// on its own first (FINISHED); both prove the job is no longer running
		assertTrue(
			"Unexpected terminal job status: " + jobStatus2,
			jobStatus2 == JobStatus.CANCELED || jobStatus2 == JobStatus.FINISHED);
	} finally {
		executor.closeSession(sessionId);
	}
}

@Test(timeout = 90_000L)
public void testStreamQueryExecutionChangelogMultipleTimes() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Expand Down Expand Up @@ -697,50 +649,6 @@ public void testBatchQueryExecution() throws Exception {
}
}

@Test(timeout = 90_000L)
public void testBatchQueryCancel() throws Exception {
	// Verifies that a running batch query can be cancelled through the executor.
	// NOTE: the batch job reads a tiny finite file and may FINISH before the cancel
	// request is processed, so the test accepts CANCELED or FINISHED as the final
	// state — asserting CANCELED alone was the race condition that made this flaky.
	final URL url = getClass().getClassLoader().getResource("test-data.csv");
	Objects.requireNonNull(url);
	final Map<String, String> replaceVars = new HashMap<>();
	replaceVars.put("$VAR_PLANNER", planner);
	replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
	replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
	replaceVars.put("$VAR_RESULT_MODE", "table");
	replaceVars.put("$VAR_UPDATE_MODE", "");
	replaceVars.put("$VAR_MAX_ROWS", "100");

	final LocalExecutor executor = createModifiedExecutor(clusterClient, replaceVars);
	final SessionContext session = new SessionContext("test-session", new Environment());
	String sessionId = executor.openSession(session);
	assertEquals("test-session", sessionId);

	try {
		final ResultDescriptor desc = executor.executeQuery(sessionId, "SELECT * FROM TestView1");
		final JobID jobId = JobID.fromHexString(desc.getResultId());
		// table mode materializes the batch result
		assertTrue(desc.isMaterialized());

		JobStatus jobStatus1 = getJobStatus(executor, sessionId, jobId);

		// the job must not already be cancelled before we ask for cancellation
		assertNotEquals(JobStatus.CANCELED, jobStatus1);

		executor.cancelQuery(sessionId, desc.getResultId());

		// wait up to 30 seconds for the job to reach a globally terminal state
		JobStatus jobStatus2 = null;
		for (int i = 0; i < 300; ++i) {
			jobStatus2 = getJobStatus(executor, sessionId, jobId);
			if (jobStatus2.isGloballyTerminalState()) {
				break;
			}
			Thread.sleep(100);
		}
		// either the cancel won the race (CANCELED) or the finite query completed
		// on its own first (FINISHED); both prove the job is no longer running
		assertTrue(
			"Unexpected terminal job status: " + jobStatus2,
			jobStatus2 == JobStatus.CANCELED || jobStatus2 == JobStatus.FINISHED);
	} finally {
		executor.closeSession(sessionId);
	}
}

@Test(timeout = 90_000L)
public void testBatchQueryExecutionMultipleTimes() throws Exception {
final URL url = getClass().getClassLoader().getResource("test-data.csv");
Expand Down Expand Up @@ -1396,21 +1304,4 @@ private List<String> retrieveChangelogResult(
}
return actualResults;
}

/**
 * Looks up the current status of the given job within the given session.
 *
 * <p>Convenience wrapper: resolves the session's {@link ExecutionContext} from the
 * executor and delegates to {@code getJobStatusInternal}.
 *
 * @param executor the executor that owns the session
 * @param sessionId id of the open session to resolve the execution context from
 * @param jobId id of the job to query
 * @return the job's current {@code JobStatus}
 */
private JobStatus getJobStatus(LocalExecutor executor, String sessionId, JobID jobId) {
	final ExecutionContext<?> context = executor.getExecutionContext(sessionId);
	return getJobStatusInternal(context, jobId);
}

/**
 * Fetches the current status of {@code jobId} from the cluster described by the
 * given execution context.
 *
 * <p>Retrieves the already-running cluster via the context's cluster descriptor and
 * blocks on the status future. Any failure other than a {@code SqlExecutionException}
 * (which is rethrown as-is) is wrapped in a {@code SqlExecutionException}.
 *
 * @param context the execution context describing the target cluster
 * @param jobId id of the job to query
 * @return the job's current {@code JobStatus}
 */
private <T> JobStatus getJobStatusInternal(ExecutionContext<T> context, JobID jobId) {
	try (final ClusterDescriptor<T> descriptor = context.createClusterDescriptor()) {
		// connect to the existing cluster (we never deploy a new one here) and
		// synchronously wait for the status lookup to complete
		final ClusterClient<T> client =
			descriptor.retrieve(context.getClusterId()).getClusterClient();
		return client.getJobStatus(jobId).get();
	} catch (SqlExecutionException e) {
		// already carries SQL-client context; propagate unchanged
		throw e;
	} catch (Exception e) {
		throw new SqlExecutionException("Could not locate a cluster.", e);
	}
}
}

0 comments on commit cbaea29

Please sign in to comment.