Skip to content

Commit

Permalink
[FLINK-8729][streaming] Refactor JSONGenerator to use jackson
Browse files Browse the repository at this point in the history
This closes apache#5554.
  • Loading branch information
zentol committed Mar 9, 2018
1 parent b8dac87 commit b8eb6af
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 33 deletions.
6 changes: 0 additions & 6 deletions flink-streaming-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ under the License.
<version>3.5</version>
</dependency>

<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.commons.json</artifactId>
<version>2.0.6</version>
</dependency>

<!-- test dependencies -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.StreamOperator;

import org.apache.sling.commons.json.JSONArray;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -48,14 +48,15 @@ public class JSONGenerator {
public static final String PARALLELISM = "parallelism";

private StreamGraph streamGraph;
private final ObjectMapper mapper = new ObjectMapper();

public JSONGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
}

public String getJSON() throws JSONException {
JSONObject json = new JSONObject();
JSONArray nodes = new JSONArray();
public String getJSON() {
ObjectNode json = mapper.createObjectNode();
ArrayNode nodes = mapper.createArrayNode();
json.put("nodes", nodes);
List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
Collections.sort(operatorIDs, new Comparator<Integer>() {
Expand All @@ -75,20 +76,20 @@ public int compare(Integer o1, Integer o2) {
return json.toString();
}

private void visit(JSONArray jsonArray, List<Integer> toVisit,
Map<Integer, Integer> edgeRemapings) throws JSONException {
private void visit(ArrayNode jsonArray, List<Integer> toVisit,
Map<Integer, Integer> edgeRemapings) {

Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);

if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {

JSONObject node = new JSONObject();
ObjectNode node = mapper.createObjectNode();
decorateNode(vertexID, node);

if (!streamGraph.getSourceIDs().contains(vertexID)) {
JSONArray inputs = new JSONArray();
ArrayNode inputs = mapper.createArrayNode();
node.put(PREDECESSORS, inputs);

for (StreamEdge inEdge : vertex.getInEdges()) {
Expand All @@ -99,7 +100,7 @@ private void visit(JSONArray jsonArray, List<Integer> toVisit,
decorateEdge(inputs, inEdge, mappedID);
}
}
jsonArray.put(node);
jsonArray.add(node);
toVisit.remove(vertexID);
} else {
Integer iterationHead = -1;
Expand All @@ -111,38 +112,38 @@ private void visit(JSONArray jsonArray, List<Integer> toVisit,
}
}

JSONObject obj = new JSONObject();
JSONArray iterationSteps = new JSONArray();
ObjectNode obj = mapper.createObjectNode();
ArrayNode iterationSteps = mapper.createArrayNode();
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
JSONArray iterationInputs = new JSONArray();
ArrayNode iterationInputs = mapper.createArrayNode();
obj.put(PREDECESSORS, iterationInputs);
toVisit.remove(iterationHead);
visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
jsonArray.put(obj);
jsonArray.add(obj);
}

if (!toVisit.isEmpty()) {
visit(jsonArray, toVisit, edgeRemapings);
}
}

private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int headId,
Map<Integer, Integer> edgeRemapings, JSONArray iterationInEdges) throws JSONException {
private void visitIteration(ArrayNode jsonArray, List<Integer> toVisit, int headId,
Map<Integer, Integer> edgeRemapings, ArrayNode iterationInEdges) {

Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);

// Ignoring head and tail to avoid redundancy
if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
JSONObject obj = new JSONObject();
jsonArray.put(obj);
ObjectNode obj = mapper.createObjectNode();
jsonArray.add(obj);
decorateNode(vertexID, obj);
JSONArray inEdges = new JSONArray();
ArrayNode inEdges = mapper.createArrayNode();
obj.put(PREDECESSORS, inEdges);

for (StreamEdge inEdge : vertex.getInEdges()) {
Expand All @@ -161,16 +162,15 @@ private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int head

}

private void decorateEdge(JSONArray inputArray, StreamEdge inEdge, int mappedInputID)
throws JSONException {
JSONObject input = new JSONObject();
inputArray.put(input);
private void decorateEdge(ArrayNode inputArray, StreamEdge inEdge, int mappedInputID) {
ObjectNode input = mapper.createObjectNode();
inputArray.add(input);
input.put(ID, mappedInputID);
input.put(SHIP_STRATEGY, inEdge.getPartitioner());
input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
input.put(SHIP_STRATEGY, inEdge.getPartitioner().toString());
input.put(SIDE, (inputArray.size() == 0) ? "first" : "second");
}

private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {
private void decorateNode(Integer vertexID, ObjectNode node) {

StreamNode vertex = streamGraph.getStreamNode(vertexID);

Expand Down

0 comments on commit b8eb6af

Please sign in to comment.