Skip to content

Commit

Permalink
IO: optimize byte channel writePacket suspend tail-calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Mashkov committed Aug 25, 2017
1 parent 974cbb3 commit 5606133
Showing 1 changed file with 115 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -813,42 +813,124 @@ internal class ByteBufferChannel(

when (packet) {
is ByteReadPacketEmpty -> return
is ByteReadPacketSingle -> {
val buffer = packet.steal()
try {
writeFully(buffer)
} finally {
packet.pool.recycle(buffer)
}
}
is ByteReadPacketImpl -> {
try {
while (packet.remaining > 0) {
val buffer = packet.steal()
try {
writeFully(buffer)
} finally {
packet.pool.recycle(buffer)
}
}
} finally {
packet.release()
}
is ByteReadPacketSingle -> writeSingleBufferPacket(packet)
is ByteReadPacketImpl -> writeMultipleBufferPacket(packet)
else -> writeExternalPacket(packet)
}
}

private suspend fun writeSingleBufferPacket(packet: ByteReadPacketSingle) {
val buffer = packet.steal()
val t = try {
writeAsMuchAsPossible(buffer)
null
} catch (t: Throwable) {
t
}

if (t != null) {
packet.pool.recycle(buffer)
throw t
}

if (buffer.hasRemaining()) {
return writeSingleBufferPacketSuspend(buffer, packet)
}

packet.pool.recycle(buffer)
}

private suspend fun writeMultipleBufferPacket(packet: ByteReadPacketImpl) {
var buffer: ByteBuffer? = null

val t = try {
while (packet.remaining > 0) {
buffer = packet.steal()
writeAsMuchAsPossible(buffer)
if (buffer.hasRemaining()) break
packet.pool.recycle(buffer)
}
else -> {
val buffer = BufferPool.borrow()
try {
while (packet.remaining > 0) {
buffer.clear()
packet.readLazy(buffer)
buffer.flip()
writeFully(buffer)
}
} finally {
BufferPool.recycle(buffer)
packet.release()
null
} catch (t: Throwable) { t }

if (t != null) {
buffer?.let { packet.pool.recycle(it) }
packet.release()
throw t
}

if (buffer != null) {
return writeMultipleBufferPacketSuspend(buffer, packet)
}

packet.release()
}

private suspend fun writeSingleBufferPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacketSingle) {
try {
writeFully(buffer)
} finally {
packet.pool.recycle(buffer)
}
}

private suspend fun writeMultipleBufferPacketSuspend(rem: ByteBuffer, packet: ByteReadPacketImpl) {
var buffer = rem

try {
do {
writeFully(buffer)
if (packet.remaining == 0) break
packet.pool.recycle(buffer)
buffer = packet.steal()
} while (true)
} finally {
packet.pool.recycle(buffer)
packet.release()
}
}

private suspend fun writeExternalPacket(packet: ByteReadPacket) {
val buffer = BufferPool.borrow()
val t = try {
while (packet.remaining > 0) {
buffer.clear()
packet.readLazy(buffer)
buffer.flip()
writeAsMuchAsPossible(buffer)
if (buffer.hasRemaining()) {
buffer.compact()
break
}
}

null
} catch (t: Throwable) {
t
}

buffer.flip()
if (buffer.hasRemaining()) {
return writeExternalPacketSuspend(buffer, packet)
}

BufferPool.recycle(buffer)
packet.release()

if (t != null) throw t
}

private suspend fun writeExternalPacketSuspend(buffer: ByteBuffer, packet: ByteReadPacket) {
try {
do {
buffer.compact()
packet.readLazy(buffer)
buffer.flip()
writeFully(buffer)
} while (packet.remaining > 0)
} finally {
BufferPool.recycle(buffer)
packet.release()
}
}

Expand Down

0 comments on commit 5606133

Please sign in to comment.