Skip to content

Commit

Permalink
[FLINK-1245] [Java API] Introduce TypeHints for Java API operators
Browse files Browse the repository at this point in the history
Also contains fixes by [email protected]
 - Make MissingTypeInfo optional in TypeExtractor (by default still throws exception)
 - Simplified deferred evaluation of type dependend code by making evaluations lazy
 - Add call location function names to MissingTypeInfo error messages.
 - Improvements on other error messages.

This closes apache#270
  • Loading branch information
twalthr authored and StephanEwen committed Jan 8, 2015
1 parent 06503c8 commit d8dbaee
Show file tree
Hide file tree
Showing 35 changed files with 1,198 additions and 390 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import org.apache.flink.api.java.ExecutionEnvironment;

/**
* Wordcount for placing at least something into the jar file.
*
* WordCount for placing at least something into the jar file.
*/
public class WordCount {

Expand Down Expand Up @@ -98,7 +97,7 @@ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// *************************************************************************

private static boolean fileOutput = false;
private static boolean verbose = false;

private static String textPath;
private static String outputPath;

Expand All @@ -111,7 +110,7 @@ private static boolean parseParameters(String[] args) {
textPath = args[0];
outputPath = args[1];
} else if(args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
verbose = Boolean.valueOf(args[1]);
Boolean.valueOf(args[1]); // parse verbosity flag
textPath = args[2];
outputPath = args[3];
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public class DualInputSemanticProperties extends SemanticProperties {


public DualInputSemanticProperties() {
super();
this.init();
init();
}

/**
Expand Down Expand Up @@ -251,15 +250,24 @@ public FieldSet getReadFields2() {
*/
@Override
public void clearProperties() {
this.init();
super.clearProperties();
init();
}

@Override
public boolean isEmpty() {
return super.isEmpty() &&
(forwardedFields1 == null || forwardedFields1.isEmpty()) &&
(forwardedFields2 == null || forwardedFields2.isEmpty()) &&
(readFields1 == null || readFields1.size() == 0) &&
(readFields2 == null || readFields2.size() == 0);
}


private void init() {
this.forwardedFields1 = new HashMap<Integer,FieldSet>();
this.forwardedFields2 = new HashMap<Integer,FieldSet>();
this.readFields1 = null;
this.readFields2 = null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ public abstract class SemanticProperties implements Serializable {

private static final long serialVersionUID = 1L;

/**
* Set of fields that are written in the destination record(s).
*/
/** Set of fields that are written in the destination record(s).*/
private FieldSet writtenFields;


/**
* Adds, to the existing information, field(s) that are written in
* the destination record(s).
Expand Down Expand Up @@ -71,10 +70,10 @@ public FieldSet getWrittenFields() {
* Clears the object.
*/
public void clearProperties() {
this.init();
this.writtenFields = null;
}

private void init() {
this.writtenFields = null;
public boolean isEmpty() {
return this.writtenFields == null || this.writtenFields.size() == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,18 @@
* Container for the semantic properties associated to a single input operator.
*/
public class SingleInputSemanticProperties extends SemanticProperties {

private static final long serialVersionUID = 1L;

/**
* Mapping from fields in the source record(s) to fields in the destination
* record(s).
*/
/**Mapping from fields in the source record(s) to fields in the destination record(s). */
private Map<Integer,FieldSet> forwardedFields;

/**
* Set of fields that are read in the source record(s).
*/
/** Set of fields that are read in the source record(s).*/
private FieldSet readFields;


public SingleInputSemanticProperties() {
super();
this.init();
init();
}

/**
Expand Down Expand Up @@ -140,8 +135,15 @@ public FieldSet getReadFields() {
*/
@Override
public void clearProperties() {
this.init();
super.clearProperties();
init();
}

@Override
public boolean isEmpty() {
return super.isEmpty() &&
(forwardedFields == null || forwardedFields.isEmpty()) &&
(readFields == null || readFields.size() == 0);
}

private void init() {
Expand Down Expand Up @@ -206,5 +208,10 @@ public void addWrittenFields(FieldSet writtenFields) {
public void setWrittenFields(FieldSet writtenFields) {
throw new UnsupportedOperationException();
}

@Override
public boolean isEmpty() {
return false;
}
}
}
Loading

0 comments on commit d8dbaee

Please sign in to comment.