Skip to content

Commit

Permalink
Added functionality to re-initialize file format defaults for use wit…
Browse files Browse the repository at this point in the history
…h local executor.
  • Loading branch information
StephanEwen committed Mar 3, 2014
1 parent badff63 commit 719d27b
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@

package eu.stratosphere.client.minicluster;

import java.lang.reflect.Method;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
Expand All @@ -28,6 +34,8 @@

public class NepheleMiniCluster {

private static final Log LOG = LogFactory.getLog(NepheleMiniCluster.class);

private static final int DEFAULT_JM_RPC_PORT = 6498;

private static final int DEFAULT_TM_RPC_PORT = 6501;
Expand Down Expand Up @@ -152,6 +160,10 @@ public void start() throws Exception {
GlobalConfiguration.includeConfiguration(conf);
}

// force the input/output format classes to load the default values from the configuration.
// we need to do this here, because the format classes may have been initialized before the mini cluster was started
initializeIOFormatClasses();

// before we start the jobmanager, we need to make sure that there are no lingering IPC threads from before
// check that all threads are done before we return
Thread[] allThreads = new Thread[Thread.activeCount()];
Expand Down Expand Up @@ -207,6 +219,21 @@ private void waitForJobManagerToBecomeReady() throws InterruptedException {
}
}

private static void initializeIOFormatClasses() {
try {
Method im = FileInputFormat.class.getDeclaredMethod("initDefaultsFromConfiguration");
im.setAccessible(true);
im.invoke(null);

Method om = FileOutputFormat.class.getDeclaredMethod("initDefaultsFromConfiguration");
om.setAccessible(true);
om.invoke(null);
}
catch (Exception e) {
LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might mot follow the specified default behavior.");
}
}

public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort,
int taskManagerDataPort, String hdfsConfigFile, boolean visualization,
boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,16 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
/**
* The timeout (in milliseconds) to wait for a filesystem stream to respond.
*/
static final long DEFAULT_OPENING_TIMEOUT;
private static long DEFAULT_OPENING_TIMEOUT;

static {
initDefaultsFromConfiguration();
}

private static final void initDefaultsFromConfiguration() {

final long to = GlobalConfiguration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY,
ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
if (to < 0) {
LOG.error("Invalid timeout value for filesystem stream opening: " + to + ". Using default value of " +
ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
Expand All @@ -98,6 +103,10 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
}
}

static final long getDefaultOpeningTimeout() {
return DEFAULT_OPENING_TIMEOUT;
}

// --------------------------------------------------------------------------------------------
// Variables for internal operation.
// They are all transient, because we do not want them so be serialized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public static enum OutputDirectoryMode {

// --------------------------------------------------------------------------------------------

private static final WriteMode DEFAULT_WRITE_MODE;
private static WriteMode DEFAULT_WRITE_MODE;

private static final OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;


static {
private static final void initDefaultsFromConfiguration() {
final boolean overwrite = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);
ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);

DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.CREATE;

final boolean alwaysCreateDirectory = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
Expand All @@ -69,6 +69,10 @@ public static enum OutputDirectoryMode {
DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
}

static {
initDefaultsFromConfiguration();
}

// --------------------------------------------------------------------------------------------

/**
Expand Down Expand Up @@ -140,7 +144,7 @@ public void configure(Configuration parameters) {
}

if (this.openTimeout == -1) {
this.openTimeout = FileInputFormat.DEFAULT_OPENING_TIMEOUT;
this.openTimeout = FileInputFormat.getDefaultOpeningTimeout();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,6 @@ public int getMaximumNumberOfSubtasks() {
final Path path = fileOutputFormat.getOutputFilePath();
final WriteMode writeMode = fileOutputFormat.getWriteMode();
final OutputDirectoryMode outDirMode = fileOutputFormat.getOutputDirectoryMode();

if (writeMode != WriteMode.OVERWRITE) {
System.err.println("WRONG OVERWRITE VALUE READ FROM FILE OUTPUT FORMAT.");
throw new RuntimeException("WRONG OVERWRITE VALUE READ FROM FILE OUTPUT FORMAT.");
}

// Prepare output path and determine max DOP
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.example.java.record.wordcount.WordCount;
import eu.stratosphere.test.testdata.WordCountData;
import eu.stratosphere.util.LogUtils;


public class LocalExecutorTest {
Expand All @@ -42,13 +41,13 @@ public void testLocalExecutorWithWordCount() {

// run WordCount
WordCount wc = new WordCount();
wc.getPlan("4", inFile.toURI().toString(), outFile.toURI().toString());

LocalExecutor executor = new LocalExecutor();
LocalExecutor.setLoggingLevel(Level.ERROR);
executor.setDefaultOverwriteFiles(true);

LogUtils.initializeDefaultConsoleLogger(Level.ERROR);

executor.start();

executor.executePlan(wc.getPlan("4", inFile.toURI().toString(), outFile.toURI().toString()));
executor.stop();
} catch (Exception e) {
Expand Down

0 comments on commit 719d27b

Please sign in to comment.