Skip to content

Commit

Permalink
Fix numerical overflow in FlatMapStressTest.kt
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad committed Apr 22, 2019
1 parent e1ac2e5 commit 165fbaf
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public suspend inline fun <reified T : Throwable> assertFailsWith(flow: Flow<*>)
}

public suspend fun Flow<Int>.sum() = fold(0) { acc, value -> acc + value }
public suspend fun Flow<Long>.longSum() = fold(0L) { acc, value -> acc + value }

public class TestException(message: String? = null) : Throwable(message), NonRecoverableThrowable
public class TestException1(message: String? = null) : Throwable(message), NonRecoverableThrowable
Expand Down
16 changes: 8 additions & 8 deletions kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlin.test.*
class FlatMapStressTest : TestBase() {

private val iterations = 2000 * stressTestMultiplier
private val expectedSum = iterations * (iterations + 1) / 2
private val expectedSum = iterations.toLong() * (iterations + 1) / 2

@Test
fun testConcurrencyLevel() = runTest {
Expand All @@ -35,7 +35,7 @@ class FlatMapStressTest : TestBase() {
val bufferSize = 5
withContext(Dispatchers.Default) {
val inFlightElements = AtomicLong(0L)
var result = 0
var result = 0L
(1..iterations step 4).asFlow().flatMapMerge(bufferSize = bufferSize) { value ->
unsafeFlow {
repeat(4) {
Expand All @@ -59,11 +59,11 @@ class FlatMapStressTest : TestBase() {
@Test
fun testDelivery() = runTest {
withContext(Dispatchers.Default) {
val result = (1..iterations step 4).asFlow().flatMapMerge { value ->
val result = (1L..iterations step 4).asFlow().flatMapMerge { value ->
unsafeFlow {
repeat(4) { emit(value + it) }
}
}.sum()
}.longSum()
assertEquals(expectedSum, result)
}
}
Expand All @@ -72,12 +72,12 @@ class FlatMapStressTest : TestBase() {
fun testIndependentShortBursts() = runTest {
withContext(Dispatchers.Default) {
repeat(iterations) {
val result = (1..4).asFlow().flatMapMerge { value ->
val result = (1L..4L).asFlow().flatMapMerge { value ->
unsafeFlow {
emit(value)
emit(value)
}
}.sum()
}.longSum()
assertEquals(20, result)
}
}
Expand All @@ -86,14 +86,14 @@ class FlatMapStressTest : TestBase() {
private suspend fun testConcurrencyLevel(maxConcurrency: Int) {
assumeTrue(maxConcurrency <= CORE_POOL_SIZE)
val concurrency = AtomicLong()
val result = (1..iterations).asFlow().flatMapMerge(concurrency = maxConcurrency) { value ->
val result = (1L..iterations).asFlow().flatMapMerge(concurrency = maxConcurrency) { value ->
unsafeFlow {
val current = concurrency.incrementAndGet()
assertTrue(current in 1..maxConcurrency)
emit(value)
concurrency.decrementAndGet()
}
}.sum()
}.longSum()

assertEquals(0, concurrency.get())
assertEquals(expectedSum, result)
Expand Down

0 comments on commit 165fbaf

Please sign in to comment.