Skip to content

Commit

Permalink
Moved Distribution Pattern out of User Code
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Richter committed Apr 5, 2012
1 parent 6e82eb4 commit e3f5597
Show file tree
Hide file tree
Showing 41 changed files with 159 additions and 319 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.DistributionPattern;
import eu.stratosphere.nephele.io.GateID;
import eu.stratosphere.nephele.io.InputGate;
import eu.stratosphere.nephele.io.OutputGate;
Expand Down Expand Up @@ -169,8 +168,7 @@ OutputGate<? extends Record> createOutputGate(GateID gateID, Class<? extends Rec
* @param distributionPattern
* @return the created input gate
*/
InputGate<? extends Record> createInputGate(GateID gateID, RecordDeserializer<? extends Record> deserializer,
DistributionPattern distributionPattern);
InputGate<? extends Record> createInputGate(GateID gateID, RecordDeserializer<? extends Record> deserializer);

/**
* Registers an output gate with this environment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ public abstract class AbstractRecordReader<T extends Record> {
private final Environment environment;

protected AbstractRecordReader(final AbstractInvokable invokable, final RecordDeserializer<T> deserializer,
final int inputGateID, final DistributionPattern distributionPattern) {
final int inputGateID) {

this.environment = invokable.getEnvironment();
connectInputGate(deserializer, inputGateID, distributionPattern);
connectInputGate(deserializer, inputGateID);
}

/**
Expand All @@ -59,15 +59,14 @@ protected AbstractRecordReader(final AbstractInvokable invokable, final RecordDe
*/
// TODO: See if type safety can be improved here
@SuppressWarnings("unchecked")
private void connectInputGate(final RecordDeserializer<T> deserializer, final int inputGateID,
final DistributionPattern distributionPattern) {
private void connectInputGate(final RecordDeserializer<T> deserializer, final int inputGateID) {

GateID gateID = this.environment.getNextUnboundInputGateID();
if (gateID == null) {
gateID = new GateID();
}

this.inputGate = (InputGate<T>) this.environment.createInputGate(gateID, deserializer, distributionPattern);
this.inputGate = (InputGate<T>) this.environment.createInputGate(gateID, deserializer);
this.environment.registerInputGate(this.inputGate);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,7 @@
*
* @author warneke
*/
public interface DistributionPattern {

/**
* Checks if two subtasks of different tasks should be wired.
*
* @param nodeLowerStage
* the index of the producing task's subtask
* @param nodeUpperStage
* the index of the consuming task's subtask
* @param sizeSetLowerStage
* the number of subtasks of the producing task
* @param sizeSetUpperStage
* the number of subtasks of the consuming task
* @return <code>true</code> if a wire between the two considered subtasks should be created, <code>false</code>
* otherwise
*/
boolean createWire(int nodeLowerStage, int nodeUpperStage, int sizeSetLowerStage, int sizeSetUpperStage);
}
public enum DistributionPattern {
BIPARTITE, POINTWISE, STAR
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,6 @@ public interface InputGate<T extends Record> extends Gate<T> {
*/
void close() throws IOException, InterruptedException;

/**
* Returns the {@link DistributionPattern} associated with this input gate.
*
* @return the {@link DistributionPattern} associated with this input gate
*/
DistributionPattern getDistributionPattern();

/**
* Creates a new network input channel and assigns it to the given input gate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,20 @@ public class MutableRecordReader<T extends Record> extends AbstractRecordReader<
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public MutableRecordReader(final AbstractTask taskBase, final DistributionPattern distributionPattern) {
public MutableRecordReader(final AbstractTask taskBase) {

super(taskBase, new MutableRecordDeserializer<T>(), 0, distributionPattern);
super(taskBase, new MutableRecordDeserializer<T>(), 0);
}

/**
* Constructs a new record reader and registers a new input gate with the application's environment.
*
* @param outputBase
* the application that instantiated the record reader
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public MutableRecordReader(final AbstractOutputTask outputBase, final DistributionPattern distributionPattern) {

super(outputBase, new MutableRecordDeserializer<T>(), 0, distributionPattern);
}

/**
* Constructs a new record reader and registers a new input gate with the application's environment.
*
* @param taskBase
* the application that instantiated the record reader
*/
public MutableRecordReader(final AbstractTask taskBase) {
public MutableRecordReader(final AbstractOutputTask outputBase) {

super(taskBase, new MutableRecordDeserializer<T>(), 0, null);
super(outputBase, new MutableRecordDeserializer<T>(), 0);
}

/**
Expand All @@ -69,22 +56,7 @@ public MutableRecordReader(final AbstractTask taskBase) {
*/
public MutableRecordReader(final AbstractTask taskBase, final int inputGateID) {

super(taskBase, new MutableRecordDeserializer<T>(), inputGateID, null);
}

/**
* Constructs a new record reader and registers a new input gate with the application's environment.
*
* @param outputBase
* the application that instantiated the record reader
* @param inputGateID
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public MutableRecordReader(final AbstractTask outputBase, final int inputGateID,
final DistributionPattern distributionPattern) {

super(outputBase, new MutableRecordDeserializer<T>(), inputGateID, distributionPattern);
super(taskBase, new MutableRecordDeserializer<T>(), inputGateID);
}

/**
Expand All @@ -96,10 +68,9 @@ public MutableRecordReader(final AbstractTask outputBase, final int inputGateID,
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public MutableRecordReader(final AbstractOutputTask outputBase, int inputGateID,
DistributionPattern distributionPattern) {
public MutableRecordReader(final AbstractOutputTask outputBase, int inputGateID) {

super(outputBase, new MutableRecordDeserializer<T>(), inputGateID, distributionPattern);
super(outputBase, new MutableRecordDeserializer<T>(), inputGateID);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public class RecordReader<T extends Record> extends AbstractRecordReader<T> impl
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public RecordReader(AbstractTask taskBase, Class<T> inputClass, DistributionPattern distributionPattern) {
public RecordReader(AbstractTask taskBase, Class<T> inputClass) {

super(taskBase, new DefaultRecordDeserializer<T>(inputClass), 0, distributionPattern);
super(taskBase, new DefaultRecordDeserializer<T>(inputClass), 0);
}

/**
Expand All @@ -74,9 +74,9 @@ public RecordReader(AbstractTask taskBase, Class<T> inputClass, DistributionPatt
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public RecordReader(AbstractOutputTask outputBase, Class<T> inputClass, DistributionPattern distributionPattern) {
public RecordReader(AbstractOutputTask outputBase, Class<T> inputClass) {

super(outputBase, new DefaultRecordDeserializer<T>(inputClass), 0, distributionPattern);
super(outputBase, new DefaultRecordDeserializer<T>(inputClass), 0);
}

/**
Expand All @@ -91,7 +91,7 @@ public RecordReader(AbstractOutputTask outputBase, Class<T> inputClass, Distribu
*/
public RecordReader(AbstractTask taskBase, RecordDeserializer<T> deserializer) {

super(taskBase, deserializer, 0, null);
super(taskBase, deserializer, 0);
}

/**
Expand All @@ -106,23 +106,7 @@ public RecordReader(AbstractTask taskBase, RecordDeserializer<T> deserializer) {
*/
public RecordReader(AbstractTask taskBase, RecordDeserializer<T> deserializer, int inputGateID) {

super(taskBase, deserializer, inputGateID, null);
}

/**
* Constructs a new record reader and registers a new input gate with the application's environment.
*
* @param taskBase
* the application that instantiated the record reader
* @param inputClass
* the class of records that can be read from the record reader
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public RecordReader(AbstractTask taskBase, RecordDeserializer<T> deserializer,
DistributionPattern distributionPattern) {

super(taskBase, deserializer, 0, distributionPattern);
super(taskBase, deserializer, inputGateID);
}

/**
Expand All @@ -135,10 +119,9 @@ public RecordReader(AbstractTask taskBase, RecordDeserializer<T> deserializer,
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public RecordReader(AbstractTask outputBase, RecordDeserializer<T> deserializer, int inputGateID,
DistributionPattern distributionPattern) {
public RecordReader(AbstractOutputTask outputBase, RecordDeserializer<T> deserializer) {

super(outputBase, deserializer, inputGateID, distributionPattern);
super(outputBase, deserializer, 0);
}

/**
Expand All @@ -151,26 +134,9 @@ public RecordReader(AbstractTask outputBase, RecordDeserializer<T> deserializer,
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public RecordReader(AbstractOutputTask outputBase, RecordDeserializer<T> deserializer,
DistributionPattern distributionPattern) {
public RecordReader(AbstractOutputTask outputBase, RecordDeserializer<T> deserializer, int inputGateID) {

super(outputBase, deserializer, 0, distributionPattern);
}

/**
* Constructs a new record reader and registers a new input gate with the application's environment.
*
* @param outputBase
* the application that instantiated the record reader
* @param inputClass
* the class of records that can be read from the record reader
* @param distributionPattern
* the {@link DistributionPattern} that should be used for rewiring
*/
public RecordReader(AbstractOutputTask outputBase, RecordDeserializer<T> deserializer, int inputGateID,
DistributionPattern distributionPattern) {

super(outputBase, deserializer, inputGateID, distributionPattern);
super(outputBase, deserializer, inputGateID);
}

/**
Expand Down Expand Up @@ -232,4 +198,3 @@ public T next() throws IOException, InterruptedException {
return retVal;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void invoke() throws Exception {

@Override
public void registerInputOutput() {
input = new RecordReader<FileRecord>(this, FileRecord.class, null);
input = new RecordReader<FileRecord>(this, FileRecord.class);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import eu.stratosphere.nephele.fs.FileStatus;
import eu.stratosphere.nephele.fs.FileSystem;
import eu.stratosphere.nephele.fs.Path;
import eu.stratosphere.nephele.io.PointwiseDistributionPattern;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.template.AbstractFileOutputTask;
import eu.stratosphere.nephele.types.StringRecord;
Expand Down Expand Up @@ -71,7 +70,7 @@ public void invoke() throws Exception {
*/
@Override
public void registerInputOutput() {
this.input = new RecordReader<StringRecord>(this, StringRecord.class, new PointwiseDistributionPattern());
this.input = new RecordReader<StringRecord>(this, StringRecord.class);
}

/**
Expand Down
Loading

0 comments on commit e3f5597

Please sign in to comment.