Skip to content

Commit

Permalink
Merge branch 'stage1_version02' into version02
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Jul 13, 2012
2 parents d1ee396 + e980698 commit d6a8775
Show file tree
Hide file tree
Showing 50 changed files with 2,426 additions and 1,605 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import eu.stratosphere.pact.runtime.plugable.PactRecordComparator;
import eu.stratosphere.pact.runtime.plugable.PactRecordSerializer;
import eu.stratosphere.pact.runtime.sort.UnilateralSortMerger;
import eu.stratosphere.pact.runtime.task.ReduceTask;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.pact.runtime.util.KeyComparator;

Expand Down Expand Up @@ -290,11 +289,15 @@ private Iterator<PactRecord> createSortedIterator(final InputFileIterator inputF
if (stackTrace[index].getClassName().contains("Test"))
testName.append(stackTrace[index].toString());
// instantiate a sort-merger
AbstractTask parentTask = new ReduceTask<PactRecord, PactRecord>() {
AbstractTask parentTask = new AbstractTask() {
@Override
public String toString() {
return "TestPair Sorter " + testName;
}
@Override
public void registerInputOutput() {}
@Override
public void invoke() throws Exception {}
};

if (info == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.pact.common.stubs;


/**
* The execution context provides basic information about the parallel runtime context
* in which a stub instance lives. Such information includes the current number of
* parallel stub instances, the stub's parallel task index, the pact name, or the iteration context.
*
* @author Stephan Ewen
*/
public interface ExecutionContext
{
/**
* Gets the name of the task. This is the name supplied to the contract upon instantiation. If
* no name was given at contract instantiation time, a default name will be returned.
*
* @return The task's name.
*/
String getTaskName();

/**
* Gets the number of parallel subtasks in which the stub is executed.
*
* @return The number of parallel subtasks in which the stub is executed.
*/
int getNumberOfSubtasks();

/**
* Gets the subtask's parallel task number.
*
* @return The subtask's parallel task number.
*/
int getSubtaskIndex();
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public <T extends Value> T getField(final int fieldNum, final Class<T> type)
{
// range check
if (fieldNum < 0 || fieldNum >= this.numFields) {
throw new IndexOutOfBoundsException(fieldNum+" for range [0.."+(this.numFields-1)+"]");
throw new IndexOutOfBoundsException(fieldNum + " for range [0.." + (this.numFields - 1) + "]");
}

// get offset and check for null
Expand Down Expand Up @@ -1064,110 +1064,95 @@ public void read(DataInput in) throws IOException
initFields(data, 0, len);
}

private final void initFields(byte[] data, int begin, int len)
private final void initFields(final byte[] data, final int begin, final int len)
{
// read number of fields, variable length encoded reverse at the back
int pos = begin + len - 2;
int numFields = data[begin + len - 1] & 0xFF;
if (numFields >= MAX_BIT) {
int shift = 7;
int curr;
numFields = numFields & 0x7f;
while ((curr = data[pos--]) >= MAX_BIT) {
numFields |= (curr & 0x7f) << shift;
shift += 7;
try {
// read number of fields, variable length encoded reverse at the back
int pos = begin + len - 2;
int numFields = data[begin + len - 1] & 0xFF;
if (numFields >= MAX_BIT) {
int shift = 7;
int curr;
numFields = numFields & 0x7f;
while ((curr = data[pos--]) >= MAX_BIT) {
numFields |= (curr & 0x7f) << shift;
shift += 7;
}
numFields |= curr << shift;
}
numFields |= curr << shift;
}
this.numFields = numFields;

// ensure that all arrays are there and of sufficient size
if (this.offsets == null || this.offsets.length < numFields) {
this.offsets = new int[numFields];
}
if (this.lengths == null || this.lengths.length < numFields) {
this.lengths = new int[numFields];
}
if (this.readFields == null || this.readFields.length < numFields) {
this.readFields = new Value[numFields];
}
if (this.writeFields == null || this.writeFields.length < numFields) {
this.writeFields = new Value[numFields];
}

final int beginMasks = pos; // beginning of bitmap for null fields
final int fieldsBy8 = (numFields >>> 3) + ((numFields & 0x7) == 0 ? 0 : 1);

pos = beginMasks - fieldsBy8;
int lastNonNullField = -1;

for (int field = 0, chunk = 0; chunk < fieldsBy8; chunk++) {
int mask = data[beginMasks - chunk];
for (int i = 0; i < 8 && field < numFields; i++, field++) {
if ((mask & 0x1) == 0x1) {
// not null, so read the offset value, if we are not the first non-null fields
if (lastNonNullField >= 0) {
// offset value is variable length encoded
int start = data[pos--] & 0xff;
if (start >= MAX_BIT) {
int shift = 7;
int curr;
start = start & 0x7f;
while ((curr = data[pos--] & 0xff) >= MAX_BIT) {
start |= (curr & 0x7f) << shift;
shift += 7;
this.numFields = numFields;
// ensure that all arrays are there and of sufficient size
if (this.offsets == null || this.offsets.length < numFields) {
this.offsets = new int[numFields];
}
if (this.lengths == null || this.lengths.length < numFields) {
this.lengths = new int[numFields];
}
if (this.readFields == null || this.readFields.length < numFields) {
this.readFields = new Value[numFields];
}
if (this.writeFields == null || this.writeFields.length < numFields) {
this.writeFields = new Value[numFields];
}

final int beginMasks = pos; // beginning of bitmap for null fields
final int fieldsBy8 = (numFields >>> 3) + ((numFields & 0x7) == 0 ? 0 : 1);
pos = beginMasks - fieldsBy8;
int lastNonNullField = -1;
for (int field = 0, chunk = 0; chunk < fieldsBy8; chunk++) {
int mask = data[beginMasks - chunk];
for (int i = 0; i < 8 && field < numFields; i++, field++) {
if ((mask & 0x1) == 0x1) {
// not null, so read the offset value, if we are not the first non-null fields
if (lastNonNullField >= 0) {
// offset value is variable length encoded
int start = data[pos--] & 0xff;
if (start >= MAX_BIT) {
int shift = 7;
int curr;
start = start & 0x7f;
while ((curr = data[pos--] & 0xff) >= MAX_BIT) {
start |= (curr & 0x7f) << shift;
shift += 7;
}
start |= curr << shift;
}
start |= curr << shift;
this.offsets[field] = start + begin;
this.lengths[lastNonNullField] = start + begin - this.offsets[lastNonNullField];
}
else {
this.offsets[field] = begin;
}
this.offsets[field] = start + begin;
this.lengths[lastNonNullField] = start + begin - this.offsets[lastNonNullField];
lastNonNullField = field;
}
else {
this.offsets[field] = begin;
// field is null
this.offsets[field] = NULL_INDICATOR_OFFSET;
}
lastNonNullField = field;
mask >>= 1;
}
else {
// field is null
this.offsets[field] = NULL_INDICATOR_OFFSET;
}
mask >>= 1;
}
if (lastNonNullField >= 0) {
this.lengths[lastNonNullField] = pos - this.offsets[lastNonNullField] + 1;
}
this.firstModifiedPos = Integer.MAX_VALUE;
}
if (lastNonNullField >= 0) {
this.lengths[lastNonNullField] = pos - this.offsets[lastNonNullField] + 1;
}
this.firstModifiedPos = Integer.MAX_VALUE;
}

/**
* @param fields
* @param holders
* @param binData
* @param offset
* @return
*/
public boolean readBinary(int[] fields, Value[] holders, byte[] binData, int offset)
{
// read the length
int val = binData[offset++] & 0xff;
if (val >= MAX_BIT) {
int shift = 7;
int curr;
val = val & 0x7f;
while ((curr = binData[offset++] & 0xff) >= MAX_BIT) {
val |= (curr & 0x7f) << shift;
shift += 7;
catch (ArrayIndexOutOfBoundsException aioobex) {
StringBuilder bld = new StringBuilder(len * 4 + 64);
bld.append("Record deserialization error: Record byte signature: ");

for (int i = 0; i < len; i++) {
int num = data[i + begin] & 0xff;
bld.append(num);
if (i < len - 1) {
bld.append(',');
}
}
val |= curr << shift;
throw new RuntimeException(bld.toString(), aioobex);
}

// initialize the fields
this.binaryData = binData;
initFields(binData, offset, val);

// get the values
return getFieldsInto(fields, holders);
}

/**
Expand Down Expand Up @@ -1203,25 +1188,7 @@ public long serialize(DataOutputView target)
*/
public void deserialize(DataInputView source) throws IOException
{
int val = source.readUnsignedByte();
if (val >= MAX_BIT) {
int shift = 7;
int curr;
val = val & 0x7f;
while ((curr = source.readUnsignedByte()) >= MAX_BIT) {
val |= (curr & 0x7f) << shift;
shift += 7;
}
val |= curr << shift;
}
this.binaryLen = val;

// read the binary representation
if (this.binaryData == null || this.binaryData.length < this.binaryLen) {
this.binaryData = new byte[this.binaryLen];
}
source.readFully(this.binaryData, 0, this.binaryLen);
initFields(this.binaryData, 0, this.binaryLen);
read(source);
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void write(final DataOutput out) throws IOException {
@Override
public int compareTo(final Key o) {
if (!(o instanceof PactInteger))
throw new ClassCastException("Cannot compare " + o.getClass().getName() + " to N_Integer!");
throw new ClassCastException("Cannot compare " + o.getClass().getName() + " to PactInteger!");

final int other = ((PactInteger) o).value;

Expand All @@ -128,7 +128,7 @@ public int hashCode() {
@Override
public boolean equals(final Object obj) {
if (obj instanceof PactInteger) {
return ((PactInteger) obj).value == value;
return ((PactInteger) obj).value == this.value;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,6 @@ public void setValue(CharBuffer buffer)
this.hashCode = 0;
}


public void setValueUTF8(byte[] bytes, int offset, int len) {
throw new UnsupportedOperationException();
}


/**
* Sets the value of this <code>PactString</code>, assuming that the binary data is ASCII coded. The n-th character of the
* <code>PactString</code> corresponds directly to the n-th byte in the given array after the offset.
Expand Down
Loading

0 comments on commit d6a8775

Please sign in to comment.