Skip to content

Commit

Permalink
Refactored input/output gates
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Nov 19, 2011
1 parent f45bdbe commit 22f54d3
Show file tree
Hide file tree
Showing 20 changed files with 1,378 additions and 1,200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package eu.stratosphere.nephele.execution;

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.DistributionPattern;
import eu.stratosphere.nephele.io.InputGate;
import eu.stratosphere.nephele.io.OutputGate;
import eu.stratosphere.nephele.io.RecordDeserializer;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
Expand Down Expand Up @@ -154,18 +157,23 @@ public interface Environment {
int getNumberOfInputGates();

/**
* Registers an output gate with the environment.
* Creates and registers an output gate with the environment.
*
* @param outputGate
* the output gate to be registered with the environment
* @param outputClass
* @param selector
* @param isBroadcast
* @return the created output gate
*/
void registerOutputGate(final OutputGate<? extends Record> outputGate);
OutputGate<? extends Record> createAndRegisterOutputGate(Class<? extends Record> outputClass,
ChannelSelector<? extends Record> selector, boolean isBroadcast);

/**
* Registers an input gate with the environment.
* Creates and registers an input gate with the environment.
*
* @param inputGate
* the input gate to be registered with the environment
* @param deserializer
* @param distributionPattern
* @return the created input gate
*/
void registerInputGate(final InputGate<? extends Record> inputGate);
InputGate<? extends Record> createAndRegisterInputGate(RecordDeserializer<? extends Record> deserializer,
DistributionPattern distributionPattern);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ public class LocalDataInputStream extends FSDataInputStream {
*/
private FileInputStream fis = null;

/**
* The current position in the stream.
*/
private long position = 0;

/**
* Constructs a new <code>LocalDataInputStream</code> object from a given {@link File} object.
*
Expand All @@ -50,7 +45,6 @@ public class LocalDataInputStream extends FSDataInputStream {
public LocalDataInputStream(final File file) throws IOException {

this.fis = new FileInputStream(file);
this.position = 0;
}

/**
Expand All @@ -60,7 +54,6 @@ public LocalDataInputStream(final File file) throws IOException {
public void seek(final long desired) throws IOException {

this.fis.getChannel().position(desired);
this.position = desired;
}

/**
Expand All @@ -69,12 +62,7 @@ public void seek(final long desired) throws IOException {
@Override
public int read() throws IOException {

final int value = this.fis.read();
if (value >= 0) {
this.position++;
}

return value;
return this.fis.read();
}

/**
Expand All @@ -83,12 +71,7 @@ public int read() throws IOException {
@Override
public int read(final byte[] buffer, final int offset, final int length) throws IOException {

final int value = this.fis.read(buffer, offset, length);
if (value > 0) {
this.position += value;
}

return value;
return this.fis.read(buffer, offset, length);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ public class LocalDataOutputStream extends FSDataOutputStream {
*/
private FileOutputStream fos = null;

/**
* The current position in the output stream.
*/
private long position;

/**
* Constructs a new <code>LocalDataOutputStream</code> object from a given {@link File} object.
*
Expand All @@ -50,7 +45,6 @@ public class LocalDataOutputStream extends FSDataOutputStream {
public LocalDataOutputStream(final File file) throws IOException {

this.fos = new FileOutputStream(file);
this.position = 0;
}

/**
Expand All @@ -59,7 +53,6 @@ public LocalDataOutputStream(final File file) throws IOException {
@Override
public void write(final int b) throws IOException {
fos.write(b);
position++;
}

/**
Expand All @@ -68,7 +61,6 @@ public void write(final int b) throws IOException {
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
fos.write(b, off, len);
position += len; // update position
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ private void connectInputGate(final RecordDeserializer<T> deserializer, final in
if (this.environment.hasUnboundInputGates()) {
this.inputGate = (InputGate<T>) this.environment.getUnboundInputGate(inputGateID);
} else {
this.inputGate = new InputGate<T>(environment.getJobID(), new GateID(), deserializer,
this.environment.getNumberOfInputGates(),
this.inputGate = (InputGate<T>) this.environment.createAndRegisterInputGate(deserializer,
distributionPattern);
this.environment.registerInputGate(this.inputGate);
}
}

Expand Down Expand Up @@ -102,17 +100,6 @@ public final boolean isInputChannelClosed(final int index) throws IOException, I
return false;
}

/**
* Registers a new listener object with the assigned input gate.
*
* @param inputGateListener
* the listener object to register
*/
public final void registerInputGateListener(final InputGateListener inputGateListener) {

this.inputGate.registerInputGateListener(inputGateListener);
}

/**
* Subscribes the listener object to receive events of the given type.
*
Expand All @@ -121,7 +108,8 @@ public final void registerInputGateListener(final InputGateListener inputGateLis
* @param eventType
* the type of event to register the listener for
*/
public final void subscribeToEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType) {
public final void subscribeToEvent(final EventListener eventListener,
final Class<? extends AbstractTaskEvent> eventType) {

// Delegate call to input gate
this.inputGate.subscribeToEvent(eventListener, eventType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ private void connectOutputGate(Class<T> outputClass, ChannelSelector<T> selector

this.outputGate = eog;
} else {
this.outputGate = new OutputGate<T>(environment.getJobID(), new GateID(), outputClass,
this.environment.getNumberOfOutputGates(), selector, isBroadcast);
this.environment.registerOutputGate(this.outputGate);
this.outputGate = (OutputGate<T>) this.environment.createAndRegisterOutputGate(outputClass, selector,
isBroadcast);
}
}

Expand All @@ -103,7 +102,7 @@ private void connectOutputGate(Class<T> outputClass, ChannelSelector<T> selector
* @throws IOException
* Thrown on an error that may happen during the transfer of the given record or a previous record.
*/
public void emit(T record) throws IOException, InterruptedException {
public void emit(final T record) throws IOException, InterruptedException {

// Simply pass record through to the corresponding output gate
this.outputGate.writeRecord(record);
Expand All @@ -118,17 +117,6 @@ public List<AbstractOutputChannel<T>> getOutputChannels() {
return this.outputGate.getOutputChannels();
}

/**
* Registers a new listener object with the assigned output gate.
*
* @param inputGateListener
* the listener object to register
*/
public void registerOutputGateListener(OutputGateListener outputGateListener) {

this.outputGate.registerOutputGateListener(outputGateListener);
}

// TODO (en)
public OutputGate<T> getOutputGate() {
return outputGate;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.nephele.io;

import java.io.IOException;

import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.types.Record;

public interface Gate<T extends Record> {

/**
* Subscribes the listener object to receive events of the given type.
*
* @param eventListener
* the listener object to register
* @param eventType
* the type of event to register the listener for
*/
void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);

/**
* Removes the subscription for events of the given type for the listener object.
*
* @param eventListener
* the listener object to cancel the subscription for
* @param eventType
* the type of the event to cancel the subscription for
*/
void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);

/**
* Publishes an event.
*
* @param event
* the event to be published
* @throws IOException
* thrown if an error occurs while transmitting the event
* @throws InterruptedException
* thrown if the thread is interrupted while waiting for the event to be published
*/
void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException;

/**
* Passes a received event on to the event notification manager so it cam ne dispatched.
*
* @param event
* the event to pass on to the notification manager
*/
void deliverEvent(AbstractTaskEvent event);

/**
* Returns the ID of the job this gate belongs to.
*
* @return the ID of the job this gate belongs to
*/
JobID getJobID();

/**
* Returns the type of the input/output channels which are connected to this gate.
*
* @return the type of input/output channels which are connected to this gate
*/
ChannelType getChannelType();
}
Loading

0 comments on commit 22f54d3

Please sign in to comment.