Skip to content

Commit

Permalink
[FLINK-14958][table] ProgramTargetDescriptor#jobID can be of type JobID
Browse files Browse the repository at this point in the history
This closes apache#10505 .
  • Loading branch information
AT-Fieldless authored and tisonkun committed Dec 11, 2019
1 parent 4286fca commit 6e2c9a9
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
*/
public class ProgramTargetDescriptor {

private final String jobId;
private final JobID jobId;

public ProgramTargetDescriptor(String jobId) {
public ProgramTargetDescriptor(JobID jobId) {
this.jobId = checkNotNull(jobId);
}

public String getJobId() {
public JobID getJobId() {
return jobId;
}

Expand All @@ -49,6 +49,6 @@ public String toString() {
* @return program target descriptor
*/
public static ProgramTargetDescriptor of(JobID jobId) {
return new ProgramTargetDescriptor(jobId.toString());
return new ProgramTargetDescriptor(jobId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.client.cli;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.cli.utils.TerminalUtils;
Expand Down Expand Up @@ -361,7 +362,8 @@ public ProgramTargetDescriptor executeUpdate(String sessionId, String statement)
if (failExecution) {
throw new SqlExecutionException("Fail execution.");
}
return new ProgramTargetDescriptor("testJobId");
JobID jobID = JobID.generate();
return new ProgramTargetDescriptor(jobID);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.flink.table.client.gateway.local;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down Expand Up @@ -1054,7 +1053,7 @@ private void executeAndVerifySinkResult(
boolean isRunning = true;
while (isRunning) {
Thread.sleep(50); // slow the processing down
final JobStatus jobStatus = clusterClient.getJobStatus(JobID.fromHexString(targetDescriptor.getJobId())).get();
final JobStatus jobStatus = clusterClient.getJobStatus(targetDescriptor.getJobId()).get();
switch (jobStatus) {
case CREATED:
case RUNNING:
Expand Down

0 comments on commit 6e2c9a9

Please sign in to comment.