Skip to content

Commit

Permalink
Merge branch 'warneke'
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabian Hueske committed Feb 27, 2011
2 parents 845cd6e + aff72d3 commit 2421ef2
Show file tree
Hide file tree
Showing 17 changed files with 823 additions and 460 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ public void run() {
* @throws IOException
* thrown on error while initializing the RPC connection to the job manager
*/
public JobClient(JobGraph jobGraph)
throws IOException {
public JobClient(JobGraph jobGraph) throws IOException {

this(jobGraph, new Configuration());
}
Expand All @@ -150,11 +149,9 @@ public JobClient(JobGraph jobGraph)
* @throws IOException
* thrown on error while initializing the RPC connection to the job manager
*/
public JobClient(JobGraph jobGraph, Configuration configuration)
throws IOException {
public JobClient(JobGraph jobGraph, Configuration configuration) throws IOException {

final String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_ADDRESS);
final String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
final int port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_DATA_PORT_KEY = "taskmanager.data.port";

/**
* The key for the config parameter defining whether to use discovery on startup.
*/
public static final String TASK_MANAGER_USE_DISCOVERY_KEY = "taskmanager.setup.usediscovery";

/**
* The key for the config parameter defining the directory for temporary files.
*/
Expand All @@ -76,11 +71,6 @@ public final class ConfigConstants {
// Default Values
// ------------------------------------------------------------------------

/**
* The default network address to connect to for communication with the job manager.
*/
public static final String DEFAULT_JOB_MANAGER_IPC_ADDRESS = "127.0.0.1";

/**
* The default network port to connect to for communication with the job manager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static String getRandomFilename(String prefix) {
final StringBuilder stringBuilder = new StringBuilder(prefix);

for (int i = 0; i < LENGTH; i++) {
stringBuilder.append(ALPHABET[(int) Math.floor(Math.random() * ALPHABET.length)]);
stringBuilder.append(ALPHABET[(int) Math.floor(Math.random() * (double)ALPHABET.length)]);
}

return stringBuilder.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package eu.stratosphere.nephele.profiling.impl;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -27,7 +28,6 @@
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.discovery.DiscoveryService;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.ipc.Server;
Expand Down Expand Up @@ -63,14 +63,14 @@ public class JobManagerProfilerImpl implements JobManagerProfiler, ProfilerImplP

private final Map<JobID, JobProfilingData> registeredJobs = new HashMap<JobID, JobProfilingData>();

public JobManagerProfilerImpl()
throws ProfilingException {
public JobManagerProfilerImpl(InetAddress jobManagerbindAddress) throws ProfilingException {

// Start profiling IPC server
final int handlerCount = GlobalConfiguration.getInteger(RPC_NUM_HANDLER_KEY, DEFAULT_NUM_HANLDER);
final int rpcPort = GlobalConfiguration.getInteger(ProfilingUtils.JOBMANAGER_RPC_PORT_KEY,
ProfilingUtils.JOBMANAGER_DEFAULT_RPC_PORT);
final InetSocketAddress rpcServerAddress = new InetSocketAddress(DiscoveryService.getServiceAddress(), rpcPort);

final InetSocketAddress rpcServerAddress = new InetSocketAddress(jobManagerbindAddress, rpcPort);
Server profilingServerTmp = null;
try {

Expand Down
Loading

0 comments on commit 2421ef2

Please sign in to comment.