Skip to content

Commit

Permalink
Merge branch 'version02_wo_dm' into version02
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Nov 14, 2011
2 parents 7b97ad4 + 64e2e9b commit abf0f98
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ final class OutputChannelContext implements ByteBufferedOutputChannelBroker, Cha
private int sequenceNumber = 0;

/**
* Stores if the spilling queue has already been registered with the network connection.
* Stores if the flushing the of spilling queue has already been triggered.
*/
private boolean spillingQueueRegisteredWithNetworkConnection = false;
private boolean spillingQueueAlreadyFlushed = false;

OutputChannelContext(final OutputGateContext outputGateContext,
final AbstractByteBufferedOutputChannel<?> byteBufferedOutputChannel, final boolean isReceiverRunning,
Expand Down Expand Up @@ -165,21 +165,12 @@ public void releaseWriteBuffers() throws IOException, InterruptedException {
if (!this.isReceiverRunning) {
this.queuedOutgoingEnvelopes.add(this.outgoingTransferEnvelope);
} else {
if (!this.queuedOutgoingEnvelopes.isEmpty()) {
this.queuedOutgoingEnvelopes.add(this.outgoingTransferEnvelope);
if (!this.spillingQueueRegisteredWithNetworkConnection) {
if (this.outputGateContext.registerSpillingQueueWithNetworkConnection(
this.byteBufferedOutputChannel.getID(), this.queuedOutgoingEnvelopes)) {
this.spillingQueueRegisteredWithNetworkConnection = true;
} else {
// Direct connection, spill the queue
while (!this.queuedOutgoingEnvelopes.isEmpty()) {
this.outputGateContext.processEnvelope(this, this.queuedOutgoingEnvelopes.poll());
}
}
}
} else {

if (this.queuedOutgoingEnvelopes.isEmpty()) {
this.outputGateContext.processEnvelope(this, this.outgoingTransferEnvelope);
} else {
this.queuedOutgoingEnvelopes.add(this.outgoingTransferEnvelope);
flushQueuedOutgoingEnvelopes();
}
}

Expand All @@ -199,29 +190,17 @@ public void transferEventToInputChannel(final AbstractEvent event) throws IOExce
final TransferEnvelope ephemeralTransferEnvelope = createNewOutgoingTransferEnvelope();
ephemeralTransferEnvelope.addEvent(event);

if (!this.isReceiverRunning) {
// TODO: Add to checkpoint

if (!this.isReceiverRunning) {
this.queuedOutgoingEnvelopes.add(ephemeralTransferEnvelope);

} else {

if (!this.queuedOutgoingEnvelopes.isEmpty()) {
this.queuedOutgoingEnvelopes.add(ephemeralTransferEnvelope);
if (!this.spillingQueueRegisteredWithNetworkConnection) {
if (this.outputGateContext.registerSpillingQueueWithNetworkConnection(
this.byteBufferedOutputChannel.getID(), this.queuedOutgoingEnvelopes)) {
this.spillingQueueRegisteredWithNetworkConnection = true;
} else {
// Direct connection, spill the queue but make sure we do not copy data back to main memory
this.queuedOutgoingEnvelopes.disableAsynchronousUnspilling();

while (!this.queuedOutgoingEnvelopes.isEmpty()) {
this.outputGateContext.processEnvelope(this, this.queuedOutgoingEnvelopes.poll());
}
}
}
} else {
if (this.queuedOutgoingEnvelopes.isEmpty()) {
this.outputGateContext.processEnvelope(this, ephemeralTransferEnvelope);
} else {
this.queuedOutgoingEnvelopes.add(ephemeralTransferEnvelope);
flushQueuedOutgoingEnvelopes();
}
}
}
Expand Down Expand Up @@ -331,24 +310,20 @@ public void queueTransferEnvelope(TransferEnvelope transferEnvelope) {
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean hasDataLeftToTransmit() throws IOException, InterruptedException {
void flushQueuedOutgoingEnvelopes() throws IOException, InterruptedException {

if (!this.isReceiverRunning) {
return true;
if (this.spillingQueueAlreadyFlushed) {
return;
}

if (!this.queuedOutgoingEnvelopes.isEmpty()) {

if (this.outputGateContext.registerSpillingQueueWithNetworkConnection(
this.byteBufferedOutputChannel.getID(), this.queuedOutgoingEnvelopes)) {

return true;
// TODO: Make this mechanisms smarter
this.queuedOutgoingEnvelopes.spillSynchronouslyIncludingHead();
this.queuedOutgoingEnvelopes.printSpillingState();

} else {
if (!this.outputGateContext.registerSpillingQueueWithNetworkConnection(
this.byteBufferedOutputChannel.getID(), this.queuedOutgoingEnvelopes)) {

// Direct connection, spill the queue but make sure we do not copy data back to main memory
this.queuedOutgoingEnvelopes.disableAsynchronousUnspilling();
Expand All @@ -359,7 +334,22 @@ public boolean hasDataLeftToTransmit() throws IOException, InterruptedException
}
}

return false;
this.spillingQueueAlreadyFlushed = true;
}

/**
* {@inheritDoc}
*/
@Override
public boolean hasDataLeftToTransmit() throws IOException, InterruptedException {

if (!this.isReceiverRunning) {
return true;
}

flushQueuedOutgoingEnvelopes();

return (!this.queuedOutgoingEnvelopes.isEmpty());
}

long getAmountOfMainMemoryInQueue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ private void checkForActiveOutputChannels() throws IOException, InterruptedExcep
while (it.hasNext()) {
final OutputChannelContext channelContext = it.next();
if (channelContext.isChannelActive()) {
channelContext.hasDataLeftToTransmit();
channelContext.flushQueuedOutgoingEnvelopes();
it.remove();
} else {
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,47 @@ public synchronized long spillSynchronouslyIncludingHead() throws IOException {
return spill(true);
}

/**
* Prints out the current spilling state of this queue, i.e. how many buffers that are encapsulated inside the
* queued transfer envelopes reside in main memory and how many reside on hard disk.
*/
public synchronized void printSpillingState() {

final StringBuilder str = new StringBuilder();
str.append("Memory footprint of ");
str.append(this);
str.append(":\n");
str.append(size());
str.append(" elements in queue\n");

SpillingQueueElement elem = this.head;
while (elem != null) {

final Iterator<TransferEnvelope> it = elem.iterator();
while (it.hasNext()) {
while (it.hasNext()) {
final TransferEnvelope te = it.next();
final Buffer buffer = te.getBuffer();
if (buffer == null) {
str.append('X');
} else {
if (buffer.isBackedByMemory()) {
str.append('M');
} else {
str.append('F');
}
}
}
}

elem = elem.getNextElement();
}

str.append('\n');

System.out.println(str.toString());
}

public long getAmountOfMainMemoryInQueue() {

return this.sizeOfMemoryBuffers.get();
Expand Down

0 comments on commit abf0f98

Please sign in to comment.