Skip to content

Commit

Permalink
[FLINK-7704] [flip6] Add JobPlanHandler for new RestServerEndpoint
Browse files Browse the repository at this point in the history
This closes apache#4768.
  • Loading branch information
yew1eb authored and tillrohrmann committed Oct 10, 2017
1 parent 0a286d0 commit 9829ca0
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
Expand All @@ -51,6 +52,7 @@
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
Expand Down Expand Up @@ -186,6 +188,14 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
executor,
checkpointStatsCache);

JobPlanHandler jobPlanHandler = new JobPlanHandler(
restAddressFuture,
leaderRetriever,
timeout,
JobPlanHeaders.getInstance(),
executionGraphCache,
executor);

final File tmpDir = restConfiguration.getTmpDir();

Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
Expand All @@ -210,6 +220,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), checkpointStatisticDetailsHandler));
handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), jobPlanHandler));

BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.apache.flink.runtime.rest.handler.job;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Handler serving the job execution plan.
*/
public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters> {

public JobPlanHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {

super(
localRestAddress,
leaderRetriever,
timeout,
messageHeaders,
executionGraphCache,
executor);
}

@Override
protected JobPlanInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
return new JobPlanInfo(executionGraph.getJsonPlan());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* Message headers for the {@link JobPlanHandler}.
*/
public class JobPlanHeaders implements MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> {

private static final JobPlanHeaders INSTANCE = new JobPlanHeaders();

public static final String URL = "/jobs/:jobid/plan";

private JobPlanHeaders() {
}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public Class<JobPlanInfo> getResponseClass() {
return JobPlanInfo.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public JobMessageParameters getUnresolvedMessageParameters() {
return new JobMessageParameters();
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return URL;
}

public static JobPlanHeaders getInstance() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.util.Preconditions;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;
import java.util.Objects;

/**
* Response type of the {@link JobPlanHandler}.
*/
@JsonSerialize(using = JobPlanInfo.Serializer.class)
@JsonDeserialize(using = JobPlanInfo.Deserializer.class)
public class JobPlanInfo implements ResponseBody {

private final String jsonPlan;

public JobPlanInfo(String jsonPlan) {
this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
}

public String getJsonPlan() {
return jsonPlan;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JobPlanInfo that = (JobPlanInfo) o;
return Objects.equals(jsonPlan, that.jsonPlan);
}

@Override
public int hashCode() {
return Objects.hash(jsonPlan);
}

//---------------------------------------------------------------------------------
// Static helper classes
//---------------------------------------------------------------------------------

/**
* Json serializer for the {@link JobPlanInfo}.
*/
public static final class Serializer extends StdSerializer<JobPlanInfo> {

private static final long serialVersionUID = -1551666039618928811L;

public Serializer() {
super(JobPlanInfo.class);
}

@Override
public void serialize(
JobPlanInfo jobPlanInfo,
JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeString(jobPlanInfo.getJsonPlan());
}
}

/**
* Json deserializer for the {@link JobPlanInfo}.
*/
public static final class Deserializer extends StdDeserializer<JobPlanInfo> {

private static final long serialVersionUID = -3580088509877177213L;

public Deserializer() {
super(JobPlanInfo.class);
}

@Override
public JobPlanInfo deserialize(
JsonParser jsonParser,
DeserializationContext deserializationContext) throws IOException {
final String jsonPlan = jsonParser.getText();
return new JobPlanInfo(jsonPlan);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.legacy.messages;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;

/**
* Tests that the {@link JobPlanInfo} can be marshalled and unmarshalled.
*/
public class JobPlanInfoTest extends RestResponseMarshallingTestBase<JobPlanInfo> {

@Override
protected Class<JobPlanInfo> getTestResponseClass() {
return JobPlanInfo.class;
}

@Override
protected JobPlanInfo getTestResponseInstance() {
JobID jobID = new JobID();
String jobName = "job_007";
String jsonPlan = "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}";
return new JobPlanInfo(jsonPlan);
}
}

0 comments on commit 9829ca0

Please sign in to comment.