Skip to content

Commit

Permalink
[FLINK-7706] [flip6] Add JobAccumulatorsHandler for new REST endpoint
Browse files Browse the repository at this point in the history
This closes apache#4898.
  • Loading branch information
yew1eb authored and tillrohrmann committed Nov 7, 2017
1 parent de201a6 commit 0c62c52
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
Expand Down Expand Up @@ -302,6 +304,15 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
executor,
metricFetcher);

JobAccumulatorsHandler jobAccumulatorsHandler = new JobAccumulatorsHandler(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
JobAccumulatorsHeaders.getInstance(),
executionGraphCache,
executor);

final File tmpDir = restConfiguration.getTmpDir();

Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
Expand Down Expand Up @@ -331,6 +342,7 @@ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> in
handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler));
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler));
handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));
handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), jobAccumulatorsHandler));
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.job;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Request handler that returns the aggregated accumulators of a job.
*/
public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobMessageParameters> {

public JobAccumulatorsHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {
super(
localRestAddress,
leaderRetriever,
timeout,
responseHeaders,
messageHeaders,
executionGraphCache,
executor);
}

@Override
protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException {
StringifiedAccumulatorResult[] accs = graph.getAccumulatorResultsStringified();
List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(accs.length);

for (StringifiedAccumulatorResult acc : accs) {
userTaskAccumulators.add(
new JobAccumulatorsInfo.UserTaskAccumulator(
acc.getName(),
acc.getType(),
acc.getValue()));
}

return new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* Message headers for the {@link JobAccumulatorsHandler}.
*/
public class JobAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> {

private static final JobAccumulatorsHeaders INSTANCE = new JobAccumulatorsHeaders();

public static final String URL = "/jobs" +
"/:" + JobIDPathParameter.KEY +
"/accumulators";

private JobAccumulatorsHeaders() {
}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public Class<JobAccumulatorsInfo> getResponseClass() {
return JobAccumulatorsInfo.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public JobMessageParameters getUnresolvedMessageParameters() {
return new JobMessageParameters();
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return URL;
}

public static JobAccumulatorsHeaders getInstance() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;

/**
* Response type of the {@link JobAccumulatorsHandler}.
*/
public class JobAccumulatorsInfo implements ResponseBody {
public static final String FIELD_NAME_JOB_ACCUMULATORS = "job-accumulators";
public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = "user-task-accumulators";

@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
private List<JobAccumulator> jobAccumulators;

@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
private List<UserTaskAccumulator> userAccumulators;

@JsonCreator
public JobAccumulatorsInfo(
@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators,
@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators) {
this.jobAccumulators = Preconditions.checkNotNull(jobAccumulators);
this.userAccumulators = Preconditions.checkNotNull(userAccumulators);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
JobAccumulatorsInfo that = (JobAccumulatorsInfo) o;
return Objects.equals(userAccumulators, that.userAccumulators);
}

@Override
public int hashCode() {
return Objects.hash(userAccumulators);
}

//---------------------------------------------------------------------------------
// Static helper classes
//---------------------------------------------------------------------------------

/**
* Json serializer for the {@link JobAccumulatorsInfo}.
*/
public static final class JobAccumulator {
// empty for now
}

/**
* Json serializer for the {@link JobAccumulatorsInfo}.
*/
public static final class UserTaskAccumulator {

public static final String FIELD_NAME_ACC_NAME = "name";
public static final String FIELD_NAME_ACC_TYPE = "type";
public static final String FIELD_NAME_ACC_VALUE = "value";

@JsonProperty(FIELD_NAME_ACC_NAME)
private String name;

@JsonProperty(FIELD_NAME_ACC_TYPE)
private String type;

@JsonProperty(FIELD_NAME_ACC_VALUE)
private String value;

@JsonCreator
public UserTaskAccumulator(
@JsonProperty(FIELD_NAME_ACC_NAME) String name,
@JsonProperty(FIELD_NAME_ACC_TYPE) String type,
@JsonProperty(FIELD_NAME_ACC_VALUE) String value) {
this.name = Preconditions.checkNotNull(name);
this.type = Preconditions.checkNotNull(type);
this.value = Preconditions.checkNotNull(value);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UserTaskAccumulator that = (UserTaskAccumulator) o;
return Objects.equals(name, that.name) &&
Objects.equals(type, that.type) &&
Objects.equals(value, that.value);
}

@Override
public int hashCode() {
return Objects.hash(name, type, value);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Tests that the {@link JobAccumulatorsInfo} can be marshalled and unmarshalled.
*/
public class JobAccumulatorsInfoTest extends RestResponseMarshallingTestBase<JobAccumulatorsInfo> {
@Override
protected Class<JobAccumulatorsInfo> getTestResponseClass() {
return JobAccumulatorsInfo.class;
}

@Override
protected JobAccumulatorsInfo getTestResponseInstance() throws Exception {
List<JobAccumulatorsInfo.UserTaskAccumulator> userAccumulatorList = new ArrayList<>(3);
userAccumulatorList.add(new JobAccumulatorsInfo.UserTaskAccumulator(
"uta1.name",
"uta1.type",
"uta1.value"));
userAccumulatorList.add(new JobAccumulatorsInfo.UserTaskAccumulator(
"uta2.name",
"uta2.type",
"uta2.value"));
userAccumulatorList.add(new JobAccumulatorsInfo.UserTaskAccumulator(
"uta3.name",
"uta3.type",
"uta3.value"));

return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList);
}
}

0 comments on commit 0c62c52

Please sign in to comment.