Skip to content

Commit

Permalink
Introduced ReceiveChannel.cancel method;
Browse files Browse the repository at this point in the history
all operators on ReceiveChannel fully consume the original channel
using a helper consume extension, which is reflected in docs;
removed `suspend` modifier from intermediate channel operators;
consistently renamed channel type param to <E>;
added two versions for xxxTo fun -- with MutableList & SendChannel;
added tests for all channel operators;
dropped/deprecated ActorJob/ProducerJob, fixes Kotlin#127
  • Loading branch information
elizarov committed Nov 20, 2017
1 parent 66d18c0 commit b555d91
Show file tree
Hide file tree
Showing 26 changed files with 1,496 additions and 789 deletions.
4 changes: 2 additions & 2 deletions core/kotlinx-coroutines-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ Coroutine builder functions:
| ------------- | ------------- | ---------------- | ---------------
| [launch] | [Job] | [CoroutineScope] | Launches coroutine that does not have any result
| [async] | [Deferred] | [CoroutineScope] | Returns a single value with the future result
| [produce][kotlinx.coroutines.experimental.channels.produce] | [ProducerJob][kotlinx.coroutines.experimental.channels.ProducerJob] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements
| [actor][kotlinx.coroutines.experimental.channels.actor] | [ActorJob][kotlinx.coroutines.experimental.channels.ActorJob] | [ActorScope][kotlinx.coroutines.experimental.channels.ActorScope] | Processes a stream of messages
| [produce][kotlinx.coroutines.experimental.channels.produce] | [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements
| [actor][kotlinx.coroutines.experimental.channels.actor] | [SendChannel][kotlinx.coroutines.experimental.channels.SendChannel] | [ActorScope][kotlinx.coroutines.experimental.channels.ActorScope] | Processes a stream of messages
| [runBlocking] | `T` | [CoroutineScope] | Blocks the thread while the coroutine runs

Coroutine dispatchers implementing [CoroutineDispatcher]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause

protected open val hasCancellingState: Boolean get() = false

public final override fun cancel(cause: Throwable?): Boolean =
public override fun cancel(cause: Throwable?): Boolean =
if (hasCancellingState)
makeCancelling(cause) else
makeCancelled(cause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
remove()
}

override fun resumeSendClosed(closed: Closed<*>) {
if (select.trySelect(null))
select.resumeSelectCancellableWithException(closed.sendException)
}

override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
}

Expand All @@ -407,6 +412,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
override val pollResult: Any? get() = element
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
override fun resumeSendClosed(closed: Closed<*>) {}
}
}

Expand Down Expand Up @@ -567,6 +573,24 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
}

override fun cancel(cause: Throwable?): Boolean =
close(cause).also {
cleanupSendQueueOnCancel()
}

// Note: this function is invoked when channel is already closed
protected open fun cleanupSendQueueOnCancel() {
val closed = closedForSend ?: error("Cannot happen")
while (true) {
val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
if (send is Closed<*>) {
check(send === closed)
return // cleaned
}
send.resumeSendClosed(closed)
}
}

public final override fun iterator(): ChannelIterator<E> = Itr(this)

// ------ registerSelectReceive ------
Expand Down Expand Up @@ -909,6 +933,7 @@ public interface Send {
val pollResult: Any? // E | Closed
fun tryResumeSend(idempotent: Any?): Any?
fun completeResumeSend(token: Any)
fun resumeSendClosed(closed: Closed<*>)
}

/**
Expand All @@ -922,7 +947,7 @@ public interface ReceiveOrClosed<in E> {
}

/**
* Represents closed channel.
* Represents sender for a specific element.
* @suppress **This is unstable API and it is subject to change.**
*/
@Suppress("UNCHECKED_CAST")
Expand All @@ -932,6 +957,7 @@ public class SendElement(
) : LockFreeLinkedListNode(), Send {
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
override fun completeResumeSend(token: Any) = cont.completeResume(token)
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
override fun toString(): String = "SendElement($pollResult)[$cont]"
}

Expand All @@ -951,6 +977,7 @@ public class Closed<in E>(
override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED
override fun completeResumeReceive(token: Any) { check(token === CLOSE_RESUMED) }
override fun resumeSendClosed(closed: Closed<*>) = error("Should be never invoked")
override fun toString(): String = "Closed[$closeCause]"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
}

/**
* Return type for [actor] coroutine builder.
* @suppress **Deprecated**: Use `SendChannel`.
*/
public interface ActorJob<in E> : Job, SendChannel<E> {
/**
* A reference to the mailbox channel that this coroutine is receiving messages from.
* All the [SendChannel] functions on this interface delegate to
* the channel instance returned by this function.
*/
@Deprecated(message = "Use `SendChannel`", replaceWith = ReplaceWith("SendChannel"))
interface ActorJob<in E> : SendChannel<E> {
@Deprecated(message = "Use SendChannel itself")
val channel: SendChannel<E>
}

Expand Down Expand Up @@ -87,7 +84,7 @@ public fun <E> actor(
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend ActorScope<E>.() -> Unit
): ActorJob<E> {
): SendChannel<E> {
val newContext = newCoroutineContext(context)
val channel = Channel<E>(capacity)
val coroutine = if (start.isLazy)
Expand All @@ -109,8 +106,6 @@ private class LazyActorCoroutine<E>(
channel: Channel<E>,
private val block: suspend ActorScope<E>.() -> Unit
) : ActorCoroutine<E>(parentContext, channel, active = false), SelectClause2<E, SendChannel<E>> {
override val channel: Channel<E> get() = this

override fun onStart() {
block.startCoroutineCancellable(this, this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ class ArrayBroadcastChannel<E>(
override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
override val isBufferFull: Boolean get() = error("Should not be used")

override fun close() {
if (close(cause = null))
broadcastChannel.updateHead(removeSub = this)
}
override fun cancel(cause: Throwable?): Boolean =
close(cause).also { closed ->
if (closed) broadcastChannel.updateHead(removeSub = this)
}

// returns true if subHead was updated and broadcast channel's head must be checked
// this method is lock-free (it never waits on lock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,18 @@ public open class ArrayChannel<E>(
send!!.completeResumeSend(token!!)
return result
}

// Note: this function is invoked when channel is already closed
override fun cleanupSendQueueOnCancel() {
// clear buffer first
lock.withLock {
repeat(size) {
buffer[head] = 0
head = (head + 1) % capacity
}
size = 0
}
// then clean all queued senders
super.cleanupSendQueueOnCancel()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
/**
* Return type for [BroadcastChannel.openSubscription] that can be used to [receive] elements from the
* open subscription and to [close] it to unsubscribe.
*
* Note, that invocation of [cancel] also closes subscription.
*/
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
/**
* Closes this subscription.
* Closes this subscription. This is a synonym for [cancel].
*/
public override fun close()
public override fun close() { cancel() }
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ public interface SendChannel<in E> {
/**
* Closes this channel with an optional exceptional [cause].
* This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
* Conceptually, its sends a special "close token" over this channel. Immediately after invocation of this function
* Conceptually, its sends a special "close token" over this channel.
*
* Immediately after invocation of this function
* [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
* are received.
Expand Down Expand Up @@ -192,6 +194,21 @@ public interface ReceiveChannel<out E> {
* throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public operator fun iterator(): ChannelIterator<E>

/**
* Cancels reception of remaining elements from this channel. This function closes the channel with
* the specified cause (unless it was already closed) and removes all buffered sent elements from it.
* This function returns `true` if the channel was not closed previously, or `false` otherwise.
*
* Immediately after invocation of this function [isClosedForReceive] and
* [isClosedForSend][SendChannel.isClosedForSend]
* on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
* [ClosedReceiveChannelException] if it was cancelled without a cause.
* A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
* receive on a failed channel throw the specified [cause] exception.
*/
public fun cancel(cause: Throwable? = null): Boolean
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@
package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.AbstractCoroutine
import kotlinx.coroutines.experimental.JobSupport
import kotlinx.coroutines.experimental.handleCoroutineException
import kotlin.coroutines.experimental.CoroutineContext

internal open class ChannelCoroutine<E>(
parentContext: CoroutineContext,
open val channel: Channel<E>,
channel: Channel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by channel {
override fun afterCompletion(state: Any?, mode: Int) {
val cause = (state as? JobSupport.CompletedExceptionally)?.cause
if (!channel.close(cause) && cause != null)
val channel: Channel<E>
get() = this

override fun onCancellation(exceptionally: CompletedExceptionally?) {
val cause = exceptionally?.cause
if (!close(cause) && cause != null)
handleCoroutineException(context, cause)
}

override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
}
Loading

0 comments on commit b555d91

Please sign in to comment.