Skip to content

Commit

Permalink
[FLINK-6358] [gelly] Write job details for Gelly examples
Browse files Browse the repository at this point in the history
Add an option to write job details to a file in JSON format. Job details
include: job ID, runtime, parameters with values, and accumulators with
values.

This closes #4170
  • Loading branch information
greghogan committed Jul 11, 2017
1 parent be4853d commit 273223f
Showing 1 changed file with 92 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.graph;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
Expand Down Expand Up @@ -48,14 +49,24 @@
import org.apache.flink.graph.drivers.output.Hash;
import org.apache.flink.graph.drivers.output.Output;
import org.apache.flink.graph.drivers.output.Print;
import org.apache.flink.graph.drivers.parameter.BooleanParameter;
import org.apache.flink.graph.drivers.parameter.Parameterized;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
import org.apache.flink.graph.drivers.parameter.StringParameter;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.InstantiationUtil;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.commons.lang3.text.StrBuilder;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* This default main class executes Flink drivers.
Expand All @@ -70,7 +81,8 @@
* <p>Algorithms must explicitly support each type of output via implementation of
* interfaces. This is scalable as the number of outputs is small and finite.
*/
public class Runner {
public class Runner
extends ParameterizedBase {

private static final String INPUT = "input";

Expand Down Expand Up @@ -108,6 +120,27 @@ public class Runner {
.addClass(Hash.class)
.addClass(Print.class);

private final ParameterTool parameters;

private final BooleanParameter disableObjectReuse = new BooleanParameter(this, "__disable_object_reuse");

private final StringParameter jobDetailsPath = new StringParameter(this, "__job_details_path")
.setDefaultValue(null);

/**
 * Create an algorithm runner from the given command-line arguments.
 *
 * <p>The arguments are parsed once here; {@link #run()} later registers
 * them as the job's global parameters and uses them to configure the
 * driver, input, and output.
 *
 * @param args command-line arguments in {@code --key value} form, as
 *        understood by {@link ParameterTool#fromArgs(String[])}
 */
public Runner(String[] args) {
	parameters = ParameterTool.fromArgs(args);
}

/**
 * Returns the simple (unqualified) name of this runner's runtime class,
 * used to identify this parameterized component.
 *
 * @return simple class name of this runner
 */
@Override
public String getName() {
	return getClass().getSimpleName();
}

/**
* List available algorithms. This is displayed to the user when no valid
* algorithm is given in the program parameterization.
Expand Down Expand Up @@ -192,21 +225,26 @@ private static String getAlgorithmUsage(String algorithmName) {
.toString();
}

public static void main(String[] args) throws Exception {
public void run() throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();

// should not have any non-Flink data types
config.disableAutoTypeRegistration();
config.disableForceAvro();
config.disableForceKryo();

ParameterTool parameters = ParameterTool.fromArgs(args);
config.setGlobalJobParameters(parameters);

// configure local parameters and throw proper exception on error
try {
this.configure(parameters);
} catch (RuntimeException ex) {
throw new ProgramParametrizationException(ex.getMessage());
}

// integration tests run with object reuse both disabled and enabled
if (parameters.has("__disable_object_reuse")) {
if (disableObjectReuse.getValue()) {
config.disableObjectReuse();
} else {
config.enableObjectReuse();
Expand Down Expand Up @@ -296,6 +334,55 @@ public static void main(String[] args) throws Exception {
}

algorithm.printAnalytics(System.out);

if (jobDetailsPath.getValue() != null) {
writeJobDetails(env, jobDetailsPath.getValue());
}
}

/**
 * Write the following job details to a JSON-encoded file: runtime
 * environment (Flink version and commit information), job ID, net
 * runtime, global job parameters, and accumulator results.
 *
 * @param env the execution environment from which the last job's
 *        execution result and global parameters are read
 * @param jobDetailsPath filesystem path to which job details are written
 * @throws IOException on error writing to jobDetailsPath
 */
private static void writeJobDetails(ExecutionEnvironment env, String jobDetailsPath) throws IOException {
	JobExecutionResult result = env.getLastJobExecutionResult();

	File jsonFile = new File(jobDetailsPath);

	// try-with-resources closes the generator, flushing the file on exit
	try (JsonGenerator json = new JsonFactory().createGenerator(jsonFile, JsonEncoding.UTF8)) {
		json.writeStartObject();

		// read the revision information once rather than once per field
		EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();

		json.writeObjectFieldStart("Apache Flink");
		json.writeStringField("version", EnvironmentInformation.getVersion());
		json.writeStringField("commit ID", revision.commitId);
		json.writeStringField("commit date", revision.commitDate);
		json.writeEndObject();

		json.writeStringField("job_id", result.getJobID().toString());
		json.writeNumberField("runtime_ms", result.getNetRuntime());

		// global job parameters were registered in run() via setGlobalJobParameters
		json.writeObjectFieldStart("parameters");
		for (Map.Entry<String, String> entry : env.getConfig().getGlobalJobParameters().toMap().entrySet()) {
			json.writeStringField(entry.getKey(), entry.getValue());
		}
		json.writeEndObject();

		// accumulator values are stringified; non-string types rely on toString()
		json.writeObjectFieldStart("accumulators");
		for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
			json.writeStringField(entry.getKey(), entry.getValue().toString());
		}
		json.writeEndObject();

		json.writeEndObject();
	}
}

/**
 * Program entry point: builds a {@code Runner} from the command-line
 * arguments and executes it.
 *
 * @param args command-line arguments
 * @throws Exception on error running the driver
 */
public static void main(String[] args) throws Exception {
	Runner runner = new Runner(args);
	runner.run();
}

/**
Expand Down

0 comments on commit 273223f

Please sign in to comment.