Skip to content

Commit

Permalink
Dump pool threads on BroadcastChannelMultiReceiveStressTest failure;
Browse files Browse the repository at this point in the history
Move thread test utils to different file
  • Loading branch information
elizarov committed Oct 26, 2017
1 parent 6f5bd3f commit ffc61ae
Show file tree
Hide file tree
Showing 17 changed files with 67 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,54 +27,6 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport

private const val WAIT_LOST_THREADS = 10_000L // 10s
private val ignoreLostThreads = mutableSetOf<String>()

fun ignoreLostThreads(vararg s: String) { ignoreLostThreads += s }

fun currentThreads(): Set<Thread> {
var estimate = 0
while (true) {
estimate = estimate.coerceAtLeast(Thread.activeCount() + 1)
val arrayOfThreads = Array<Thread?>(estimate) { null }
val n = Thread.enumerate(arrayOfThreads)
if (n >= estimate) {
estimate = n + 1
continue // retry with a better size estimate
}
val threads = hashSetOf<Thread>()
for (i in 0 until n)
threads.add(arrayOfThreads[i]!!)
return threads
}
}

fun checkTestThreads(threadsBefore: Set<Thread>) {
// give threads some time to shutdown
val waitTill = System.currentTimeMillis() + WAIT_LOST_THREADS
var diff: List<Thread>
do {
val threadsAfter = currentThreads()
diff = (threadsAfter - threadsBefore).filter { thread ->
ignoreLostThreads.none { prefix -> thread.name.startsWith(prefix) }
}
if (diff.isEmpty()) break
} while (System.currentTimeMillis() <= waitTill)
ignoreLostThreads.clear()
if (diff.isEmpty()) return
val message = "Lost threads ${diff.map { it.name }}"
println("!!! $message")
println("=== Dumping lost thread stack traces")
diff.forEach { thread ->
println("Thread \"${thread.name}\" ${thread.state}")
val trace = thread.stackTrace
for (t in trace) println("\tat ${t.className}.${t.methodName}(${t.fileName}:${t.lineNumber})")
println()
}
println("===")
error(message)
}

fun trackTask(block: Runnable) = timeSource.trackTask(block)

// helper function to dump exception to stdout for ease of debugging failed tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package kotlinx.coroutines.experimental

import guide.test.checkTestThreads
import guide.test.currentThreads
import org.junit.After
import org.junit.Before
import java.util.concurrent.atomic.AtomicBoolean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package kotlinx.coroutines.experimental

private const val WAIT_LOST_THREADS = 10_000L // 10s
private val ignoreLostThreads = mutableSetOf<String>()

fun ignoreLostThreads(vararg s: String) { ignoreLostThreads += s }

fun currentThreads(): Set<Thread> {
var estimate = 0
while (true) {
estimate = estimate.coerceAtLeast(Thread.activeCount() + 1)
val arrayOfThreads = Array<Thread?>(estimate) { null }
val n = Thread.enumerate(arrayOfThreads)
if (n >= estimate) {
estimate = n + 1
continue // retry with a better size estimate
}
val threads = hashSetOf<Thread>()
for (i in 0 until n)
threads.add(arrayOfThreads[i]!!)
return threads
}
}

fun List<Thread>.dumpThreads(header: String) {
println("=== $header")
forEach { thread ->
println("Thread \"${thread.name}\" ${thread.state}")
val trace = thread.stackTrace
for (t in trace) println("\tat ${t.className}.${t.methodName}(${t.fileName}:${t.lineNumber})")
println()
}
println("===")
}

fun ThreadPoolDispatcher.dumpThreads(header: String) =
currentThreads().filter { it is PoolThread && it.dispatcher == this@dumpThreads }.dumpThreads(header)

fun checkTestThreads(threadsBefore: Set<Thread>) {
// give threads some time to shutdown
val waitTill = System.currentTimeMillis() + WAIT_LOST_THREADS
var diff: List<Thread>
do {
val threadsAfter = currentThreads()
diff = (threadsAfter - threadsBefore).filter { thread ->
ignoreLostThreads.none { prefix -> thread.name.startsWith(prefix) }
}
if (diff.isEmpty()) break
} while (System.currentTimeMillis() <= waitTill)
ignoreLostThreads.clear()
if (diff.isEmpty()) return
val message = "Lost threads ${diff.map { it.name }}"
println("!!! $message")
diff.dumpThreads("Dumping lost thread stack traces")
error(message)
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class BroadcastChannelMultiReceiveStressTest(
}
} catch (e: Exception) {
println("Failed: $e")
pool.dumpThreads("Threads in pool")
receivers.indices.forEach { index ->
println("lastReceived[$index] = ${lastReceived[index].get()}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package kotlinx.coroutines.experimental.guava

import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture
import guide.test.ignoreLostThreads
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsInstanceOf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package kotlinx.coroutines.experimental.future

import guide.test.ignoreLostThreads
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.IsEqual
import org.junit.Assert.*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.SuspendExecution
import co.paralleluniverse.strands.SuspendableCallable
import co.paralleluniverse.strands.dataflow.Val
import guide.test.ignoreLostThreads
import kotlinx.coroutines.experimental.CompletableDeferred
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.*
import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@

package kotlinx.coroutines.experimental.reactor

import guide.test.ignoreLostThreads
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.reactive.awaitFirst
import kotlinx.coroutines.experimental.reactive.awaitLast
import kotlinx.coroutines.experimental.reactive.awaitSingle
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.yield
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsInstanceOf
import org.junit.Assert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@

package kotlinx.coroutines.experimental.reactor

import guide.test.ignoreLostThreads
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.run
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsNot
import org.junit.Assert.assertThat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@

package kotlinx.coroutines.experimental.rx1

import guide.test.ignoreLostThreads
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.run
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsNot
import org.junit.Assert.assertThat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package kotlinx.coroutines.experimental.rx1

import guide.test.ignoreLostThreads
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsInstanceOf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package guide.test
import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.plugins.RxJavaPlugins
import kotlinx.coroutines.experimental.ignoreLostThreads
import org.junit.After
import org.junit.Before
import java.util.concurrent.TimeUnit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@

package kotlinx.coroutines.experimental.rx2

import guide.test.ignoreLostThreads
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.functions.Action
import io.reactivex.internal.functions.Functions.ON_ERROR_MISSING
import io.reactivex.internal.functions.Functions.emptyConsumer
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.yield
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsInstanceOf
import org.junit.Assert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@

package kotlinx.coroutines.experimental.rx2

import guide.test.ignoreLostThreads
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.run
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsNot
import org.junit.Assert.assertThat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@

package kotlinx.coroutines.experimental.rx2

import guide.test.ignoreLostThreads
import io.reactivex.Observable
import io.reactivex.Single
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.yield
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsInstanceOf
import org.junit.Assert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@

package kotlinx.coroutines.experimental.javafx

import guide.test.ignoreLostThreads
import javafx.application.Platform
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.*
import org.junit.Before
import org.junit.Test

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@

package kotlinx.coroutines.experimental.swing

import guide.test.ignoreLostThreads
import kotlinx.coroutines.experimental.TestBase
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.*
import org.junit.Before
import org.junit.Test
import javax.swing.SwingUtilities
Expand Down

0 comments on commit ffc61ae

Please sign in to comment.