Skip to content

Commit

Permalink
IO: byte channel reader and writer coroutines support
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Mashkov committed Aug 24, 2017
1 parent 9d5152b commit 25b2bc5
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kotlinx.coroutines.experimental.io

import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.*

internal open class ByteChannelCoroutine(
parentContext: CoroutineContext,
open val channel: ByteChannel
) : AbstractCoroutine<Unit>(parentContext, active = true) {
override fun afterCompletion(state: Any?, mode: Int) {
val cause = (state as? JobSupport.CompletedExceptionally)?.cause
if (!channel.close(cause) && cause != null)
handleCoroutineException(context, cause)

super.afterCompletion(state, mode)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package kotlinx.coroutines.experimental.io

import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.*

/**
* A coroutine job that is reading from a byte channel
*/
interface ReaderJob : Job {
/**
* A reference to the channel that this coroutine is reading from
*/
val channel: ByteWriteChannel
}

interface ReaderScope : CoroutineScope {
val channel: ByteReadChannel
}

fun reader(coroutineContext: CoroutineContext,
channel: ByteChannel,
block: suspend ReaderScope.() -> Unit): ReaderJob {
val coroutine = ReaderCoroutine(newCoroutineContext(coroutineContext), channel)
coroutine.initParentJob(coroutineContext[Job])
block.startCoroutine(coroutine, coroutine)
return coroutine
}

fun reader(coroutineContext: CoroutineContext,
autoFlush: Boolean = false,
block: suspend ReaderScope.() -> Unit): ReaderJob = reader(coroutineContext, ByteChannel(autoFlush), block)

private class ReaderCoroutine(context: CoroutineContext, channel: ByteChannel)
: ByteChannelCoroutine(context, channel), ReaderJob, ReaderScope
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kotlinx.coroutines.experimental.io

import kotlinx.coroutines.experimental.*
import kotlin.coroutines.experimental.*

/**
* A coroutine job that is writing to a byte channel
*/
interface WriterJob : Job {
/**
* A reference to the channel that this coroutine is writing to
*/
val channel: ByteReadChannel
}

interface WriterScope : CoroutineScope {
val channel: ByteWriteChannel
}

fun writer(coroutineContext: CoroutineContext,
channel: ByteChannel,
block: suspend CoroutineScope.() -> Unit): WriterJob {
val coroutine = WriterCoroutine(newCoroutineContext(coroutineContext), channel)
coroutine.initParentJob(coroutineContext[Job])
block.startCoroutine(coroutine, coroutine)
return coroutine
}

fun writer(coroutineContext: CoroutineContext,
autoFlush: Boolean = false,
block: suspend CoroutineScope.() -> Unit): WriterJob = writer(coroutineContext, ByteChannel(autoFlush), block)

private class WriterCoroutine(ctx: CoroutineContext, channel: ByteChannel)
: ByteChannelCoroutine(ctx, channel), WriterScope, WriterJob

0 comments on commit 25b2bc5

Please sign in to comment.