Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow size- and time-based chunked #2378

Open
wants to merge 28 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
74d3d60
Merge pull request #1 from Kotlin/master
circusmagnus Mar 18, 2020
c0bf01b
Merge pull request #2 from Kotlin/develop
circusmagnus Mar 18, 2020
ccb76c8
Merge pull request #3 from Kotlin/master
circusmagnus Nov 3, 2020
d52fd69
Merge pull request #4 from Kotlin/master
circusmagnus Nov 6, 2020
6fb01b9
Add time- and size-based chunking operators
circusmagnus Nov 10, 2020
43bfcfb
Remove unused operators
circusmagnus Nov 10, 2020
c378678
Add visibility modifiers and clarify tests
circusmagnus Nov 10, 2020
cfbd8ea
Merge pull request #5 from Kotlin/master
circusmagnus Dec 23, 2020
1c98a45
Merge remote-tracking branch 'origin/master' into flow-time-based-chu…
circusmagnus Dec 23, 2020
5237f92
Chunk with interval and size only
circusmagnus Dec 23, 2020
8b8b28e
Chunk with interval and size only - part 2
circusmagnus Jan 8, 2021
c2a4eac
Merge pull request #6 from Kotlin/master
circusmagnus Mar 29, 2021
5c5c088
Add time- and size-based chunking operators
circusmagnus Nov 10, 2020
e04a106
Remove unused operators
circusmagnus Nov 10, 2020
942b163
Add visibility modifiers and clarify tests
circusmagnus Nov 10, 2020
a12429e
Chunk with interval and size only
circusmagnus Dec 23, 2020
da1a57c
Chunk with interval and size only - part 2
circusmagnus Jan 8, 2021
632d540
Prepare Chunking Methods
circusmagnus Mar 29, 2021
c3244ff
Add a bunch of tests
circusmagnus Mar 31, 2021
5b5c3bd
Test Time based chunking
circusmagnus Apr 2, 2021
2b9e5d1
Add docs and last tests
circusmagnus Apr 12, 2021
9cb86f9
Add test for error propagation in Natural Chunking
circusmagnus Apr 14, 2021
d996a9b
Enable for JDK 1.6
circusmagnus Apr 14, 2021
b16e9b0
Merge remote-tracking branch 'origin/flow-time-based-chunked' into fl…
circusmagnus Apr 14, 2021
3aaf7bd
Merge pull request #7 from Kotlin/develop
circusmagnus Apr 14, 2021
e795cc2
Merge remote-tracking branch 'origin/develop' into flow-time-based-ch…
circusmagnus Apr 14, 2021
3fb6939
Adjust for changes in Channel API
circusmagnus Apr 15, 2021
7431426
New Api dump
circusmagnus Apr 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Chunk with interval and size only
  • Loading branch information
circusmagnus committed Mar 29, 2021
commit a12429eec3df962ccca352fe90082747277c7804
191 changes: 52 additions & 139 deletions kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,170 +12,83 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlin.math.*
import kotlin.time.*

private const val NO_MAXIMUM = -1

public fun <T> Flow<T>.chunked(maxSize: Int, minSize: Int = 1): Flow<List<T>> {
require(minSize in 0 until maxSize)
return flow {
val accumulator = mutableListOf<T>()
collect { value ->
accumulator.add(value)
if (accumulator.size == maxSize) emit(accumulator.drain())
}
if (accumulator.size >= minSize) emit(accumulator)
}
/**
 * Sentinel values accepted by the chunking operators in this file.
 */
public object ChunkConstraint {
    /**
     * Interval sentinel, equal to [Long.MAX_VALUE].
     * The millisecond-based `chunked` overload compares its interval argument against this value
     * and switches to purely size-based chunking when they are equal.
     */
    public const val NO_INTERVAL: Long = Long.MAX_VALUE

    /**
     * Size sentinel, equal to [Int.MAX_VALUE].
     * NOTE(review): presumably passed as the chunk size to mean "no size limit"; it is not
     * referenced by the visible code, so confirm against callers.
     */
    public const val NO_MAXIMUM: Int = Int.MAX_VALUE
}

@ExperimentalTime
public fun <T> Flow<T>.chunked(
chunkDuration: Duration,
minSize: Int = 1,
maxSize: Int = NO_MAXIMUM
): Flow<List<T>> = chunked(chunkDuration.toDelayMillis(), minSize, maxSize)
interval: Duration,
size: Int
): Flow<List<T>> = chunked(interval.toLongMilliseconds(), size)

public fun <T> Flow<T>.chunked(
chunkDurationMs: Long,
minSize: Int = 1,
maxSize: Int = NO_MAXIMUM
intervalMs: Long,
size: Int
): Flow<List<T>> {
require(chunkDurationMs > 0)
require(minSize >= 0)
require(maxSize == NO_MAXIMUM || maxSize >= minSize)

return if (minSize == 0 && maxSize == NO_MAXIMUM) chunkFixedTimeWindows(chunkDurationMs)
else if (minSize == 0) chunkContinousWindows(chunkDurationMs, maxSize)
else chunkFloatingWindows(chunkDurationMs, minSize, maxSize)
}

private fun <T> Flow<T>.chunkFixedTimeWindows(durationMs: Long): Flow<List<T>> = scopedFlow { downstream ->
val upstream = produce(capacity = Channel.CHANNEL_DEFAULT_CAPACITY) {
val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) }
launch {
for (tick in ticker) send(Signal.TimeIsUp)
}
collect { value -> send(Signal.NewElement(value)) }
ticker.close()
}
val accumulator = mutableListOf<T>()
require(intervalMs >= 0)
require(size > 0)

for (signal in upstream) {
when (signal) {
is Signal.NewElement -> accumulator.add(signal.value)
is Signal.TimeIsUp -> downstream.emit(accumulator.drain())
}
}
if (accumulator.isNotEmpty()) downstream.emit(accumulator)
return if(intervalMs != ChunkConstraint.NO_INTERVAL) chunkedTimeBased(intervalMs, size)
else chunkedSizeBased(size)
}

private fun <T> Flow<T>.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow<List<T>> =
scopedFlow { downstream ->
val inbox: ReceiveChannel<T> = [email protected](this)
val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) }
val accumulator = mutableListOf<T>()
private fun <T> Flow<T>.chunkedTimeBased(intervalMs: Long, size: Int): Flow<List<T>> = scopedFlow { downstream ->
val buffer = Channel<T>(size)
val emitSemaphore = Channel<Unit>()
val collectSemaphore = Channel<Unit>()

whileSelect {
inbox.onReceiveOrClosed.invoke { valueOrClosed ->
val isOpen = !valueOrClosed.isClosed
if (isOpen) {
accumulator.add(valueOrClosed.value)
if(accumulator.size == maxSize){
ticker.send(Ticker.Message.Reset)
downstream.emit(accumulator.drain())
ticker.send(Ticker.Message.Start)
}
}
isOpen
}
ticker.onReceive.invoke {
downstream.emit(accumulator.drain())
true
launch {
collect { value ->
val hasCapacity = buffer.offer(value)
if (!hasCapacity) {
emitSemaphore.send(Unit)
collectSemaphore.receive()
buffer.send(value)
}
}

ticker.close()
if (accumulator.isNotEmpty()) downstream.emit(accumulator)
emitSemaphore.close()
buffer.close()
}

private fun <T> Flow<T>.chunkFloatingWindows(
durationMs: Long,
minSize: Int,
maxSize: Int,
): Flow<List<T>> {

return scopedFlow { downstream ->
val upstream: ReceiveChannel<T> = [email protected](this)
val ticker = Ticker(durationMs, this)
val accumulator = mutableListOf<T>()

whileSelect {
upstream.onReceiveOrClosed.invoke { valueOrClosed ->
val isOpen = valueOrClosed.isClosed.not()
if (isOpen) {
if (accumulator.isEmpty()) ticker.send(Ticker.Message.Start)
accumulator.add(valueOrClosed.value)
if (accumulator.size == maxSize) {
ticker.send(Ticker.Message.Reset)
downstream.emit(accumulator.drain())
}
}
isOpen
}
ticker.onReceive.invoke {
if (accumulator.size >= minSize) downstream.emit(accumulator.drain())
true
}
whileSelect {
emitSemaphore.onReceiveOrClosed { valueOrClosed ->
buffer.drain().takeIf { it.isNotEmpty() }?.let { downstream.emit(it) }
val shouldCollectNextChunk = valueOrClosed.isClosed.not()
if (shouldCollectNextChunk) collectSemaphore.send(Unit) else collectSemaphore.close()
shouldCollectNextChunk
}

ticker.close()
if (accumulator.size >= minSize) downstream.emit(accumulator)
}
}

private class Ticker(
private val intervalMs: Long,
scope: CoroutineScope,
private val inbox: Channel<Message> = Channel(),
private val ticks: Channel<Unit> = Channel()
) : SendChannel<Ticker.Message> by inbox, ReceiveChannel<Unit> by ticks {

init {
scope.processMessages()
}

private fun CoroutineScope.processMessages() = launch {
var ticker = setupTicks()
for (message in inbox) {
when (message) {
Message.Start -> ticker.start()
Message.Reset -> {
ticker.cancel()
ticker = setupTicks()
}
}
onTimeout(intervalMs) {
downstream.emit(buffer.awaitFirstAndDrain())
true
}
ticker.cancel()
ticks.cancel()
}
}

private fun CoroutineScope.setupTicks() = launch(start = CoroutineStart.LAZY) {
while (true) {
delay(intervalMs)
ticks.send(Unit)
}
}
/**
 * Suspends until this channel delivers one element, then collects that element together with
 * everything [drain] can poll from the channel right afterwards.
 *
 * @return the received elements in arrival order, or an empty list if the channel was closed
 * before any element arrived.
 */
private suspend fun <T> ReceiveChannel<T>.awaitFirstAndDrain(): List<T> {
    val firstOrClosed = receiveOrClosed()
    // Decide on the closed flag rather than on the value itself: a received `null` is a
    // legitimate element when T is nullable and must not be mistaken for a closed channel.
    if (firstOrClosed.isClosed) return emptyList()
    return drain(mutableListOf(firstOrClosed.value))
}

sealed class Message {
object Start : Message()
object Reset : Message()
/**
 * Polls this channel without suspending, appending each polled element to [acc],
 * and returns [acc] as soon as `poll()` yields `null`.
 *
 * NOTE(review): a `null` element cannot be told apart from a `null` poll result here,
 * so draining also stops at the first `null` element.
 */
private fun <T> ReceiveChannel<T>.drain(acc: MutableList<T> = mutableListOf()): List<T> {
    while (true) {
        val next = poll() ?: return acc
        acc.add(next)
    }
}

private sealed class Signal<out T> {
class NewElement<out T>(val value: T) : Signal<T>()
object TimeIsUp : Signal<Nothing>()
/**
 * Collects upstream values into lists and emits a list every time it reaches [size] elements.
 * Once upstream completes, any leftover values are emitted as one final, shorter list.
 */
private fun <T> Flow<T>.chunkedSizeBased(size: Int): Flow<List<T>> = flow {
    val pending = mutableListOf<T>()
    collect { element ->
        pending += element
        if (pending.size == size) {
            // Hand a copy downstream so the working list can be cleared and reused.
            val chunk = pending.toList()
            pending.clear()
            emit(chunk)
        }
    }
    // Upstream is done: flush the partially filled chunk, if there is one.
    if (pending.isNotEmpty()) {
        emit(pending)
    }
}

/**
 * Empties this list and returns a read-only copy of the elements it held before being cleared.
 */
private fun <T> MutableList<T>.drain(): List<T> {
    val snapshot = toList()
    clear()
    return snapshot
}