Channel operators #88

Closed

fvasco
Contributor

@fvasco fvasco commented Jul 24, 2017

No description provided.

@fvasco
Contributor Author

fvasco commented Jul 25, 2017

This extension function is useful for batch processing of a live event stream.

For example: grab a block of live events and put them into a database using a single batch insert.

I don't consider the same extension appropriate for BroadcastChannel.

@elizarov
Contributor

I'm sorry I'm dragging my feet on this PR. It is also related to #69 and is part of a larger piece of work to provide the same set of operations for ReceiveChannel that are currently available for Sequence in the Kotlin standard library. I really want to do this all in one big chunk as part of one of the upcoming releases.

@fvasco
Contributor Author

fvasco commented Aug 1, 2017

Merged (with my own).

I hope this helps.

@fvasco
Contributor Author

fvasco commented Aug 3, 2017

I am considering porting the Sequence extension methods to ReceiveChannel: a typical case of cut-and-paste programming.

So consumeEach should be renamed to forEach for consistency, and the collect* methods will become to*.

Please keep this pull request on hold and send me your feedback.

@elizarov
Contributor

elizarov commented Aug 3, 2017

Ok. Here is a bit of additional information for your thought. The reason that ReceiveChannel has consumeEach (as opposed to forEach) is that we wanted it to be the same name both for reactive streams and for ReceiveChannel which may or may not have been the best wish.

Anyway, we could not name it forEach, because reactive streams already have forEach members with a different meaning (you cannot suspend inside of them), hence the extension for reactive streams was named consumeEach and that name was used for ReceiveChannel too. It is Ok to reconsider the latter name, though. I think that symmetry between ReceiveChannel and Sequence is more important. I'm even open to breaking the symmetry between ReceiveChannel and SendChannel names to rename ReceiveChannel to something more data-flow-stream-like (especially if we find a better name for CompletableDeferred).

@fvasco changed the title from "ReceiveChannel<E>.drainTo" to "Channel operators" on Aug 4, 2017
fvasco added a commit to fvasco/kotlinx.coroutines that referenced this pull request Aug 4, 2017
@fvasco
Contributor Author

fvasco commented Aug 4, 2017

Hi @elizarov,
I wrote a simple draft.

For many operations I chose a default coroutine context, often Unconfined.
Because many operations are inline, I consider it a valid choice to keep all the coroutines right here, in the same thread invocation stack, so the developer may switch context using run(context) { ... } where required.

fvasco added a commit to fvasco/kotlinx.coroutines that referenced this pull request Aug 4, 2017
fvasco added a commit to fvasco/kotlinx.coroutines that referenced this pull request Aug 5, 2017
@elizarov
Contributor

elizarov commented Aug 17, 2017

I've looked at the code and here are some thoughts in no particular order:

MutableCollection.asSendChannel seems to be misnamed for an operation that is going to mutate its receiver. Yes, it is somewhat symmetric to Iterable.asReceiveChannel, but the thing it does is way more dangerous. I cannot figure out a better name and I suggest that we don't include it into initial release at all (I've removed it for now).

I've renamed ReceiveChannel.asSequence to asBlockingSequence, because blocking has to be always explicit.

Sequence.asReceiveChannel was missing and I've added it (Sequence is not Iterable).

I'd suggest to rename ReceiveChannel.drainTo into something like pipeTo. I'm not sure we really need all those additional parameters on min/max element. Can we just do without them? (did not change anything yet, waiting for clarification).

I've changed context of drop, take, and withIndex to Unconfined just like the rest of them. I've also added an explicit optional context parameter (with default of Unconfined) for them and for all other intermediate functions (which use produce(...) in their implementation).

I've rebased this pull request onto the current develop branch which is now using Kotlin 1.1.4 and I've made inline all the stuff that should be inline (which is supported in Kotlin 1.1.4).

However, we don't yet support crossinline suspend lambdas KT-19159, so I've marked all the functions that should use this feature with todos.

I've converted all return xxx functions to expressions as a matter of style.

I've pushed the result of those changes into a branch named pr-88. Please pull from there.

@elizarov
Contributor

elizarov commented Aug 17, 2017

However, this still leaves us with a number of questions:

Most of the functions consume the incoming ReceiveChannel completely, however some of them do not. Namely: drainTo (with limits), elementAt, elementAtOrElse, elementAtOrNull, find, first, firstOrNull, indexOf, indexOfFirst, indexOfLast, single, singleOrNull, take, none, any.

This poses a serious problem. Consider the producing coroutine (think of something done with the produce builder or something you are receiving over a socket). You can only pipeline operations on the resulting ReceiveChannel, because if you don't consume it completely, then the producer is going to be suspended forever.

I think we should limit operations on ReceiveChannel to pipelining operations only and have some other (Rx-like?) abstraction with a full set of sequence-like operators.

See, conceptually, the ReceiveChannel is not like Sequence. ReceiveChannel is like Iterator. If we are looking for a Sequence analogy, then the best thing we have that is like a Sequence is BroadcastChannel (but even it is not a perfect match).
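
To make the Iterator analogy concrete, here is a minimal sketch using only the Kotlin standard library (no channels involved). The function names sumSequenceTwice and sumIteratorTwice are made up for this illustration. A Sequence can be traversed again from the start, while an Iterator is exhausted after one pass, which is the "consume once" behaviour being attributed to ReceiveChannel.

```kotlin
// Hypothetical helper names, used only for this illustration.

// A Sequence creates a fresh iterator on every traversal,
// so both sums see all the elements.
fun sumSequenceTwice(numbers: Sequence<Int>): Pair<Int, Int> =
    Pair(numbers.sum(), numbers.sum())

// An Iterator is a one-shot cursor: the second pass finds nothing left.
fun sumIteratorTwice(numbers: Iterator<Int>): Pair<Int, Int> {
    fun drain(): Int {
        var total = 0
        while (numbers.hasNext()) total += numbers.next()
        return total
    }
    return Pair(drain(), drain())
}

fun main() {
    val source = listOf(1, 2, 3)
    println(sumSequenceTwice(source.asSequence())) // (6, 6)
    println(sumIteratorTwice(source.iterator()))   // (6, 0)
}
```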

There is also an inefficiency that stems from the fact that ReceiveChannel is a multi-consumer abstraction, while all the functions we have, use it only for a single-consumer case. First of all, we might want to somehow reflect it in the type, so that a more efficient single-consumer channel implementation could be used. The second observation is that ReceiveChannel, unlike SubscriptionReceiveChannel, does not have a close operation to indicate that we don't need it anymore. If it had one, then the operations like first could have closed it to indicate that they are done. See how it gets us back to Rx-like abstractions.

Moreover, if you take a look at all intermediate operations that use produce(...), then the actual result of produce is ProducerJob which is closable, but we don't expose its closeability via the result type (which we've declared as ReceiveChannel). If we did expose the close operation on the result, then the downstream code could have cancelled the producer when it does not need any more elements from it.
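
As a rough sketch of what cancelling the producer from the downstream side could look like: the code below assumes a recent kotlinx.coroutines release, in which ReceiveChannel has a cancel() function and produce returns a ReceiveChannel. That is not the API as it stood when this comment was written, and firstThenCancel is a made-up name.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical demo function; assumes a recent kotlinx.coroutines version.
@OptIn(ExperimentalCoroutinesApi::class)
fun firstThenCancel(): Int = runBlocking {
    // An endless producer: it suspends in send() whenever nobody is receiving.
    val numbers: ReceiveChannel<Int> = produce {
        var n = 1
        while (true) send(n++)
    }
    val first = numbers.receive()
    // Without this call the producer stays suspended in send() forever,
    // and runBlocking never returns because it waits for its children.
    numbers.cancel()
    first
}
```

Calling firstThenCancel() returns 1 and terminates, because cancelling the channel also cancels the coroutine started by produce.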

@elizarov
Contributor

What I'd suggest is, for now, to leave only operations that completely consume their source (filter, map, toList, etc. are all Ok) and release it, while continuing to think about the problems I've outlined and how we might introduce better abstractions to resolve those problems in future releases.

@elizarov
Contributor

Here is another solution that I actually like more. I'll make all functions on ReceiveChannel consume it completely. That will serve as a good interim solution, so that we can start playing with channels and higher-level operators, while deferring the worry about other problems I've outlined for later.

Just to recap, the key question that we are deferring here is what to do with concurrency. Shall channels be concurrent (multi-consumer) or single-consumer by default? Shall this distinction be reflected in the type (e.g. make MultipleConsumerReceiveChannel a subclass of SingleConsumerReceiveChannel) or should it be left out as an implementation detail? Shall we also introduce better Rx-like abstractions, and is BroadcastChannel the best candidate for an Rx-like abstraction? Shall we go full Rx functional-style (e.g. set up a pipeline first, then execute it on a terminal operation), or would an imperative approach let us get all the goodies just as well?

@elizarov
Contributor

So, here's the summary of what I've done:

I've introduced ReceiveChannel.consumeAll function and implemented it for all channel types.

I've dropped drainTo with limits and renamed it to consumeTo. Also, renamed sendTo to consumeTo, too. So, now you can consumeTo a collection or to a send channel. I have not used pipeTo name, leaving it reserved (maybe) for a function that is going to start a new coroutine that is performing piping between two channels.

I've dropped asBlockingSequence (originally asSequence) as this is the only function that I don't feel safe about with "consume everything" semantics. There is a conceptual mismatch between a ReceiveChannel (which is a "consume once" entity) and the resulting Sequence (which can be consumed multiple times), which makes this transformation problematic.

I've pushed the results to pr-88 branch. Please, review.

@elizarov
Contributor

Here is another open question. So, we have a bunch of ReceiveChannel operators, all of which fully consume the original channel. This is a different behavior than the corresponding operators on a Sequence that do not have this consuming semantics, as a Sequence can be used as many times as needed (unlike a ReceiveChannel which can be consumed only once).

It somewhat devalues the desire to have the same names for all those operators as they are defined for a Sequence. We already have one naming deviation of Sequence.forEach vs ReceiveChannel.consumeEach (the latter highlights the fact that you can use it only once) and maybe we should consider giving different names to all (or some other) terminal operations, because unlike the terminal operations on a sequence, they are truly terminal, as they consume their source in an irrevocable way.

So, let's get back to discussion on toList vs collectList. We can also name it consumeToList.

@fvasco
Contributor Author

fvasco commented Sep 1, 2017

I'd suggest to rename ReceiveChannel.drainTo into something like pipeTo. I'm not sure we really need all those additional parameters on min/max element. Can we just do without them? (did not change anything yet, waiting for clarification).

I think max limit is useful.

Let me explain my use case.

A producer puts events into a channel; the events occur randomly and are potentially unbounded.
A consumer stores these events in a database; for various reasons it requires at least 1 event and no more than 1000 (a configurable value).
So it is simple to await the first event to resume the coroutine, but, on the other hand, receiving too many events is not useful (and can force a preallocated ArrayList to reallocate).

I would like to be able to define a strategy to limit events, but unfortunately something like channel.take(999).drainTo(arrayList) doesn't work.
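
A minimal sketch of the "at least one, at most N" batch described here. It assumes a recent kotlinx.coroutines release where tryReceive() is available (it did not exist at the time of this discussion); receiveBatch and demoBatches are hypothetical names, not library functions.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Hypothetical helper, not part of kotlinx.coroutines.
// Suspends until one element arrives, then takes whatever is already
// buffered without suspending again, up to maxSize elements in total.
// Throws ClosedReceiveChannelException if the channel is closed and empty.
suspend fun <E : Any> ReceiveChannel<E>.receiveBatch(maxSize: Int): List<E> {
    require(maxSize > 0) { "maxSize must be positive" }
    val batch = ArrayList<E>(maxSize) // preallocated, never grows past maxSize
    batch.add(receive())
    while (batch.size < maxSize) {
        val next = tryReceive().getOrNull() ?: break // nothing buffered: stop
        batch.add(next)
    }
    return batch
}

fun demoBatches(): List<List<Int>> = runBlocking {
    val events = Channel<Int>(Channel.UNLIMITED)
    repeat(5) { events.send(it) }
    listOf(events.receiveBatch(maxSize = 3), events.receiveBatch(maxSize = 3))
}
```

With five buffered events and a limit of three, demoBatches() yields [[0, 1, 2], [3, 4]]; each inner list could then be handed to a single batch insert.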

@elizarov
Contributor

elizarov commented Sep 1, 2017

@fvasco I see your use-case for pipeTo/drainTo now. I think we should have some higher-level "windowed"/"batched" primitive for it. That is a function that collects a batch of elements and applies a given operation to them. Unlike the synchronous windowing functions that are coming to Kotlin 1.2 for sequences, an async windowing function may come with both size limits and time limits.
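
One possible shape for a window bounded by both size and time, sketched under the assumption of a recent kotlinx.coroutines release (withTimeoutOrNull and receiveCatching). The names receiveWindow and demoWindow are hypothetical. Note the caveat that a timeout firing exactly while an element is being handed over may need extra care in real code; this sketch ignores that edge case.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper, not part of kotlinx.coroutines.
// Collects elements until maxSize is reached, the channel is closed,
// or maxWaitMillis has elapsed, whichever happens first.
suspend fun <E : Any> ReceiveChannel<E>.receiveWindow(
    maxSize: Int,
    maxWaitMillis: Long
): List<E> {
    val window = ArrayList<E>()
    withTimeoutOrNull(maxWaitMillis) {
        while (window.size < maxSize) {
            // getOrNull() is null once the channel is closed and drained.
            val next = receiveCatching().getOrNull() ?: break
            window.add(next)
        }
    }
    return window
}

fun demoWindow(): List<String> = runBlocking {
    val events = Channel<String>(Channel.UNLIMITED)
    events.send("a")
    events.send("b")
    events.close()
    events.receiveWindow(maxSize = 5, maxWaitMillis = 1_000)
}
```

Here demoWindow() returns [a, b] without waiting for the timeout, because the channel is closed after two elements.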

@fvasco
Contributor Author

fvasco commented Sep 2, 2017

ProducerJob already has a fold method (Element.fold), so it is not possible to use the fold extension function on it.

There is also an inefficiency that stems from the fact that ReceiveChannel is a multi-consumer abstraction

In the use case I described above I use a channel for one-to-one communication; I consider a pipe a common case.

@fvasco
Contributor Author

fvasco commented Sep 2, 2017

Personal consideration: I have seen the ReceiveChannel.consumeAll function. I expected something like consumeAll(consumer) (aka forEach); the current implementation looks really like the close method, and the required cleanup actions are only an implementation detail.

@elizarov
Contributor

elizarov commented Sep 2, 2017

ProducerJob already has fold method (Element.fold), so it is not possible to use fold extension function on it.

Oops. That's bad. We need to figure out what can be done to solve that issue. The best approach that I see is to get rid of ProducerJob, because the only reason to have it in the first place was to provide an ability to cancel the producing coroutine, but we are introducing a new way to reach the same goal directly in ReceiveChannel interface (see below on receiver's close).

In my use case exposed above I use a channel for one-to-one communication, I consider a pipe a common event.

I find it all the time, too. My current thinking works in this direction:

  • Leave channel abstraction mostly "as-is" and reserve the "Channel" name for concurrent multi-producer, multi-consumer abstraction.
  • Define a new type for SPSC pipelines, with efficient implementations that fail fast if you try to concurrently send/receive to them. The hard thing here is the name. How shall it be named?

Personal consideration: I have seen the ReceiveChannel.consumeAll function. I expected something like consumeAll(consumer) (aka forEach); the current implementation looks really like the close method, and the required cleanup actions are only an implementation detail.

Agree. consumeAll is not a good name. We should find a better one. It is really very similar to what close does, but it does a different thing. So, basically, we have two ways to "close" a channel and we shall figure out how to clearly name both of them:

  • When sender "closes" the channel it sends a special "close token", all the previously sent elements will be received. Sender close tells that "I'm not going to send any more elements".
  • When receiver "closes" the channel it immediately consumes all the remaining elements. Receiver close tells that "I will not receive anymore and nobody else is going to receive any more elements".
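
The two kinds of "close" listed above can be sketched with a recent kotlinx.coroutines release, where the sender side is close() and the receiver side ended up being called cancel(). That naming is an assumption relative to this point in the thread, and senderClose and receiverCancel are made-up function names.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Sender-side close: elements sent before close() are still delivered.
fun senderClose(): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    channel.send(1)
    channel.send(2)
    channel.close() // "I'm not going to send any more elements"
    val received = mutableListOf<Int>()
    for (element in channel) received.add(element) // loop ends at the close token
    received
}

// Receiver-side cancel: buffered elements are dropped immediately.
fun receiverCancel(): Boolean = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    channel.send(1)
    channel.send(2)
    channel.cancel() // "nobody is going to receive any more elements"
    channel.receiveCatching().isClosed // true: nothing is left to receive
}
```

senderClose() returns [1, 2], while receiverCancel() returns true because the two buffered elements were removed by cancel().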

@fvasco
Contributor Author

fvasco commented Sep 2, 2017

Define a new type for SPSC pipelines... How shall it be named?

I can't think of anything better than "pipe", or something like it.

Agree. consumeAll is not a good name.

I think discard or dispose are more appropriate.

When receiver "closes" the channel it immediately discards (not consumes) all the remaining elements.

@fvasco
Contributor Author

fvasco commented Sep 3, 2017

If "pipe" is the new type then

val producer2 = producer.filter { it % 2 == 0 }.map { it.toString() }

is a pipeline :)

and we should support:

producer2.discard()

Moreover

pipe.toChannel()

will be a SPMC implementation.

@fvasco
Contributor Author

fvasco commented Sep 3, 2017

Most of the functions consume the incoming ReceiveChannel completely, however some of them do not.

zip too, it may consume completely only one channel.

@elizarov
Contributor

elizarov commented Sep 8, 2017

I've created a separate issue to discuss single-producer single-consumer channels (their naming and other design issues): #113

@elizarov
Contributor

elizarov commented Oct 8, 2017

I plan to get back to this work. We've brainstormed the naming a little bit and decided that the receiver-close operation shall be naturally called cancel. This is logical: the sender closes the channel to indicate that it does not plan to send any more items, while the receiver cancels the channel to indicate that it is not going to receive. Normally, when you have a pipeline, cancelling the receive channel will cancel the producer coroutine, so there is a perfect name match.

@elizarov
Contributor

elizarov commented Oct 8, 2017

However, we also need to introduce one new operation on receive channel: the operation that marks the channel as being in exclusive "ownership" of the receiver. This should protect from obvious mistakes like trying to filter twice on the same channel. So all the operations that fully "consume" the channel will mark the fact that they now "own" (or are consuming) it and thus a second attempt will fail fast. Name of this function is TBD.

@fvasco
Contributor Author

fvasco commented Oct 8, 2017

exclusive "ownership"

It sounds like a Mutex attached to a Channel.

However, filtering a channel twice should be legal, e.g.:

val c : Channel = ...
// find the first A and the following B
val (a) = c.filter { it is A }.take(1)
val (b) = c.filter { it is B }.take(1)

@elizarov
Contributor

elizarov commented Oct 8, 2017

@fvasco This particular case you've posted, where filter/take(1) is used twice, is going to work if and only if take(1) is implemented so that it stops consuming the channel as soon as it takes this one item. However, I think that this variant of take is error-prone. Let me explain. Consider this code:

val c: ReceiveChannel<Data> = ... // it is coming from somewhere
val (a) = c.take(1)
// do something with a (A)
val (b) = c.take(1) 
// do something else

But what happens if code at (A) fails with exception?

When building pipelines with channels one has to be extremely careful to make sure that the channel is always consumed fully. If you stop consuming the channel, then the producer just suspends forever, waiting. Of course, you can orchestrate all of this properly in your code, always making sure to use try-finally or some inline function that wraps it, but this has to be done carefully. It is very easy to forget to handle exceptions and to leave the channel "in the air" in the case of a crash.

I, so far, had convinced myself that for safety and reliability purposes we should only provide intermediate and terminal operations on channels that always consume the channel they are invoked on and their implementations should include all the proper measures to cancel the channel if something crashes. Every extension function that we provide on ReceiveChannel<T> (including filter, map, and take) should be, ultimately, structured like this either directly or via some other inline function:

fun <T> ReceiveChannel<T>.operation(...) {
    try {
        // receive elements and do something -- does not matter if terminal or intermediate
    } finally {
        cancel() // always cancel the channel, whether we finish normally or with an exception
    }
}

I do believe that even take(x) operation should be written like that. Yes, it is somewhat limiting, since you will not be able to write that kind of code that uses take twice, but I think that is a reasonable price to pay for the increased robustness and safety in common pipeline use-cases.
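
As a sketch of what a take written in this style might look like, assuming a recent kotlinx.coroutines release (produce as a CoroutineScope extension, cancel() on ReceiveChannel). takeFrom and demoTake are hypothetical names, and this is not the library's actual take implementation.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical operator: forwards at most `count` elements, then always
// cancels the source, whether it finished normally or with an exception.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> CoroutineScope.takeFrom(source: ReceiveChannel<T>, count: Int): ReceiveChannel<T> =
    produce<T> {
        try {
            var remaining = count
            if (remaining > 0) {
                for (element in source) {
                    send(element)
                    if (--remaining == 0) break
                }
            }
        } finally {
            source.cancel() // releases the upstream producer
        }
    }

@OptIn(ExperimentalCoroutinesApi::class)
fun demoTake(): List<Int> = runBlocking {
    // Endless upstream producer; it only terminates because takeFrom cancels it.
    val naturals = produce<Int> {
        var n = 1
        while (true) send(n++)
    }
    val result = mutableListOf<Int>()
    for (value in takeFrom(naturals, 3)) result.add(value)
    result
}
```

demoTake() returns [1, 2, 3] and terminates even though the upstream producer is endless. The trade-off is the one described above: the source cannot be used again after takeFrom.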

Moreover, the very important feature of this kind of structure, is that it enables optimization via operator fusion. Consider c.filter { ... }.map { ... } invocation. If there is a guarantee that map fully consumes the channel that is produced by filter and no other code is allowed to use the channel that was produced by filter, then map and filter can be fused into a single coroutine if they run in the same context. Ultimately, that is our chance to get Rx-like performance.

To clarify, the plan is to use this "channel ownership" not like a simple tag, but to perform this fusion, e.g. when map requests ownership on the channel that is produced by filter, the fusion logic kicks in to check if fusion is possible and performs it if it is, completely avoiding creation of yet another channel for map result in that case.
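
To illustrate what fusion would buy, here is a hand-fused filter-then-map stage: one coroutine and one output channel instead of two of each. This is only a manual sketch of the end result under a recent kotlinx.coroutines API, not the automatic fusion logic being planned; filterThenMap and demoFused are hypothetical names.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical hand-fused operator: filtering and mapping happen in the
// same coroutine, so no intermediate channel sits between the two steps.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, R> CoroutineScope.filterThenMap(
    source: ReceiveChannel<T>,
    predicate: (T) -> Boolean,
    transform: (T) -> R
): ReceiveChannel<R> = produce<R> {
    try {
        for (element in source) {
            if (predicate(element)) send(transform(element))
        }
    } finally {
        source.cancel() // the fused stage still owns and fully consumes its source
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun demoFused(): List<String> = runBlocking {
    val source = produce<Int> { for (i in 1..6) send(i) }
    val output = mutableListOf<String>()
    for (text in filterThenMap(source, { it % 2 == 0 }, { it.toString() })) output.add(text)
    output
}
```

demoFused() returns [2, 4, 6] as strings. Such fusing is only safe because nothing else is allowed to receive from the filtered channel, which is the guarantee that exclusive ownership is meant to provide.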

@fvasco
Contributor Author

fvasco commented Oct 9, 2017

Hi @elizarov,
the final design looks reasonable, but I wish to share some considerations.

Channel is a ReceiveChannel, and you are proposing to add a new operator, grab (for example), to become an exclusive receiver on it. In such a case the Channel might not be isClosedForReceive, but the receive operation might still fail (the owner check fails). This behaviour looks inconsistent with the current ReceiveChannel interface.

Sometimes I think of the current Channel implementation as a DispatchChannel, similar to BroadcastChannel, so it may or may not require an explicit subscription.

Following this reasoning, the effective Channel should be for one-to-one communication, so no explicit grab should be required.

In such a case the library should contain the terminal operators:

fun Channel.broadcast(): ...
fun Channel.dispatch(): ...

As a final consideration, adding the cancel operator to ReceiveChannel makes SubscriptionReceiveChannel effectively useless.

@fvasco
Contributor Author

fvasco commented Oct 10, 2017

A quick example related to #113

// one-to-one
interface Channel<T> : SendChannel<T>, ReceiveChannel<T> {
    val capacity: Int
}

// one-to-many
interface MulticastChannel<T> : SendChannel<T> {
    // get a new receiver
    fun openSubscription(): ReceiveChannel<T>
}

fun <T> Channel<T>.broadcast(): MulticastChannel<T> = TODO()
fun <T> Channel<T>.dispatch(): MulticastChannel<T> = TODO()

@elizarov
Contributor

elizarov commented Oct 12, 2017

@fvasco You are right. It is in the scope of #113

We do have an option to introduce a separate single-consumer abstraction and explicitly represent it (statically) in the type system. Moreover, if we follow this path, then this single-producer/single-consumer should logically be the default, since that is what you'll want most of the time.

This way, every time you have a channel (which is going to be single-consumer by default) and you want to use a "fan-out" pattern you'll have to invoke dispatch and then you'll be able to have multiple coroutines receiving from it.

What I'm trying to explore here is whether we can live without explicitly representing it in the type system and thus without requiring this dispatch invocation. Why do I want to explore this? The reason is mostly philosophical, but still. See, coroutines are a concurrent abstraction. It does not seem right to have a core coroutine-related abstraction (channel) that does not support concurrency by default. I feel that it is going to be quite brittle and error-prone if we force users to invoke dispatch every time they want to do fan-out. Why error-prone? Because if you forget to do dispatch, but you are doing receive from multiple coroutines, then your code might still work if your receive invocations happen not to overlap, but may fail if some timings/environment change. We'll be able to catch the violation and report an error only in some cases.

So I want to prototype (first) this version where we don't have a separate dispatch and don't have those separate types, but just channels that work both for single-receiver and multiple-receiver cases, and see if it is going to be good or not.

elizarov pushed a commit that referenced this pull request Nov 15, 2017
elizarov pushed a commit that referenced this pull request Nov 20, 2017
@elizarov
Contributor

I've manually merged this PR into develop branch with a bunch of additional tweaks on top of it in a separate commit. See comments here: b555d91

To highlight: The cancel method was introduced on ReceiveChannel and ProducerJob was removed (produce now simply returns ReceiveChannel). All intermediate & terminal operators on ReceiveChannel fully consume the source channel using helper inline functions consume or consumeEach (whichever is more convenient). I'll create a separate issue to make it fail-fast when trying to consume the channel that is already being consumed.
