Skip to content

Commit

Permalink
Regroup benchmarks and adapt them to structured concurrency, cleanup …
Browse files Browse the repository at this point in the history
…CoroutineScheduler
  • Loading branch information
qwwdfsad committed Dec 12, 2019
1 parent 7e895fc commit 5202a8b
Show file tree
Hide file tree
Showing 16 changed files with 204 additions and 152 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks

import kotlinx.coroutines.*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package benchmarks

import benchmarks.actors.CORES_COUNT
import benchmarks.akka.CORES_COUNT
import kotlinx.coroutines.*
import kotlinx.coroutines.scheduling.*
import org.openjdk.jmh.annotations.Param
Expand All @@ -22,14 +22,14 @@ abstract class ParametrizedDispatcherBase : CoroutineScope {

abstract var dispatcher: String
override lateinit var coroutineContext: CoroutineContext
var closeable: Closeable? = null
private var closeable: Closeable? = null

@UseExperimental(InternalCoroutinesApi::class)
@Setup
@UseExperimental(InternalCoroutinesApi::class)
open fun setup() {
coroutineContext = when {
dispatcher == "fjp" -> ForkJoinPool.commonPool().asCoroutineDispatcher()
dispatcher == "experimental" -> {
dispatcher == "scheduler" -> {
ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
}
dispatcher.startsWith("ftp") -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.actors
package benchmarks.akka

import akka.actor.ActorRef
import akka.actor.ActorSystem
Expand All @@ -13,7 +13,6 @@ import org.openjdk.jmh.annotations.*
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

const val N_MESSAGES = 100_000

Expand All @@ -29,12 +28,12 @@ class Stop
* PingPongAkkaBenchmark.singlePingPong default-dispatcher avgt 10 173.742 ± 41.984 ms/op
* PingPongAkkaBenchmark.singlePingPong single-thread-dispatcher avgt 10 24.181 ± 0.730 ms/op
*/
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
//@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
//@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
//@Fork(value = 2)
//@BenchmarkMode(Mode.AverageTime)
//@OutputTimeUnit(TimeUnit.MILLISECONDS)
//@State(Scope.Benchmark)
open class PingPongAkkaBenchmark {

lateinit var system: ActorSystem
Expand Down Expand Up @@ -62,12 +61,12 @@ open class PingPongAkkaBenchmark {
Await.ready(system.terminate(), Duration.Inf())
}

@Benchmark
// @Benchmark
fun singlePingPong() {
runPingPongs(1)
}

@Benchmark
// @Benchmark
fun coresCountPingPongs() {
runPingPongs(Runtime.getRuntime().availableProcessors())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.actors
package benchmarks.akka

import akka.actor.ActorRef
import akka.actor.ActorSystem
Expand All @@ -14,7 +14,6 @@ import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit

const val ROUNDS = 10_000
const val STATE_SIZE = 1024
Expand All @@ -38,12 +37,12 @@ val CORES_COUNT = Runtime.getRuntime().availableProcessors()
* StatefulActorAkkaBenchmark.singleComputationSingleRequestor default-dispatcher avgt 14 39.964 ± 2.343 ms/op
* StatefulActorAkkaBenchmark.singleComputationSingleRequestor single-thread-dispatcher avgt 14 10.214 ± 2.152 ms/op
*/
@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
//@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
//@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
//@Fork(value = 2)
//@BenchmarkMode(Mode.AverageTime)
//@OutputTimeUnit(TimeUnit.MILLISECONDS)
//@State(Scope.Benchmark)
open class StatefulActorAkkaBenchmark {

lateinit var system: ActorSystem
Expand Down Expand Up @@ -72,22 +71,22 @@ open class StatefulActorAkkaBenchmark {
Await.ready(system.terminate(), Duration.Inf())
}

@Benchmark
// @Benchmark
fun singleComputationSingleRequestor() {
run(1, 1)
}

@Benchmark
// @Benchmark
fun singleComputationMultipleRequestors() {
run(1, CORES_COUNT)
}

@Benchmark
// @Benchmark
fun multipleComputationsSingleRequestor() {
run(CORES_COUNT, 1)
}

@Benchmark
// @Benchmark
fun multipleComputationsMultipleRequestors() {
run(CORES_COUNT, CORES_COUNT)
}
Expand Down Expand Up @@ -120,7 +119,8 @@ open class StatefulActorAkkaBenchmark {

private fun createComputationActors(initLatch: CountDownLatch, count: Int): List<ActorRef> {
return (0 until count).map {
system.actorOf(Props.create(ComputationActor::class.java,
system.actorOf(Props.create(
ComputationActor::class.java,
LongArray(STATE_SIZE) { ThreadLocalRandom.current().nextLong(0, 100) }, initLatch)
.withDispatcher("akka.actor.$dispatcher"))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.scheduler

import benchmarks.akka.*
import kotlinx.coroutines.*
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.annotations.State
import java.lang.Thread.*
import java.util.concurrent.*
import kotlin.concurrent.*
import kotlin.coroutines.*

@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
open class DispatchersContextSwitchBenchmark {
private val nCoroutines = 10000
private val delayTimeMs = 1L
private val nRepeatDelay = 10

private val fjp = ForkJoinPool.commonPool().asCoroutineDispatcher()
private val ftp = Executors.newFixedThreadPool(CORES_COUNT - 1).asCoroutineDispatcher()

@TearDown
fun teardown() {
ftp.close()
(ftp.executor as ExecutorService).awaitTermination(1, TimeUnit.SECONDS)
}

@Benchmark
fun coroutinesIoDispatcher() = runBenchmark(Dispatchers.IO)

@Benchmark
fun coroutinesDefaultDispatcher() = runBenchmark(Dispatchers.Default)

@Benchmark
fun coroutinesFjpDispatcher() = runBenchmark(fjp)

@Benchmark
fun coroutinesFtpDispatcher() = runBenchmark(ftp)

@Benchmark
fun coroutinesBlockingDispatcher() = runBenchmark(EmptyCoroutineContext)

@Benchmark
fun threads() {
val threads = List(nCoroutines) {
thread(start = true) {
repeat(nRepeatDelay) {
sleep(delayTimeMs)
}
}
}
threads.forEach { it.join() }
}

private fun runBenchmark(dispatcher: CoroutineContext) = runBlocking {
repeat(nCoroutines) {
launch(dispatcher) {
repeat(nRepeatDelay) {
delayOrYield()
}
}
}
}

private suspend fun delayOrYield() {
delay(delayTimeMs)
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks
package benchmarks.scheduler

import benchmarks.*
import kotlinx.coroutines.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*
Expand Down Expand Up @@ -44,7 +45,7 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
}

lateinit var coefficients: LongArray
override var dispatcher: String = "experimental"
override var dispatcher: String = "scheduler"

@Setup
override fun setup() {
Expand Down Expand Up @@ -129,8 +130,18 @@ open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
} else {
pendingCount = 2
// One may fork only once here and executing second task here with looping over firstComplete to be even more efficient
first = RecursiveAction(coefficients, start, start + (end - start) / 2, parent = this).fork()
second = RecursiveAction(coefficients, start + (end - start) / 2, end, parent = this).fork()
first = RecursiveAction(
coefficients,
start,
start + (end - start) / 2,
parent = this
).fork()
second = RecursiveAction(
coefficients,
start + (end - start) / 2,
end,
parent = this
).fork()
}

tryComplete()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks
package benchmarks.scheduler

import benchmarks.*
import kotlinx.coroutines.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*
Expand All @@ -21,7 +22,7 @@ import java.util.concurrent.*
@State(Scope.Benchmark)
open class LaunchBenchmark : ParametrizedDispatcherBase() {

@Param("experimental", "fjp")
@Param("scheduler", "fjp")
override var dispatcher: String = "fjp"

private val jobsToLaunch = 100
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks
package benchmarks.scheduler

import benchmarks.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
Expand Down Expand Up @@ -52,7 +53,7 @@ open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() {
@Param("1", "8", "16")
var jobsCount = 1

@Param("fjp", "ftp_1", "ftp_8")
@Param("fjp", "ftp_1", "dispatcher")
override var dispatcher: String = "fjp"

@Volatile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.actors
package benchmarks.scheduler.actors

import benchmarks.*
import benchmarks.actors.StatefulActorBenchmark.*
import benchmarks.akka.*
import benchmarks.scheduler.actors.StatefulActorBenchmark.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.openjdk.jmh.annotations.*
Expand Down Expand Up @@ -57,18 +58,18 @@ import java.util.concurrent.*
@State(Scope.Benchmark)
open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {

@Param("1024", "8192", "262144")
@Param("1024", "8192")
var stateSize: Int = -1

@Param("fjp", "ftp_1", "ftp_8", "experimental")
@Param("fjp", "scheduler")
override var dispatcher: String = "fjp"

@Benchmark
fun multipleComputationsUnfair() = runBlocking {
val resultChannel: Channel<Unit> = Channel(1)
val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
val requestor = requestorActorUnfair(computations, resultChannel)
requestor.send(Letter(Start(), Channel(0)))
requestor.send(Letter(Start(), requestor))
resultChannel.receive()
}

Expand All @@ -77,7 +78,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
val resultChannel: Channel<Unit> = Channel(1)
val computations = (0 until CORES_COUNT).map { computationActor(stateSize) }
val requestor = requestorActorFair(computations, resultChannel)
requestor.send(Letter(Start(), Channel(0)))
requestor.send(Letter(Start(), requestor))
resultChannel.receive()
}

Expand All @@ -95,6 +96,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
}
is Long -> {
if (++received >= ROUNDS * 8) {
computations.forEach { it.close() }
stopChannel.send(Unit)
return@actor
} else {
Expand Down Expand Up @@ -122,6 +124,7 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
}
is Long -> {
if (++receivedTotal >= ROUNDS * computations.size) {
computations.forEach { it.close() }
stopChannel.send(Unit)
return@actor
} else {
Expand All @@ -136,4 +139,4 @@ open class ConcurrentStatefulActorBenchmark : ParametrizedDispatcherBase() {
}
}
}
}
}
Loading

0 comments on commit 5202a8b

Please sign in to comment.