Skip to content

Commit

Permalink
Merge branch 'master' into version02
Browse files Browse the repository at this point in the history
Conflicts:
	nephele/nephele-server/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ByteBufferedChannelManager.java
  • Loading branch information
Daniel Warneke committed Jul 29, 2011
2 parents 1c47a63 + 4965721 commit 85baafd
Show file tree
Hide file tree
Showing 36 changed files with 409 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,23 @@ public void close() throws IOException, InterruptedException {
releasedConsumedReadBuffer();
}

// This code fragment makes sure the isClosed method works in case the channel input has not been fully consumed
if (this.getType() == ChannelType.NETWORK) {
synchronized (this.synchronisationObject) {
if (!this.brokerAggreedToCloseChannel) {
while (!this.brokerAggreedToCloseChannel) {

requestReadBuffersFromBroker();
if (this.uncompressedDataBuffer != null || this.compressedDataBuffer != null) {
releasedConsumedReadBuffer();
}
this.synchronisationObject.wait(500);
}
this.bufferedRecord = null;
}
}
}

/*
* Send close event to indicate the input channel has successfully
* processed all data it is interested in.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@
import eu.stratosphere.nephele.types.StringRecord;
import eu.stratosphere.nephele.util.StringUtils;


/**
*
*/
public class JobGenericInputVertex extends JobInputVertex
{
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.List;

import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.instance.AllocatedResource;
import eu.stratosphere.nephele.instance.InstanceListener;
import eu.stratosphere.nephele.jobgraph.JobID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import java.net.InetSocketAddress;

import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobSubmissionResult;
import eu.stratosphere.nephele.configuration.ConfigConstants;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.fs.Path;
import eu.stratosphere.nephele.io.channels.ChannelType;
Expand Down Expand Up @@ -66,7 +64,7 @@ public static void main(String[] args) {
}

// Create jar file and attach it
final File jarFile = new File("/tmp/broadcastJob.jar");
final File jarFile = new File("/tmp/grepJob.jar");
final JarFileCreator jarFileCreator = new JarFileCreator(jarFile);
jarFileCreator.addClass(GrepTask.class);

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
/**
* This abstract scheduler must be extended by a scheduler implementations for Nephele. The abstract class defines the
* fundamental methods for scheduling and removing jobs. While Nephele's
* {@link eu.stratosphere.nephele.jobmanager.JobManager} is responsible for requesting the required instances for the
* job at the {@link eu.stratosphere.nephele.instance.InstanceManager}, the scheduler is in charge of assigning the
* individual tasks to the instances.
*
* @author warneke
*/
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.nephele.jobmanager.scheduler.queue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@ public void unregisterJob(final ExecutionGraph executionGraph) {
while (it.hasNext()) {

final ExecutionGroupVertex groupVertex = it.next();
final InputSplit[] inputSplits = groupVertex.getInputSplits();

if (inputSplits == null) {
continue;
}

if (inputSplits.length == 0) {
continue;
}

final InputSplitAssigner assigner = this.assignerCache.remove(groupVertex);
if (assigner == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.pact.common.contract;

import eu.stratosphere.pact.common.type.Key;
Expand Down
Loading

0 comments on commit 85baafd

Please sign in to comment.