Skip to content

Commit

Permalink
[hotfix] Remove unnecessary dependencies in flink-runtime.
Browse files Browse the repository at this point in the history
  - Removes Apache Commons HTTP Client
  - Removes Apache Commons IO
  - Removes Jettison JSONparser and consolidates tests to use Jackson instead (which is included anyways)
  • Loading branch information
StephanEwen committed Jan 15, 2016
1 parent a65cd8d commit 388c280
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,6 @@ under the License.
<exclude>com.typesafe.akka:akka-slf4j_*</exclude>
<exclude>io.netty:netty-all</exclude>
<exclude>io.netty:netty</exclude>
<exclude>org.eclipse.jetty:jetty-server</exclude>
<exclude>org.eclipse.jetty:jetty-continuation</exclude>
<exclude>org.eclipse.jetty:jetty-http</exclude>
<exclude>org.eclipse.jetty:jetty-io</exclude>
<exclude>org.eclipse.jetty:jetty-util</exclude>
<exclude>org.eclipse.jetty:jetty-security</exclude>
<exclude>org.eclipse.jetty:jetty-servlet</exclude>
<exclude>commons-fileupload:commons-fileupload</exclude>
<exclude>org.apache.avro:avro</exclude>
<exclude>commons-collections:commons-collections</exclude>
Expand Down Expand Up @@ -175,18 +168,14 @@ under the License.
<exclude>org.apache.commons:commons-math</exclude>
<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
<exclude>commons-logging:commons-logging</exclude>
<exclude>org.apache.httpcomponents:httpclient</exclude>
<exclude>org.apache.httpcomponents:httpcore</exclude>
<exclude>commons-codec:commons-codec</exclude>
<exclude>com.fasterxml.jackson.core:jackson-core</exclude>
<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
<exclude>org.codehaus.jettison:jettison</exclude>
<exclude>stax:stax-api</exclude>
<exclude>com.typesafe:config</exclude>
<exclude>org.uncommons.maths:uncommons-maths</exclude>
<exclude>com.github.scopt:scopt_*</exclude>
<exclude>org.mortbay.jetty:servlet-api</exclude>
<exclude>commons-io:commons-io</exclude>
<exclude>commons-cli:commons-cli</exclude>
</excludes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,9 @@ under the License.
<exclude>com.typesafe.akka:akka-slf4j_*</exclude>
<exclude>io.netty:netty-all</exclude>
<exclude>io.netty:netty</exclude>
<exclude>org.eclipse.jetty:jetty-server</exclude>
<exclude>org.eclipse.jetty:jetty-continuation</exclude>
<exclude>org.eclipse.jetty:jetty-http</exclude>
<exclude>org.eclipse.jetty:jetty-io</exclude>
<exclude>org.eclipse.jetty:jetty-util</exclude>
<exclude>org.eclipse.jetty:jetty-security</exclude>
<exclude>org.eclipse.jetty:jetty-servlet</exclude>
<exclude>commons-fileupload:commons-fileupload</exclude>
<exclude>org.apache.avro:avro</exclude>
<exclude>commons-collections:commons-collections</exclude>
<exclude>org.codehaus.jackson:jackson-core-asl</exclude>
<exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
<exclude>com.thoughtworks.paranamer:paranamer</exclude>
<exclude>org.xerial.snappy:snappy-java</exclude>
<exclude>org.apache.commons:commons-compress</exclude>
Expand All @@ -180,18 +171,14 @@ under the License.
<exclude>org.apache.commons:commons-math</exclude>
<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
<exclude>commons-logging:commons-logging</exclude>
<exclude>org.apache.httpcomponents:httpclient</exclude>
<exclude>org.apache.httpcomponents:httpcore</exclude>
<exclude>commons-codec:commons-codec</exclude>
<exclude>com.fasterxml.jackson.core:jackson-core</exclude>
<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
<exclude>org.codehaus.jettison:jettison</exclude>
<exclude>stax:stax-api</exclude>
<exclude>com.typesafe:config</exclude>
<exclude>org.uncommons.maths:uncommons-maths</exclude>
<exclude>com.github.scopt:scopt_*</exclude>
<exclude>org.mortbay.jetty:servlet-api</exclude>
<exclude>commons-io:commons-io</exclude>
<exclude>commons-cli:commons-cli</exclude>
</excludes>
Expand Down
17 changes: 0 additions & 17 deletions flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,6 @@ under the License.
<artifactId>commons-cli</artifactId>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<!-- Version is set in root POM -->
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand All @@ -94,12 +83,6 @@ under the License.
<!-- Version is set in root POM -->
</dependency>

<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>1.1</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

import akka.actor.ActorSystem;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
Expand All @@ -30,17 +34,14 @@
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;

import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
Expand Down Expand Up @@ -142,17 +143,25 @@ public static WebMonitor startWebRuntimeMonitor(
}
}

public static Map<String, String> fromKeyValueJsonArray(JSONArray parsed) throws JSONException {
Map<String, String> hashMap = new HashMap<>();

for (int i = 0; i < parsed.length(); i++) {
JSONObject jsonObject = parsed.getJSONObject(i);
String key = jsonObject.getString("key");
String value = jsonObject.getString("value");
hashMap.put(key, value);
public static Map<String, String> fromKeyValueJsonArray(String jsonString) {
try {
Map<String, String> map = new HashMap<>();
ObjectMapper m = new ObjectMapper();
ArrayNode array = (ArrayNode) m.readTree(jsonString);

Iterator<JsonNode> elements = array.elements();
while (elements.hasNext()) {
JsonNode node = elements.next();
String key = node.get("key").asText();
String value = node.get("value").asText();
map.put(key, value);
}

return map;
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}

return hashMap;
}

public static JobDetails createDetailsForJob(ExecutionGraph job) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@

package org.apache.flink.test.web;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

import org.apache.commons.io.FileUtils;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;

import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -38,6 +39,11 @@
import java.util.Collection;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(Parameterized.class)
public class WebFrontendITCase extends MultipleProgramsTestBase {

Expand Down Expand Up @@ -69,41 +75,51 @@ public void getFrontPage() {
try {
String fromHTTP = TestBaseUtils.getFromHTTP("http:https://localhost:" + port + "/index.html");
String text = "Apache Flink Dashboard";
Assert.assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
} catch (Exception e) {
assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
}
catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
fail(e.getMessage());
}
}

@Test
public void getNumberOfTaskManagers() {
try {
String json = TestBaseUtils.getFromHTTP("http:https://localhost:" + port + "/taskmanagers/");
JSONObject response = new JSONObject(json);
JSONArray taskManagers = response.getJSONArray("taskmanagers");
Assert.assertNotNull(taskManagers);
Assert.assertEquals(cluster.numTaskManagers(), taskManagers.length());
}catch(Throwable e) {

ObjectMapper mapper = new ObjectMapper();
JsonNode response = mapper.readTree(json);
ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");

assertNotNull(taskManagers);
assertEquals(cluster.numTaskManagers(), taskManagers.size());
}
catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
fail(e.getMessage());
}
}

@Test
public void getTaskmanagers() {
try {
String json = getFromHTTP("http:https://localhost:" + port + "/taskmanagers/");
JSONObject parsed = new JSONObject(json);
JSONArray taskManagers = parsed.getJSONArray("taskmanagers");
Assert.assertNotNull(taskManagers);
Assert.assertEquals(cluster.numTaskManagers(), taskManagers.length());
JSONObject taskManager = taskManagers.getJSONObject(0);
Assert.assertNotNull(taskManager);
Assert.assertEquals(4, taskManager.getInt("freeSlots"));
}catch(Throwable e) {

ObjectMapper mapper = new ObjectMapper();
JsonNode parsed = mapper.readTree(json);
ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");

assertNotNull(taskManagers);
assertEquals(cluster.numTaskManagers(), taskManagers.size());

JsonNode taskManager = taskManagers.get(0);
assertNotNull(taskManager);
assertEquals(4, taskManager.get("freeSlots").asInt());
}
catch(Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
fail(e.getMessage());
}
}

Expand All @@ -114,31 +130,32 @@ public void getLogAndStdoutFiles() {

FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
String logs = getFromHTTP("http:https://localhost:" + port + "/jobmanager/log");
Assert.assertTrue(logs.contains("job manager log"));
assertTrue(logs.contains("job manager log"));

FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
logs = getFromHTTP("http:https://localhost:" + port + "/jobmanager/stdout");
Assert.assertTrue(logs.contains("job manager out"));
} catch(Throwable e) {
assertTrue(logs.contains("job manager out"));
}
catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
fail(e.getMessage());
}
}

@Test
public void getConfiguration() {
try {
String config = getFromHTTP("http:https://localhost:" + port + "/jobmanager/config");
JSONArray array = new JSONArray(config);

Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(array);
Assert.assertTrue(conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY).startsWith(logDir.toString()));
Assert.assertEquals(
cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
} catch (Throwable e) {

Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
assertTrue(conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY).startsWith(logDir.toString()));
assertEquals(
cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
}
catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
fail(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Joiner;

import org.apache.commons.io.FileUtils;

import org.apache.flink.client.FlinkYarnSessionCli;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
Expand All @@ -29,6 +35,7 @@
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
Expand All @@ -43,14 +50,15 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;

import org.apache.log4j.Level;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;

import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -199,16 +207,17 @@ public void testTaskManagerFailure() {
LOG.info("Got application URL from YARN {}", url);

String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/");
JSONObject parsedTMs = new JSONObject(response);
JSONArray taskManagers = parsedTMs.getJSONArray("taskmanagers");


JsonNode parsedTMs = new ObjectMapper().readTree(response);
ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers");
Assert.assertNotNull(taskManagers);
Assert.assertEquals(1, taskManagers.length());
Assert.assertEquals(1, taskManagers.getJSONObject(0).getInt("slotsNumber"));
Assert.assertEquals(1, taskManagers.size());
Assert.assertEquals(1, taskManagers.get(0).get("slotsNumber").asInt());

// get the configuration from webinterface & check if the dynamic properties from YARN show up there.
String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config");
JSONArray parsed = new JSONArray(jsonConfig);
Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(parsed);
Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);

Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value"));
Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers"));
Expand All @@ -226,9 +235,9 @@ public void testTaskManagerFailure() {
}
LOG.info("Extracted hostname:port: {} {}", hostname, port);

Assert.assertEquals("unable to find hostname in " + parsed, hostname,
Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname,
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
Assert.assertEquals("unable to find port in " + parsed, port,
Assert.assertEquals("unable to find port in " + jsonConfig, port,
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));

// test logfile access
Expand Down
Loading

0 comments on commit 388c280

Please sign in to comment.