Skip to content

Commit

Permalink
Merge branch 'experimental'
Browse files Browse the repository at this point in the history
Conflicts:
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/execution/Environment.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/Gate.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/InputGate.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/OutputGate.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/RecordWriter.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/AbstractInputChannel.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/AbstractOutputChannel.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/AbstractByteBufferedOutputChannel.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedInputChannelBroker.java
	nephele/nephele-common/src/main/java/eu/stratosphere/nephele/io/channels/bytebuffered/ByteBufferedOutputChannelBroker.java
	nephele/nephele-common/src/test/java/eu/stratosphere/nephele/io/channels/bytebuffered/FileInputChannelTest.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/io/channels/FileBufferManager.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/protocols/ChannelLookupProtocol.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedInputChannelWrapper.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedOutputChannelGroup.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedOutputChannelWrapper.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnection.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/checkpointing/ChannelCheckpoint.java
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/checkpointing/EphemeralCheckpoint.java
	nephele/nephele-server/src/test/java/eu/stratosphere/nephele/taskmanager/checkpointing/ChannelCheckpointTest.java
	pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/OutputEmitter.java
  • Loading branch information
Daniel Warneke committed Jul 19, 2011
2 parents a3ef45a + 250ad2b commit 19d4e34
Show file tree
Hide file tree
Showing 69 changed files with 4,236 additions and 1,958 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@

import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.DistributionPattern;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.io.InputGate;
import eu.stratosphere.nephele.io.OutputGate;
import eu.stratosphere.nephele.io.RecordDeserializer;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
Expand Down Expand Up @@ -318,15 +321,6 @@ public void instantiateInvokable() throws Exception {
if (this.jobID == null) {
LOG.warn("jobVertexID is null");
}

// Set the vertex ID for all the gates
for (int i = 0; i < this.inputGates.size(); i++) {
this.inputGates.get(i).setJobID(this.jobID);
}

for (int i = 0; i < this.outputGates.size(); i++) {
this.outputGates.get(i).setJobID(this.jobID);
}
}

/**
Expand Down Expand Up @@ -580,16 +574,39 @@ public void read(DataInput in) throws IOException {
final int numOuputGates = in.readInt();

for (int i = 0; i < numOuputGates; i++) {
final String gateClassName = StringRecord.readString(in);
Class<? extends Record> c = null;
final String typeClassName = StringRecord.readString(in);
Class<? extends Record> type = null;
try {
c = (Class<? extends Record>) Class.forName(gateClassName, true, cl);
type = (Class<? extends Record>) Class.forName(typeClassName, true, cl);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Class " + gateClassName + " not found in one of the supplied jar files: "
throw new IOException("Class " + typeClassName + " not found in one of the supplied jar files: "
+ StringUtils.stringifyException(cnfe));
}

final boolean isBroadcast = in.readBoolean();

ChannelSelector<? extends Record> channelSelector = null;
if (!isBroadcast) {

final String channelSelectorClassName = StringRecord.readString(in);
try {
channelSelector = (ChannelSelector<? extends Record>) Class.forName(channelSelectorClassName, true,
cl).newInstance();
} catch (InstantiationException e) {
throw new IOException(StringUtils.stringifyException(e));
} catch (IllegalAccessException e) {
throw new IOException(StringUtils.stringifyException(e));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.stringifyException(e));
}

channelSelector.read(in);
}



@SuppressWarnings("rawtypes")
final OutputGate<? extends Record> eog = new OutputGate(c, i);
final OutputGate<? extends Record> eog = new OutputGate(this.jobID, type, i, channelSelector, isBroadcast);
eog.read(in);
this.outputGates.add(eog);
// Mark as unbound for reconnection of RecordWriter
Expand All @@ -600,9 +617,45 @@ public void read(DataInput in) throws IOException {

for (int i = 0; i < numInputGates; i++) {

// TODO (erik) : gate.read(...) deserializes the type c anyway ...
final String deserializerClassName = StringRecord.readString(in);
RecordDeserializer<? extends Record> recordDeserializer = null;
Class<? extends RecordDeserializer<? extends Record>> deserializerClass = null;
try {
deserializerClass = (Class<? extends RecordDeserializer<? extends Record>>) cl
.loadClass(deserializerClassName);
recordDeserializer = deserializerClass.newInstance();

} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.stringifyException(e));
} catch (InstantiationException e) {
throw new IOException(StringUtils.stringifyException(e));
} catch (IllegalAccessException e) {
throw new IOException(StringUtils.stringifyException(e));
}

recordDeserializer.setClassLoader(cl);
recordDeserializer.read(in);

final String distributionPatternClassName = StringRecord.readString(in);
DistributionPattern distributionPattern = null;
Class<? extends DistributionPattern> distributionPatternClass = null;
try {
distributionPatternClass = (Class<? extends DistributionPattern>) cl
.loadClass(distributionPatternClassName);

distributionPattern = distributionPatternClass.newInstance();

} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.stringifyException(e));
} catch (InstantiationException e) {
throw new IOException(StringUtils.stringifyException(e));
} catch (IllegalAccessException e) {
throw new IOException(StringUtils.stringifyException(e));
}

@SuppressWarnings("rawtypes")
final InputGate<? extends Record> eig = new InputGate(null /* c */, i, null);
final InputGate<? extends Record> eig = new InputGate(this.jobID, recordDeserializer, i,
distributionPattern);
eig.read(in);
this.inputGates.add(eig);
// Mark as unbound for reconnection of RecordReader
Expand Down Expand Up @@ -662,13 +715,25 @@ public void write(DataOutput out) throws IOException {
// Output gates
out.writeInt(getNumberOfOutputGates());
for (int i = 0; i < getNumberOfOutputGates(); i++) {
StringRecord.writeString(out, getOutputGate(i).getType().getName());
final OutputGate<? extends Record> outputGate = getOutputGate(i);
StringRecord.writeString(out, outputGate.getType().getName());
out.writeBoolean(outputGate.isBroadcast());
if (!outputGate.isBroadcast()) {
// Write out class name of channel selector
StringRecord.writeString(out, outputGate.getChannelSelector().getClass().getName());
outputGate.getChannelSelector().write(out);
}

getOutputGate(i).write(out);
}

// Input gates
out.writeInt(getNumberOfInputGates());
for (int i = 0; i < getNumberOfInputGates(); i++) {
final InputGate<? extends Record> inputGate = getInputGate(i);
StringRecord.writeString(out, inputGate.getRecordDeserializer().getClass().getName());
inputGate.getRecordDeserializer().write(out);
StringRecord.writeString(out, inputGate.getDistributionPattern().getClass().getName());
getInputGate(i).write(out);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.nephele.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.event.task.EventNotificationManager;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.types.Record;
import eu.stratosphere.nephele.util.EnumUtils;

/**
* In Nephele a gate represents the connection between a user program and the processing framework. A gate
* must be connected to exactly one record reader/writer and to at least one channel. The <code>Gate</code> class itself
* is abstract. A gate automatically created for every record reader/writer in the user program. A gate can only be used
* to transport one specific type of records.
* <p>
* This class in general is not thread-safe.
*
* @author warneke
* @param <T>
* the record type to be transported from this gate
*/
public abstract class AbstractGate<T extends Record> implements IOReadableWritable {

/**
* The class of the record transported through this gate.
*/
private final Class<T> type;

/**
* The ID of the job this gate belongs to.
*/
private final JobID jobID;

/**
* The index of the gate in the list of available input/output gates.
*/
private final int index;

/**
* The event notification manager used to dispatch events.
*/
private final EventNotificationManager eventNotificationManager = new EventNotificationManager();

/**
* The type of input/output channels connected to this gate.
*/
private ChannelType channelType = ChannelType.NETWORK;

/**
* Constructs a new abstract gate
*
* @param jobID
* the ID of the job this gate belongs to
* @param type
* the ID of the job this gate belongs to
* @param index
* the index of the gate in the list of available input/output gates.
*/
protected AbstractGate(final JobID jobID, final Class<T> type, final int index) {
this.jobID = jobID;
this.type = type;
this.index = index;
}

/**
* Returns the type of record that can be transported through this gate.
*
* @return the type of record that can be transported through this gate
*/
public final Class<T> getType() {
return this.type;
}

/**
* Returns the index that has been assigned to the gate upon initialization.
*
* @return the index that has been assigned to the gate upon initialization.
*/
public final int getIndex() {
return this.index;
}

/**
* Returns the event notification manager used to dispatch events.
*
* @return the event notification manager used to dispatch events
*/
protected final EventNotificationManager getEventNotificationManager() {
return this.eventNotificationManager;
}

/**
* Checks if the gate is closed. The gate is closed if alls this associated channels are closed.
*
* @return <code>true</code> if the gate is closed, <code>false</code> otherwise
* @throws IOException
* thrown if any error occurred while closing the gate
*/
public abstract boolean isClosed() throws IOException;

/**
* Checks if the considered gate is an input gate.
*
* @return <code>true</code> if the considered gate is an input gate, <code>false</code> if it is an output gate
*/
public abstract boolean isInputGate();

/**
* {@inheritDoc}
*/
@Override
public String toString() {

return "Gate " + this.index;
}

/**
* Subscribes the listener object to receive events of the given type.
*
* @param eventListener
* the listener object to register
* @param eventType
* the type of event to register the listener for
*/
public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {

this.eventNotificationManager.subscribeToEvent(eventListener, eventType);
}

/**
* Removes the subscription for events of the given type for the listener object.
*
* @param eventListener
* the listener object to cancel the subscription for
* @param eventType
* the type of the event to cancel the subscription for
*/
public final void unsubscribeFromEvent(final EventListener eventListener,
final Class<? extends AbstractTaskEvent> eventType) {

this.eventNotificationManager.unsubscribeFromEvent(eventListener, eventType);
}

/**
* Passes a received event on to the event notification manager so it cam ne dispatched.
*
* @param event
* the event to pass on to the notification manager
*/
public final void deliverEvent(final AbstractTaskEvent event) {

this.eventNotificationManager.deliverEvent((AbstractTaskEvent) event);
}

/**
* Publishes an event.
*
* @param event
* the event to be published
* @throws IOException
* thrown if an error occurs while transmitting the event
* @throws InterruptedException
* thrown if the thread is interrupted while waiting for the event to be published
*/
public abstract void publishEvent(final AbstractTaskEvent event) throws IOException, InterruptedException;

/**
* Sets the type of the input/output channels which are connected to this gate.
*
* @param channelType
* the type of input/output channels which are connected to this gate
*/
public final void setChannelType(final ChannelType channelType) {

this.channelType = channelType;
}

/**
* Returns the type of the input/output channels which are connected to this gate.
*
* @return the type of input/output channels which are connected to this gate
*/
public final ChannelType getChannelType() {

return this.channelType;
}

/**
* {@inheritDoc}
*/
@Override
public void read(final DataInput in) throws IOException {

this.channelType = EnumUtils.readEnum(in, ChannelType.class);
}

/**
* {@inheritDoc}
*/
@Override
public void write(final DataOutput out) throws IOException {

EnumUtils.writeEnum(out, this.channelType);
}

/**
* Returns the ID of the job this gate belongs to.
*
* @return the ID of the job this gate belongs to
*/
public JobID getJobID() {

return this.jobID;
}
}
Loading

0 comments on commit 19d4e34

Please sign in to comment.