Skip to content

Commit

Permalink
[FLINK-21190][runtime] Refactors JsonArchivist interface
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp authored and zentol committed Mar 23, 2021
1 parent 63be0cd commit b6e0074
Show file tree
Hide file tree
Showing 18 changed files with 88 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
Expand All @@ -50,7 +50,7 @@
public class JobAccumulatorsHandler
extends AbstractAccessExecutionGraphHandler<
JobAccumulatorsInfo, JobAccumulatorsMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

public JobAccumulatorsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.IOException;
Expand All @@ -43,7 +43,7 @@
/** Handler serving the job configuration. */
public class JobConfigHandler
extends AbstractAccessExecutionGraphHandler<JobConfigInfo, JobMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

public JobConfigHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;

Expand All @@ -57,7 +57,7 @@
/** Handler returning the details for the specified job. */
public class JobDetailsHandler
extends AbstractAccessExecutionGraphHandler<JobDetailsInfo, JobMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

private final MetricFetcher metricFetcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.IOException;
Expand All @@ -51,7 +51,7 @@
public class JobExceptionsHandler
extends AbstractAccessExecutionGraphHandler<
JobExceptionsInfo, JobExceptionsMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.IOException;
Expand All @@ -42,7 +42,7 @@
/** Handler serving the job execution plan. */
public class JobPlanHandler
extends AbstractAccessExecutionGraphHandler<JobPlanInfo, JobMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

public JobPlanHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nullable;
Expand All @@ -55,7 +55,7 @@
public class JobVertexDetailsHandler
extends AbstractAccessExecutionGraphHandler<
JobVertexDetailsInfo, JobVertexMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {
private final MetricFetcher metricFetcher;

public JobVertexDetailsHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;

Expand All @@ -64,7 +64,7 @@
public class JobVertexTaskManagersHandler
extends AbstractAccessExecutionGraphHandler<
JobVertexTaskManagersInfo, JobVertexMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {
private MetricFetcher metricFetcher;

public JobVertexTaskManagersHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;
Expand All @@ -47,7 +47,7 @@
public class JobsOverviewHandler
extends AbstractRestHandler<
RestfulGateway, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

public JobsOverviewHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.IOException;
Expand All @@ -53,7 +53,7 @@
public class SubtaskExecutionAttemptAccumulatorsHandler
extends AbstractSubtaskAttemptHandler<
SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

/**
* Instantiates a new Abstract job vertex handler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;

Expand All @@ -55,7 +55,7 @@
public class SubtaskExecutionAttemptDetailsHandler
extends AbstractSubtaskAttemptHandler<
SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

private final MetricFetcher metricFetcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.IOException;
Expand All @@ -49,7 +49,7 @@
/** Request handler for the subtasks times info. */
public class SubtasksTimesHandler
extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {
public SubtasksTimesHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
Expand All @@ -50,7 +50,7 @@
/** Handler which serves the checkpoint configuration. */
public class CheckpointConfigHandler
extends AbstractAccessExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

public CheckpointConfigHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.IOException;
Expand All @@ -48,7 +48,7 @@
/** REST handler which returns the details for a checkpoint. */
public class CheckpointStatisticDetailsHandler
extends AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

public CheckpointStatisticDetailsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
Expand All @@ -57,7 +57,7 @@
/** Handler which serves the checkpoint statistics. */
public class CheckpointingStatisticsHandler
extends AbstractAccessExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

public CheckpointingStatisticsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsWithSubtaskDetails;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.history.OnlyExecutionGraphJsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.IOException;
Expand All @@ -58,7 +58,7 @@
public class TaskCheckpointStatisticDetailsHandler
extends AbstractCheckpointHandler<
TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters>
implements JsonArchivist {
implements OnlyExecutionGraphJsonArchivist {

public TaskCheckpointStatisticDetailsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
Expand Down Expand Up @@ -129,6 +128,7 @@
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerThreadDumpHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
Expand Down Expand Up @@ -952,11 +952,11 @@ public void handleError(final Exception exception) {
}

@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
throws IOException {
Collection<ArchivedJson> archivedJson = new ArrayList<>(archivingHandlers.size());
for (JsonArchivist archivist : archivingHandlers) {
Collection<ArchivedJson> subArchive = archivist.archiveJsonWithPath(graph);
Collection<ArchivedJson> subArchive = archivist.archiveJsonWithPath(executionGraphInfo);
archivedJson.addAll(subArchive);
}
return archivedJson;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,6 @@
*/
public interface JsonArchivist {

/**
* Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their
* respective REST URL for a given job.
*
* <p>The collection should contain one entry for every response that could be generated for the
* given job, for example one entry for each task. The REST URLs should be unique and must not
* contain placeholders.
*
* @param graph AccessExecutionGraph for which the responses should be generated
* @return Collection containing an ArchivedJson for every response that could be generated for
* the given job
* @throws IOException thrown if the JSON generation fails
*/
Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException;

/**
* Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their
* respective REST URL for a given job.
Expand All @@ -59,8 +44,6 @@ public interface JsonArchivist {
* the given job
* @throws IOException thrown if the JSON generation fails
*/
default Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
throws IOException {
return archiveJsonWithPath(executionGraphInfo.getArchivedExecutionGraph());
}
Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.history;

import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;

import java.io.IOException;
import java.util.Collection;

/**
* Interface for all classes that want to participate in the archiving of job-related json responses
* but only provide {@link AccessExecutionGraph}-related information.
*/
public interface OnlyExecutionGraphJsonArchivist extends JsonArchivist {

/**
* Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their
* respective REST URL for a given job.
*
* <p>The collection should contain one entry for every response that could be generated for the
* given job, for example one entry for each task. The REST URLs should be unique and must not
* contain placeholders.
*
* @param graph AccessExecutionGraph for which the responses should be generated
* @return Collection containing an ArchivedJson for every response that could be generated for
* the given job
* @throws IOException thrown if the JSON generation fails
*/
Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException;

@Override
default Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
throws IOException {
return archiveJsonWithPath(executionGraphInfo.getArchivedExecutionGraph());
}
}

0 comments on commit b6e0074

Please sign in to comment.