Flow.chunked operator with size limit #1290
Comments
That would be a separate operator that we'll call P.S. Rx has a whole set of |
Also, I forgot to ask: what would be your use case for such an operator?
Thank you very much for the response! |
Are you sure you need |
Well, I'm using Ktor as a server for a socket connection and using coroutines to write data to a file. Pseudocode for my task:
Yes, I can use just sync |
@azulkarnyaev Thanks for the explanation. It does make sense.
I would second this issue. I have a use case where I'm receiving successive snapshots of a database and I need to produce classes representing the diffs between those snapshots. So I need to cache two successive emissions, emit them as Pair(first, second), drop the first emission, and wait for the third emission to emit another Pair(second, third). With collections, I get this with the windowed function. I do not control the frequency of emissions and they must happen on a background thread -> hence Flow is needed. I think it could be generalized into something like this:
Intended use:
@circusmagnus That sounds like a use case for scan more than windowing. |
Almost. Scan requires me either to provide an initial value, which I do not have (an empty diff makes no sense; I just need to swallow the first DB emission and wait for the second one to produce a diff), or to emit the same type as I am receiving (scanReduce), which is also a no-go, as I get a list of entities but want to emit the changes between them.
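For illustration, the pairing described above (swallow the first emission, then emit Pair(previous, current) for every later one) can be sketched without an initial value as follows. This is only a minimal sketch, not the code from the comment; the names `pairwise` and `demoPairwise` are hypothetical and not part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: emits each pair of consecutive upstream values.
// Nothing is emitted for the very first value, so no initial value is needed.
fun <T> Flow<T>.pairwise(): Flow<Pair<T, T>> = flow {
    var hasPrevious = false
    var previous: T? = null
    collect { current ->
        if (hasPrevious) {
            // Safe: previous was assigned from a T on the preceding emission.
            @Suppress("UNCHECKED_CAST")
            emit(Pair(previous as T, current))
        }
        previous = current
        hasPrevious = true
    }
}

// Hypothetical usage: three pairs are produced from four values.
fun demoPairwise(): List<Pair<Int, Int>> = runBlocking {
    flowOf(1, 2, 3, 4).pairwise().toList()
}
```

A diff between snapshots would then be a plain `map` over the emitted pairs.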
Would this work?
flow
    .scan(listOf<Item>()) { oldItems, newItem ->
        if (oldItems.size >= BUFFER_COUNT) listOf(newItem)
        else oldItems + newItem
    }
    .filter { it.size == BUFFER_COUNT }
At first glance it should work. However, I would recommend using the more efficient and streamlined operator outlined in this PR: #1558. It is not going into the coroutines library, as it does not deal with time-based chunking / windowing, but for now it is the best solution for size-based chunking.
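As a point of comparison, a purely size-based chunking operator can be sketched in a few lines. This is not the code from the PR mentioned above, just a minimal sketch; the names `chunkedBySize` and `demoChunkedBySize` are hypothetical. Unlike the scan-and-filter version, which drops a trailing partial chunk, this sketch also emits the final chunk when it is smaller than the requested size.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: groups upstream values into lists of at most `size` elements.
fun <T> Flow<T>.chunkedBySize(size: Int): Flow<List<T>> {
    require(size > 0) { "size must be positive, was $size" }
    return flow {
        var chunk = ArrayList<T>(size)
        collect { value ->
            chunk.add(value)
            if (chunk.size == size) {
                emit(chunk)
                // Start a fresh list so already emitted chunks are never mutated.
                chunk = ArrayList(size)
            }
        }
        // Emit whatever is left once the upstream completes.
        if (chunk.isNotEmpty()) emit(chunk)
    }
}

// Hypothetical usage: seven values in chunks of three.
fun demoChunkedBySize(): List<List<Int>> = runBlocking {
    (1..7).asFlow().chunkedBySize(3).toList()
}
```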
Hi all, I'm curious if there's been any work on this. I reach for something like this about once every two weeks, and keep coming back to this thread. |
I have proposed a design for unified time- and size-based chunking in #1302 . You are welcome to comment or just give thumbs up (or down). |
I'm realizing that I actually want a slightly different behavior from what I've seen discussed thus far, because really all I want is to be able to convert a stream of values to a batch operation when appropriate. Something like the following:
/**
 * [chunked] buffers a maximum of [maxSize] elements, preferring to emit early rather than wait if less than
 * [maxSize]
 *
 * If [checkIntervalMillis] is specified, chunkedNaturally suspends [checkIntervalMillis] to allow the buffer to fill.
 *
 * TODO: move to kotlin common
 */
fun <T> Flow<T>.chunked(maxSize: Int, checkIntervalMillis: Long = 0): Flow<List<T>>
This is optimizing for a database that performs better with batch operations than with a number of small ones, and for which it's safest to restrict writes to once a second. My implementation is here. I'm sure I'm using coroutines incorrectly somehow:
It seems that your chunked operator will suspend after filling up the buffer (max size reached), but it will not emit until checkIntervalMillis has elapsed. Is that intentional?
@circusmagnus I'm not sure I follow.
I don't know what you mean by " |
Ah, I just realized that the delay interval can be accomplished downstream,
which makes my use case wholly subsumed by Flow.chunked operator with size limit.
Ah, yes, it was intentional: emit no more frequently than X. But as I stated above, because flows are composable, this can be accomplished with a downstream flow operation, so my use case is entirely satisfied by a "Flow.chunked operator with size limit".
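To make the composition concrete, here is a rough sketch of "emit early rather than wait" chunking with the rate limit applied downstream. It is not the implementation from the earlier comment. The names `chunkedNaturally` (borrowed from the KDoc above) and `demoNaturalChunks` are used hypothetically, and the element type is restricted to non-null values so that an empty buffer can be told apart from a null element.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: emits whatever is already buffered (at most maxSize
// elements) as soon as the collector is ready, instead of waiting for a full chunk.
fun <T : Any> Flow<T>.chunkedNaturally(maxSize: Int): Flow<List<T>> {
    require(maxSize > 0) { "maxSize must be positive, was $maxSize" }
    return flow {
        coroutineScope {
            // Collect the upstream concurrently into a channel holding up to maxSize elements.
            val channel = this@chunkedNaturally.buffer(maxSize).produceIn(this)
            for (first in channel) {          // suspends until at least one element arrives
                val batch = ArrayList<T>()
                batch.add(first)
                while (batch.size < maxSize) {
                    // Non-suspending poll: stop as soon as nothing more is waiting.
                    val next = channel.tryReceive().getOrNull() ?: break
                    batch.add(next)
                }
                emit(batch)
            }
        }
    }
}

// Hypothetical usage: the delay lives downstream, so batches are handed over
// no faster than one per 5 ms while the buffer refills in the meantime.
fun demoNaturalChunks(): List<List<Int>> = runBlocking {
    (1..25).asFlow()
        .chunkedNaturally(maxSize = 10)
        .onEach { delay(5) }
        .toList()
}
```

The exact batch boundaries depend on timing, so only two properties should be relied on: element order is preserved, and no batch exceeds maxSize.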
Another possible implementation that we currently use:
I am not certain this is entirely correct, specifically launching a scope within the Our specific use case is batch with a max linger, so that we can attempt to make efficient external service calls, but not introduce too much delay if we can't fill up the batch. |
Time has passed quickly, and it’s already been five years since this issue was first raised. Could you please provide an update on when we can expect a resolution? |
It would be useful to have an optional transformation in the Flow.buffer method to aggregate the buffered items, like kotlin.sequences.Sequence.chunked. I mean,
fun <R> buffer(capacity: Int = BUFFERED, transform: suspend (List<T>) -> R): Flow<R>
Then we can write
with result 55, 155, 255, ... , 955
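The numbers above match the sums of consecutive groups of ten over 1..100, so one way to picture the proposal is the sketch below. It is an assumption about the intended usage, not the commenter's snippet. The existing Flow.buffer takes no transform, so the sketch uses a separate, hypothetical operator named `chunkedMap` and, unlike buffer, does not run the upstream concurrently.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: gathers up to `capacity` values, then emits transform(values).
fun <T, R> Flow<T>.chunkedMap(capacity: Int, transform: suspend (List<T>) -> R): Flow<R> {
    require(capacity > 0) { "capacity must be positive, was $capacity" }
    return flow {
        var items = ArrayList<T>(capacity)
        collect { value ->
            items.add(value)
            if (items.size == capacity) {
                emit(transform(items))
                items = ArrayList(capacity)
            }
        }
        // Transform the trailing partial group, if any.
        if (items.isNotEmpty()) emit(transform(items))
    }
}

// Sums each group of ten values from 1..100: 55, 155, 255, ..., 955.
fun demoChunkSums(): List<Int> = runBlocking {
    (1..100).asFlow().chunkedMap(10) { it.sum() }.toList()
}
```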