
Flow.chunked operator with size limit #1290

Closed
azulkarnyaev opened this issue Jun 24, 2019 · 21 comments

@azulkarnyaev commented Jun 24, 2019

It would be useful to have an optional transformation in the Flow.buffer method to aggregate the buffered items, like kotlin.sequences.Sequence.chunked does.
I mean something like:

fun <T, R> Flow<T>.buffer(capacity: Int = BUFFERED, transform: suspend (List<T>) -> R): Flow<R>

Then we can write

runBlocking {
    (1..100).asFlow().buffer(capacity = 10) { it.sum() }.collect { println(it) }
}

with result 55, 155, 255, ... , 955
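Until such an operator exists, the behavior can be approximated by hand. Below is a minimal sketch; the name chunkedMap and the trailing partial-chunk emission are my assumptions, not part of any proposed API:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Sketch of the requested behavior: collect `size` elements, apply
// `transform`, emit the result. The name `chunkedMap` is illustrative.
fun <T, R> Flow<T>.chunkedMap(size: Int, transform: suspend (List<T>) -> R): Flow<R> = flow {
    val chunk = ArrayList<T>(size)
    collect { element ->
        chunk += element
        if (chunk.size == size) {
            emit(transform(chunk.toList()))
            chunk.clear()
        }
    }
    // Emit a trailing partial chunk, mirroring Sequence.chunked.
    if (chunk.isNotEmpty()) emit(transform(chunk.toList()))
}

fun main() = runBlocking {
    (1..100).asFlow().chunkedMap(10) { it.sum() }.collect { println(it) }
    // prints 55, 155, 255, ..., 955
}
```

Note that, unlike the proposed buffer overload, this sketch is fully sequential and provides no concurrency between upstream and downstream.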

@elizarov (Contributor) commented Jun 24, 2019

That would be a separate operator that we'll call chunked or something like that (as we generally follow stdlib naming convention). This operator will be totally unrelated to the buffer operator. Unlike buffer, this operator will be fully sequential.

P.S. Rx has a whole set of bufferXxx operators that actually correspond to chunked/windowed in Kotlin. On the other hand, buffer/conflate operators in Kotlin flows somewhat correspond to Rx onBackpressureBuffer operators.

@elizarov elizarov added the flow label Jun 24, 2019
@elizarov elizarov changed the title Flow.buffer with transformer Flow.chunked operator Jun 24, 2019
@elizarov (Contributor)

Also, I forgot to ask: what would be your use case for such an operator?

@azulkarnyaev (Author)

Thank you very much for the response!
About the use case: I need to handle a stream of vectors (CSV rows) from a TCP socket and write aggregated statistics for every chunk of n messages to files.

@elizarov (Contributor)

Are you sure you need Flow for this? Wouldn't a Sequence from the Kotlin standard library work for you?

@azulkarnyaev (Author)

Well, I'm using Ktor as a server for socket connection and using coroutines to write data to a file. Pseudo code for my task:

launch {
    val socket = server.accept()
    val input = socket.openReadChannel()
    flow {
        while (true) {
            val line = input.readUTF8Line()
            emit(line)
        }
    }.map {
        convertToDomainObject(it)
    }.chunked(1000) {
        aggregateToDomainObjects(it)
    }.collect {
        writeToFile(it)
    }
}

Yes, I could use a plain synchronous Sequence. But then I would need to provide a back-pressure mechanism manually: what if I receive messages faster than I can store them? Hopefully a chunked() method would work like buffer() and provide back pressure out of the box.

@elizarov (Contributor)

@azulkarnyaev Thanks for the explanation. It does make sense.

@circusmagnus commented Jun 30, 2019

I would second this issue. I have a use case where I'm receiving subsequent snapshots of a database and I need to produce classes representing diffs between those snapshots.

So I need to cache two subsequent emissions, emit them as Pair(first, second), remove the first emission, and wait for the third emission to emit another Pair(second, third). With collections, I get this with the windowed function.

I do not control the frequency of emissions, and they must happen on a background thread, hence Flow is needed.

I think it could be generalized into something like this:

fun <T> Flow<T>.windowed(size: Int, step: Int): Flow<List<T>> = flow {
    require(size > 0 && step > 0) { "size and step must be positive" }
    val queue = ArrayDeque<T>(size)
    // If somebody would like to skip some elements before getting another
    // window, by passing a step greater than size, then why not?
    val toSkip = max(step - size, 0)
    val toRemove = min(step, size)
    var skipped = toSkip // the first window starts at the first element
    collect { element ->
        if (queue.size < size && skipped == toSkip) {
            queue.add(element)
        } else if (queue.size < size && skipped < toSkip) {
            skipped++
        }

        if (queue.size == size) {
            emit(queue.toList())
            repeat(toRemove) { queue.removeFirst() }
            skipped = 0
        }
    }
}

Intended use:

flow.windowed(size = 2, step = 1)
    .map { listOfTwoNeighboringEmissions ->
        computeDiff(listOfTwoNeighboringEmissions)
    }

@elizarov elizarov changed the title Flow.chunked operator Flow.chunked operator with size limit Jul 1, 2019
@zach-klippenstein (Contributor)

@circusmagnus That sounds like a use case for scan more than windowing.

@circusmagnus

Almost. scan requires me either to provide an initial value, which I do not have (an empty diff is pointless; I just need to swallow the first DB emission and wait for the second one to produce a diff), or to emit the same type as I am receiving (scanReduce), which is also a no-go, as I receive a list of entities but want to emit the changes between them.
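For this particular shape of the problem (pairs of neighboring emissions, no initial value), a small dedicated operator also works. A minimal sketch, where the name pairwise is mine:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Illustrative helper: pair each element with its predecessor, swallowing
// the first emission instead of requiring an initial value like `scan`.
fun <T : Any> Flow<T>.pairwise(): Flow<Pair<T, T>> = flow {
    var previous: T? = null
    collect { element ->
        previous?.let { emit(it to element) }
        previous = element
    }
}

fun main() = runBlocking {
    flowOf("a", "b", "c").pairwise().collect { println(it) }
    // prints (a, b), then (b, c)
}
```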

@tunjid commented Aug 6, 2020

Would this work?

flow
    .scan(listOf<Item>()) { oldItems, newItem ->
        if (oldItems.size >= BUFFER_COUNT) listOf(newItem)
        else oldItems + newItem
    }
    .filter { it.size == BUFFER_COUNT }
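A runnable sketch of this approach, with BUFFER_COUNT assumed to be a constant; note that a trailing partial chunk is silently dropped by the filter:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

const val BUFFER_COUNT = 3

fun main() = runBlocking {
    val chunks = (1..7).asFlow()
        // Accumulate into a list; start a fresh list once a full chunk was seen.
        .scan(listOf<Int>()) { oldItems, newItem ->
            if (oldItems.size >= BUFFER_COUNT) listOf(newItem)
            else oldItems + newItem
        }
        // Pass through only completed chunks (partial ones are dropped).
        .filter { it.size == BUFFER_COUNT }
        .toList()
    println(chunks) // [[1, 2, 3], [4, 5, 6]] -- the trailing 7 is lost
}
```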

@circusmagnus

At first glance it should work. I would, however, recommend the more efficient and streamlined operator outlined in this PR: #1558
flow.chunked(2) { twoEmissions -> combine(twoEmissions) }

It is not going into the coroutines library, as it does not deal with time-based chunking/windowing. But for now it is the best solution for size-based chunking.

@AWinterman

Hi all, I'm curious if there's been any work on this. I reach for something like this about once every two weeks, and keep coming back to this thread.

@circusmagnus

I have proposed a design for unified time- and size-based chunking in #1302. You are welcome to comment or just give a thumbs up (or down).
No idea what the coroutines team's plans are regarding this issue, though.

@AWinterman commented Feb 9, 2021

I'm realizing that I actually want a slightly different behavior from what I've seen discussed thus far, because really all I want is to be able to convert a stream of values to a batch operation when appropriate.

something like the following:

/**
 * [chunked] buffers at most [maxSize] elements, preferring to emit early rather than wait
 * if fewer than [maxSize] have arrived.
 *
 * If [checkIntervalMillis] is specified, [chunked] suspends for [checkIntervalMillis] to allow
 * the buffer to fill.
 *
 * TODO: move to kotlin common
 */
fun <T> Flow<T>.chunked(maxSize: Int, checkIntervalMillis: Long = 0): Flow<List<T>>

This is optimizing for a database that performs better with batch operations than with many small ones, and for which it's safest to restrict writes to once a second.

My implementation is here. I'm sure I'm using coroutines incorrectly somehow:
https://gist.github.com/AWinterman/8516d4869f491176ebb270dafbb23199
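I have not studied the gist's internals, but the described "drain whatever has accumulated, at most maxSize, once per interval" behavior could be sketched roughly as follows; chunkedEvery is an illustrative name, not the gist's API:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Sketch: buffer upstream in a channel, then once per interval drain
// whatever is available (up to maxSize) into a chunk and emit it.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T : Any> Flow<T>.chunkedEvery(maxSize: Int, checkIntervalMillis: Long): Flow<List<T>> =
    channelFlow {
        val upstream = buffer(maxSize).produceIn(this) // backpressure via channel capacity
        val chunk = ArrayList<T>(maxSize)
        var done = false
        while (!done) {
            delay(checkIntervalMillis)          // give the buffer a chance to fill
            while (chunk.size < maxSize) {      // drain what is currently available
                val result = upstream.tryReceive()
                val value = result.getOrNull()
                if (value == null) {
                    if (result.isClosed) done = true
                    break
                }
                chunk += value
            }
            if (chunk.isNotEmpty()) {
                send(chunk.toList())            // emit early even if not full
                chunk.clear()
            }
        }
    }

fun main() = runBlocking {
    (1..10).asFlow()
        .chunkedEvery(maxSize = 4, checkIntervalMillis = 10)
        .collect { println(it) }
}
```

Like the signature above, this emits partial chunks rather than waiting for maxSize, and the channel capacity exerts backpressure on the upstream flow.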

@circusmagnus

It seems that your chunked operator will suspend after filling up the buffer (max size reached), but it will not emit until checkIntervalMillis has elapsed. checkIntervalMillis is a must-have condition for it to emit.

Is that intentional?

@AWinterman commented Feb 9, 2021

@circusmagnus I'm not sure I follow.

  1. suspending after filling up the buffer is intentional. If the buffer is full, we need to exert backpressure on the upstream flow.
  2. the delay(checkIntervalMillis) ensures that we do not busy wait, and that the buffer has a chance to fill up before we collect a chunk and emit.

I don't know what you mean by "checkIntervalMillis is a must-have condition for it to emit."

@AWinterman commented Feb 9, 2021

Ah, I just realized that the delay interval can be accomplished downstream:

.transform {
    emit(it)
    delay(100)
}

which makes my use case wholly subsumed by a Flow.chunked operator with a size limit.

@circusmagnus commented Feb 9, 2021

> @circusmagnus I'm not sure I follow.
>
> 1. suspending after filling up the buffer is intentional. If the buffer is full, we need to exert backpressure on the upstream flow.
> 2. the delay(checkIntervalMillis) ensures that we do not busy wait, and that the buffer has a chance to fill up before we collect a chunk and emit.
>
> I don't know what you mean by "checkIntervalMillis is a must-have condition for it to emit."

1. If the buffer is full, we could try to emit rather than suspend upstream until checkIntervalMillis is reached. Perhaps downstream is idle and can accept a new chunk before checkIntervalMillis elapses. In your implementation, downstream cannot emit more often than checkIntervalMillis specifies; there is an unavoidable delay() there:

   while (!buffer.isClosedForReceive) {
       val chunk = getChunk(buffer, maxSize)
       send(chunk)
       delay(checkIntervalMillis) // <- we cannot emit more often than that
   }

2. Sure, checkIntervalMillis is a must-have condition to emit, but maxSize is not: we can emit before reaching max size, but we cannot emit more often than checkIntervalMillis says. Was it intentional? Do you need to limit the frequency of emissions in your use case?

@AWinterman

> Sure, checkIntervalMillis is a must-have condition to emit, but maxSize is not: we can emit before reaching max size, but we cannot emit more often than checkIntervalMillis says. Was it intentional? Do you need to limit the frequency of emissions in your use case?

Ah, yes, it was intentional: emit no more frequently than X. But as I stated above, because flows are composable, this can be accomplished with a downstream flow operation, so my use case is entirely satisfied by a "Flow.chunked operator with size limit".

@sskrla commented Nov 9, 2022

Another possible implementation that we currently use:

/**
 * Chunks based on a time or size threshold.
 *
 * Borrowed from this [Stack Overflow question](https://stackoverflow.com/questions/51022533/kotlin-chunk-sequence-based-on-size-and-time).
 */
@OptIn(ObsoleteCoroutinesApi::class) 
fun <T> ReceiveChannel<T>.chunked(scope: CoroutineScope, size: Int, time: Duration) =
    scope.produce<List<T>> {
        while (true) { // this loop goes over each chunk
            val chunk = ConcurrentLinkedQueue<T>() // current chunk
            val ticker = ticker(time.toMillis()) // time-limit for this chunk
            try {
                whileSelect {
                    ticker.onReceive {
                        false  // done with chunk when timer ticks, takes priority over received elements
                    }
                    this@chunked.onReceive {
                        chunk += it
                        chunk.size < size // continue whileSelect if chunk is not full
                    }
                }

            } catch (e: ClosedReceiveChannelException) {
                return@produce

            } finally {
                ticker.cancel()
                if (chunk.isNotEmpty())
                    send(chunk.toList())
            }
        }
    }

fun <T> Flow<T>.chunked(size: Int, time: Duration) =
    channelFlow {
        coroutineScope {
            val channel = asChannel(this@chunked).chunked(this, size, time)
            try {
                while (!channel.isClosedForReceive) {
                    send(channel.receive())
                }

            } catch(e: ClosedReceiveChannelException) {
                // Channel was closed by the flow completing, nothing to do

            } catch(e: CancellationException) {
                channel.cancel(e)
                throw e

            } catch (e: Exception) {
                channel.cancel(CancellationException("Closing channel due to flow exception", e))
                throw e
            }
        }
    }

@ExperimentalCoroutinesApi 
fun <T> CoroutineScope.asChannel(flow: Flow<T>): ReceiveChannel<T> = produce {
    flow.collect { value ->
        channel.send(value)
    }
}

I am not certain this is entirely correct; specifically, launching a scope within the channelFlow seems like it may not be very "flowy".

Our specific use case is batching with a max linger, so that we can make efficient external service calls without introducing too much delay when we can't fill the batch.

@iseki0 commented Apr 18, 2024

Time has passed quickly, and it’s already been five years since this issue was first raised. Could you please provide an update on when we can expect a resolution?

8 participants