Skip to content

Commit

Permalink
Some simple cleanups and doc updates while looking at (mostly) runtim…
Browse files Browse the repository at this point in the history
…e package

This PR consists of:

1. Remove unnecessary brackets in ExecutionVertex#getPreferredLocations
2. Throw illegal argument with proper message for consistencies of not nullable argument.
3. Add final modifier to RecordWriter#serializers since the content being used as lock.
4. Wrap too long of lines in some of the Java code for readibility.
5. Add missing JavaDoc parameter.
6. Remove final modifier in the OutputEmitter's private methods because it is redundant.

Author: Henry Saputra <[email protected]>

Closes apache#457 from hsaputra/cleanup_javadoc_and_longlines_1 and squashes the following commits:

8815302 [Henry Saputra] Some cleanups and doc updates while looking at runtime package: 1. Remove unnecessary brackets in ExecutionVertex#getPreferredLocations 2. Throw illegal argument with proper message for consistencies of not nullable argument. 3. Add final modifier to RecordWriter#serializers since the content being used as lock. 4. Wrap too long of lines in some of the Java code for readibility. 5. Add missing JavaDoc parameter. 6. Remove final modifier in the OutputEmitter's private methods because it is redundant.
  • Loading branch information
hsaputra committed Mar 6, 2015
1 parent 8f321c7 commit 27d1e7f
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void prepareForArchiving() {
*/
public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
if (scheduler == null) {
throw new NullPointerException();
throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
}

final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,9 @@ public boolean isScheduleLocalOnly() {
*/
public Iterable<Instance> getPreferredLocations() {
// if we have hard location constraints, use those
{
List<Instance> constraintInstances = this.locationConstraintInstances;
if (constraintInstances != null && !constraintInstances.isEmpty()) {
return constraintInstances;
}
List<Instance> constraintInstances = this.locationConstraintInstances;
if (constraintInstances != null && !constraintInstances.isEmpty()) {
return constraintInstances;
}

// otherwise, base the preferred locations on the input connections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class RecordWriter<T extends IOReadableWritable> {
private final int numChannels;

/** {@link RecordSerializer} per outgoing channel */
private RecordSerializer<T>[] serializers;
private final RecordSerializer<T>[] serializers;

public RecordWriter(BufferWriter writer) {
this(writer, new RoundRobinChannelSelector<T>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public void setSize(int newSize) {
synchronized (recycleLock) {
ensureNotRecycled();

checkArgument(newSize >= 0 && newSize <= memorySegment.size(), "Size of buffer must be >= 0 and <= " + memorySegment.size() + ", but was " + newSize + ".");
checkArgument(newSize >= 0 && newSize <= memorySegment.size(), "Size of buffer must be >= 0 and <= " +
memorySegment.size() + ", but was " + newSize + ".");

currentSize = newSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public abstract class InputChannel {

protected final SingleInputGate inputGate;

protected InputChannel(SingleInputGate inputGate, int channelIndex, ExecutionAttemptID producerExecutionId, IntermediateResultPartitionID partitionId) {
protected InputChannel(SingleInputGate inputGate, int channelIndex, ExecutionAttemptID producerExecutionId,
IntermediateResultPartitionID partitionId) {
this.inputGate = inputGate;
this.channelIndex = channelIndex;
this.producerExecutionId = producerExecutionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ public void setBufferPool(BufferPool bufferPool) {

public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
synchronized (requestLock) {
if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null && inputChannel.getClass() == UnknownInputChannel.class) {

if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null &&
inputChannel.getClass() == UnknownInputChannel.class) {
numberOfUninitializedChannels++;
}
}
Expand Down Expand Up @@ -381,21 +381,25 @@ public static SingleInputGate create(NetworkEnvironment networkEnvironment, Part
case LOCAL:
LOG.debug("Create LocalInputChannel for {}.", partition);

inputChannels[channelIndex] = new LocalInputChannel(reader, channelIndex, producerExecutionId, partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher());
inputChannels[channelIndex] = new LocalInputChannel(reader, channelIndex, producerExecutionId, partitionId,
networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher());

break;
case REMOTE:
LOG.debug("Create RemoteInputChannel for {}.", partition);

final RemoteAddress producerAddress = checkNotNull(partition.getProducerAddress(), "Missing producer address for remote intermediate result partition.");
final RemoteAddress producerAddress = checkNotNull(partition.getProducerAddress(),
"Missing producer address for remote intermediate result partition.");

inputChannels[channelIndex] = new RemoteInputChannel(reader, channelIndex, producerExecutionId, partitionId, producerAddress, networkEnvironment.getConnectionManager());
inputChannels[channelIndex] = new RemoteInputChannel(reader, channelIndex, producerExecutionId, partitionId,
producerAddress, networkEnvironment.getConnectionManager());

break;
case UNKNOWN:
LOG.debug("Create UnknownInputChannel for {}.", partition);

inputChannels[channelIndex] = new UnknownInputChannel(reader, channelIndex, producerExecutionId, partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager());
inputChannels[channelIndex] = new UnknownInputChannel(reader, channelIndex, producerExecutionId, partitionId,
networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager());

break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import java.io.IOException;

/**
* The base class for all tasks able to participate in an iteration.
* The abstract base class for all tasks able to participate in an iteration.
*/
public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT>
implements Terminable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import java.util.Map;

/**
* The abstract base class for all tasks. Encapsulated common behavior and implements the main life-cycle
* The base class for all tasks. Encapsulated common behavior and implements the main life-cycle
* of the user code.
*/
public class RegularPactTask<S extends Function, OT> extends AbstractInvokable implements PactTaskContext<S, OT> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public final int[] selectChannels(SerializationDelegate<T> record, int numberOfC

// --------------------------------------------------------------------------------------------

private final int[] robin(int numberOfChannels) {
private int[] robin(int numberOfChannels) {
if (this.channels == null || this.channels.length != 1) {
this.channels = new int[1];
}
Expand All @@ -156,7 +156,7 @@ private final int[] robin(int numberOfChannels) {
return this.channels;
}

private final int[] broadcast(int numberOfChannels) {
private int[] broadcast(int numberOfChannels) {
if (channels == null || channels.length != numberOfChannels) {
channels = new int[numberOfChannels];
for (int i = 0; i < numberOfChannels; i++) {
Expand All @@ -167,7 +167,7 @@ private final int[] broadcast(int numberOfChannels) {
return channels;
}

private final int[] hashPartitionDefault(T record, int numberOfChannels) {
private int[] hashPartitionDefault(T record, int numberOfChannels) {
if (channels == null || channels.length != 1) {
channels = new int[1];
}
Expand All @@ -189,7 +189,7 @@ else if (hash != Integer.MIN_VALUE) {
return this.channels;
}

private final int murmurHash(int k) {
private int murmurHash(int k) {
k *= 0xcc9e2d51;
k = Integer.rotateLeft(k, 15);
k *= 0x1b873593;
Expand All @@ -207,11 +207,11 @@ private final int murmurHash(int k) {
return k;
}

private final int[] rangePartition(T record, int numberOfChannels) {
private int[] rangePartition(T record, int numberOfChannels) {
throw new UnsupportedOperationException();
}

private final int[] customPartition(T record, int numberOfChannels) {
private int[] customPartition(T record, int numberOfChannels) {
if (channels == null) {
channels = new int[1];
extractedKeys = new Object[1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public static String getTemporaryFileDirectory() {
*
* @param log The logger to log the information to.
* @param componentName The component name to mention in the log.
* @param commandLineArgs The arguments accompanying the starting the component.
*/
public static void logEnvironmentInfo(Logger log, String componentName, String[] commandLineArgs) {
if (log.isInfoEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
}

try {
networkEnvironment = Some(new NetworkEnvironment(self, jobManager, timeout,
networkConfig))
networkEnvironment = Some(new NetworkEnvironment(self, jobManager, timeout, networkConfig))
} catch {
case ioe: IOException =>
log.error(ioe, "Failed to instantiate network environment.")
Expand Down

0 comments on commit 27d1e7f

Please sign in to comment.