Skip to content

Commit

Permalink
add ContextCheck to LocalExecutor; check in HadoopDataSink for output…
Browse files Browse the repository at this point in the history
… path
  • Loading branch information
rmetzger committed Mar 31, 2014
1 parent 7479b65 commit d61d87f
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 18 deletions.
8 changes: 8 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ jdk:
- oraclejdk8
- oraclejdk7
- openjdk6

notifications:
webhooks:
urls:
- https://webhooks.gitter.im/e/d70a7e674cb9354c77b2
on_success: always # options: [always|never|change] default: always
on_failure: always # options: [always|never|change] default: always

env:
global:
# username and password for sonatype (maven delpoy)
Expand Down
6 changes: 6 additions & 0 deletions stratosphere-addons/hadoop-compatibility/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
<artifactId>stratosphere-clients</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-tests</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-tests</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@
package eu.stratosphere.hadoopcompatibility;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.compiler.contextcheck.Validatable;
import eu.stratosphere.hadoopcompatibility.datatypes.DefaultStratosphereTypeConverter;
import eu.stratosphere.hadoopcompatibility.datatypes.StratosphereTypeConverter;

import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

Expand All @@ -36,20 +41,14 @@
*
* The HadoopDataSink provides a default converter: {@link eu.stratosphere.hadoopcompatibility.datatypes.DefaultStratosphereTypeConverter}
**/
public class HadoopDataSink<K,V> extends GenericDataSink {
public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable {

private static String DEFAULT_NAME = "<Unnamed Hadoop Data Sink>";

private JobConf jobConf;

public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator input, StratosphereTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
super(new HadoopOutputFormatWrapper<K,V>(hadoopFormat, jobConf, conv),input, name);
Preconditions.checkNotNull(hadoopFormat);
Preconditions.checkNotNull(jobConf);
this.name = name;
this.jobConf = jobConf;
jobConf.setOutputKeyClass(keyClass);
jobConf.setOutputValueClass(valueClass);
this(hadoopFormat, jobConf, name, ImmutableList.of(input), conv, keyClass, valueClass);
}

public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator input, Class<K> keyClass, Class<V> valueClass) {
Expand Down Expand Up @@ -92,4 +91,10 @@ public JobConf getJobConf() {
return this.jobConf;
}

@Override
public void check() {
// see for more details https://github.com/stratosphere/stratosphere/pull/531
Preconditions.checkNotNull(FileOutputFormat.getOutputPath(jobConf), "The HadoopDataSink currently expects a correct outputPath.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@

package eu.stratosphere.hadoopcompatibility;

import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.hadoopcompatibility.datatypes.StratosphereTypeConverter;
import eu.stratosphere.runtime.fs.hdfs.DistributedFileSystem;
import eu.stratosphere.types.Record;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.hadoopcompatibility.datatypes.StratosphereTypeConverter;
import eu.stratosphere.types.Record;


public class HadoopOutputFormatWrapper<K,V> implements OutputFormat<Record> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler;
import eu.stratosphere.compiler.contextcheck.ContextChecker;
import eu.stratosphere.compiler.dag.DataSinkNode;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plandump.PlanJSONDumpGenerator;
Expand Down Expand Up @@ -187,6 +188,9 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null)
throw new IllegalArgumentException("The plan may not be null.");

ContextChecker checker = new ContextChecker();
checker.check(plan);

synchronized (this.lock) {

// check if we start a session dedicated for this execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public boolean preVisit(Operator node) {
} else if (node instanceof DualInputOperator<?>) {
checkDualInputContract((DualInputOperator<?>) node);
}
if(node instanceof Validatable) {
((Validatable) node).check();
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/***********************************************************************************************************************
* Copyright (C) 2010-2014 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/

package eu.stratosphere.compiler.contextcheck;

/**
* Operators implementing this interface
* will be called from the {@link ContextChecker} during
* the compilation process.
*
*/
public interface Validatable {
public void check();
}

0 comments on commit d61d87f

Please sign in to comment.