Skip to content

Commit

Permalink
[FLINK-1137] Enhance MutableObjectIterator with non-reuse next()
Browse files Browse the repository at this point in the history
This is in preparation for configurable object-reuse mode. We previously
referred to this as mutable object vs. mutable object safe mode or some
such thing.
  • Loading branch information
aljoscha authored and StephanEwen committed Jan 7, 2015
1 parent 2499294 commit 3832d7b
Show file tree
Hide file tree
Showing 40 changed files with 884 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;

/**
* A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
Expand All @@ -31,15 +32,15 @@ public class CoReaderIterator<T1, T2> {
private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
// source

protected final DeserializationDelegate<T1> delegate1;
protected final DeserializationDelegate<T2> delegate2;
protected final ReusingDeserializationDelegate<T1> delegate1;
protected final ReusingDeserializationDelegate<T2> delegate2;

public CoReaderIterator(
CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
this.reader = reader;
this.delegate1 = new DeserializationDelegate<T1>(serializer1);
this.delegate2 = new DeserializationDelegate<T2>(serializer2);
this.delegate1 = new ReusingDeserializationDelegate<T1>(serializer1);
this.delegate2 = new ReusingDeserializationDelegate<T2>(serializer2);
}

public int next(T1 target1, T2 target2) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
}
return reuse;
}

@Override
public StreamRecord<IN> next() throws IOException {
if (listIterator.hasNext()) {
StreamRecord<IN> result = new StreamRecord<IN>();
result.setObject(listIterator.next());
return result;
} else {
return null;
}
}
}

public List<OUT> getOutputs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@

/**
* A simple iterator interface. The key differences to the {@link java.util.Iterator} are that this
* iterator accepts an object into which it can place the content if the object is mutable, and that
* it consolidates the logic in a single <code>next()</code> function, rather than in two different
* functions such as <code>hasNext()</code> and <code>next()</code>.
* iterator also as a <code>next()</code> method that </code>accepts an object into which it can
* place the content if the object is mutable, and that it consolidates the logic in a single
* <code>next()</code> function, rather than in two different functions such as
* <code>hasNext()</code> and <code>next()</code>.
*
* @param <E> The element type of the collection iterated over.
*/
public interface MutableObjectIterator<E> {

/**
* Gets the next element from the collection. The contents of that next element is put into the given target object.
* Gets the next element from the collection. The contents of that next element is put into the
* given target object.
*
* @param reuse The target object into which to place next element if E is mutable.
* @return The filled object or <code>null</code> if the iterator is exhausted
Expand All @@ -39,4 +41,14 @@ public interface MutableObjectIterator<E> {
* serialization / deserialization logic
*/
public E next(E reuse) throws IOException;

/**
* Gets the next element from the collection. The reader must create a new instance itself.
*
* @return The object or <code>null</code> if the iterator is exhausted
*
* @throws IOException Thrown, if a problem occurred in the underlying I/O layer or in the
* serialization / deserialization logic
*/
public E next() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -96,10 +95,11 @@ public void materializeVariable(MutableReader<?> reader, TypeSerializerFactory<?

try {
@SuppressWarnings("unchecked")
final MutableReader<DeserializationDelegate<T>> typedReader = (MutableReader<DeserializationDelegate<T>>) reader;
final MutableReader typedReader = (MutableReader) reader;
@SuppressWarnings("unchecked")
final TypeSerializer<T> serializer = ((TypeSerializerFactory<T>) serializerFactory).getSerializer();


@SuppressWarnings("unchecked")
final ReaderIterator<T> readerIterator = new ReaderIterator<T>(typedReader, serializer);

if (materializer) {
Expand All @@ -111,7 +111,7 @@ public void materializeVariable(MutableReader<?> reader, TypeSerializerFactory<?
ArrayList<T> data = new ArrayList<T>();

T element;
while ((element = readerIterator.next(serializer.createInstance())) != null) {
while ((element = readerIterator.next()) != null) {
data.add(element);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,18 @@ public E next(E reuse) throws IOException
return null;
}
}

@Override
public E next() throws IOException
{
try {
return this.accessors.deserialize(this.inView);
} catch (EOFException eofex) {
final List<MemorySegment> freeMem = this.inView.close();
if (this.freeMemTarget != null) {
this.freeMemTarget.addAll(freeMem);
}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,13 @@ public E next(E reuse) throws IOException {
return null;
}
}

@Override
public E next() throws IOException {
try {
return this.serializer.deserialize(this.inputView);
} catch (EOFException e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ public BT next(BT reuse) throws IOException {
return reuse;
}

@Override
public BT next() throws IOException {
if (currentPartition == null) {
if (!partitions.hasNext()) {
return null;
}
currentPartition = partitions.next();
currentPartition.setReadPosition(0);
}

try {
return serializer.deserialize(currentPartition);
} catch (EOFException e) {
return advanceAndRead();
}

}

/* jump to the next partition and continue reading from that */
private BT advanceAndRead(BT reuse) throws IOException {
if (!partitions.hasNext()) {
Expand All @@ -81,4 +99,19 @@ private BT advanceAndRead(BT reuse) throws IOException {
return reuse;
}

/* jump to the next partition and continue reading from that */
private BT advanceAndRead() throws IOException {
if (!partitions.hasNext()) {
return null;
}
currentPartition = partitions.next();
currentPartition.setReadPosition(0);

try {
return serializer.deserialize(currentPartition);
} catch (EOFException e) {
return advanceAndRead();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.flink.runtime.operators;

import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
Expand All @@ -38,7 +39,6 @@
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.util.MutableObjectIterator;

/**
Expand Down Expand Up @@ -334,9 +334,8 @@ private void initInputReaders() throws Exception {

this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());

MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
@SuppressWarnings({ "rawtypes" })
final MutableObjectIterator<?> iter = new ReaderIterator(reader, this.inputTypeSerializerFactory.getSerializer());
final MutableObjectIterator<?> iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());
this.reader = (MutableObjectIterator<IT>)iter;

// final sanity check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,27 @@ public T next(T reuse) throws IOException {
}
}

@Override
public T next() throws IOException {
// This is just a copy of the above, I wanted to keep the two separate,
// in case we change something later. Plus, it keeps the diff clean... :D
if(done || this.table.closed.get()) {
return null;
} else if(!cache.isEmpty()) {
return cache.remove(cache.size()-1);
} else {
while(!done && cache.isEmpty()) {
done = !fillCache();
}
if(!done) {
return cache.remove(cache.size()-1);
} else {
return null;
}
}
}


/**
* utility function that inserts all entries from a bucket and its overflow buckets into the cache
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@

/**
*
* @param <BT> The type of the build side records.
* @param <PT> The type of the probe side records.
* @tparam BT The type of the build side records.
* @tparam PT The type of the probe side records.
*/
public class HashPartition<BT, PT> extends AbstractPagedInputView implements SeekableDataInputView
{
Expand Down Expand Up @@ -620,7 +620,24 @@ public final BT next(BT reuse) throws IOException
return null;
}
}


public final BT next() throws IOException
{
final int pos = getCurrentPositionInSegment();
final int buffer = HashPartition.this.currentBufferNum;

this.currentPointer = (((long) buffer) << HashPartition.this.segmentSizeBits) + pos;

try {
BT result = HashPartition.this.buildSideSerializer.deserialize(HashPartition.this);
this.currentHashCode = this.comparator.hash(result);
return result;
} catch (EOFException eofex) {
return null;
}
}


protected final long getPointer()
{
return this.currentPointer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,56 @@ public BT next(BT reuse) {
this.numInSegment = 0;
}
}

public BT next() {
// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
while (true) {

while (this.numInSegment < this.countInSegment) {

final int thisCode = this.bucket.getInt(this.posInSegment);
this.posInSegment += HASH_CODE_LEN;

// check if the hash code matches
if (thisCode == this.searchHashCode) {
// get the pointer to the pair
final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
this.numInSegment++;

// deserialize the key to check whether it is really equal, or whether we had only a hash collision
try {
this.partition.setReadPosition(pointer);
BT result = this.accessor.deserialize(this.partition);
if (this.comparator.equalToReference(result)) {
this.lastPointer = pointer;
return result;
}
}
catch (IOException ioex) {
throw new RuntimeException("Error deserializing key or value from the hashtable: " +
ioex.getMessage(), ioex);
}
}
else {
this.numInSegment++;
}
}

// this segment is done. check if there is another chained bucket
final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
return null;
}

final int overflowSegNum = (int) (forwardPointer >>> 32);
this.bucket = this.overflowSegments[overflowSegNum];
this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
this.numInSegment = 0;
}
}

public void writeBack(BT value) throws IOException {
final SeekableDataOutputView outView = this.partition.getWriteView();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,14 @@ protected T getNextRecord(T reuse) throws IOException {
return null;
}
}

protected T getNextRecord() throws IOException {
if (this.numRecordsReturned < this.numRecordsInBuffer) {
this.numRecordsReturned++;
return this.serializer.deserialize(this.readView);
} else {
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,41 @@ public T next(T target) throws IOException {
}
}
}


@Override
public T next() throws IOException {
// check for the left over element
if (this.readPhase) {
return getNextRecord();
} else {
// writing phase. check for leftover first
T result = null;
if (this.leftOverReturned) {
// get next record
if ((result = this.input.next()) != null) {
if (writeNextRecord(result)) {
return result;
} else {
// did not fit into memory, keep as leftover
this.leftOverRecord = this.serializer.copy(result);
this.leftOverReturned = false;
this.fullWriteBuffer = true;
return null;
}
} else {
this.noMoreBlocks = true;
return null;
}
} else if (this.fullWriteBuffer) {
return null;
} else {
this.leftOverReturned = true;
return this.leftOverRecord;
}
}
}



public void reset() {
// a reset always goes to the read phase
Expand Down
Loading

0 comments on commit 3832d7b

Please sign in to comment.