-
Notifications
You must be signed in to change notification settings - Fork 7
/
BlockableExecutorService.kt
104 lines (89 loc) · 3.11 KB
/
BlockableExecutorService.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package io.embrace.android.embracesdk.concurrency
import io.embrace.android.embracesdk.annotation.InternalApi
import org.jetbrains.annotations.TestOnly
import java.util.concurrent.AbstractExecutorService
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
/**
 * An [ExecutorService] that can be set to a mode that blocks tasks from executing unless explicitly unblocked. In either mode,
 * when they run, they are executed on the current thread so their execution is predictable in tests.
 *
 * While blocking mode defaults to false, it can be instantiated in blocking mode using the constructor parameter "blockingMode"
 * (and subsequently toggled using the [blockingMode] attribute). So by default, this executor behaves like
 * [com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService] unless switched to blocking mode.
 *
 * Note that [awaitTermination] is not supported and always throws [UnsupportedOperationException].
 *
 * @param blockingMode the initial value of the [blockingMode] property
 */
@InternalApi
internal class BlockableExecutorService(blockingMode: Boolean = false) : AbstractExecutorService() {

    /** Tasks that have been submitted via [execute] but have not been run yet. */
    private val tasks = ConcurrentLinkedQueue<Runnable>()

    /**
     * Becomes true once [shutdown] has finished draining the queue or [shutdownNow] has been called.
     * Marked volatile to match how [blockingMode] is declared.
     */
    @Volatile
    private var shutdown = false

    /**
     * When true, submitted tasks are queued until [runNext] or [runCurrentlyBlocked] is called. When false, tasks are run
     * as part of the [execute] call that submits them.
     *
     * Setting this to false immediately runs the currently queued tasks via [runCurrentlyBlocked], so doing that after
     * the executor has been shut down throws [RejectedExecutionException].
     */
    @Volatile
    var blockingMode: Boolean = blockingMode
        set(value) {
            field = value
            if (!value) {
                runCurrentlyBlocked()
            }
        }

    /**
     * Unblock and run the current batch of submitted tasks on the calling thread. At most as many tasks as were queued
     * when this call started are run; the batch size is fixed up front so that new submissions are NOT run by this call
     * and have to be explicitly unblocked afterwards.
     *
     * @throws RejectedExecutionException if the executor has been shut down
     */
    fun runCurrentlyBlocked() {
        rejectIfShutdown()
        repeat(tasks.size) {
            // The queue can hold fewer tasks than the initial snapshot if something else polled it in the meantime.
            val task = tasks.poll() ?: return
            task.run()
        }
    }

    /**
     * Unblock and run one (1) submitted task at the head of the queue if it exists.
     *
     * @throws RejectedExecutionException if the executor has been shut down
     */
    fun runNext() {
        rejectIfShutdown()
        tasks.poll()?.run()
    }

    /**
     * Return the number of blocked tasks, i.e. tasks that are queued and have not been run.
     */
    fun tasksBlockedCount(): Int = tasks.size

    /**
     * Queue [command] and, unless [blockingMode] is on, run the queued tasks right away on the calling thread.
     *
     * @throws IllegalStateException if [command] is null
     * @throws RejectedExecutionException if the executor has been shut down
     */
    @TestOnly
    override fun execute(command: Runnable?) {
        checkNotNull(command)
        rejectIfShutdown()
        tasks.add(command)
        if (!blockingMode) {
            runCurrentlyBlocked()
        }
    }

    /**
     * Run every queued task on the calling thread, regardless of [blockingMode], and then mark the executor as shut down.
     * The shutdown flag is only set after the queue has been drained, so tasks submitted while draining are still accepted.
     */
    override fun shutdown() {
        while (true) {
            val task = tasks.poll() ?: break
            task.run()
        }
        shutdown = true
    }

    /**
     * Mark the executor as shut down without running anything.
     *
     * @return the tasks that were still queued; they are removed from the queue
     */
    override fun shutdownNow(): MutableList<Runnable> {
        shutdown = true
        val remainingTasks = tasks.toMutableList()
        tasks.clear()
        return remainingTasks
    }

    /** Whether [shutdown] has completed or [shutdownNow] has been called. */
    override fun isShutdown(): Boolean = shutdown

    /** Whether the executor has been shut down and no queued tasks remain. */
    override fun isTerminated(): Boolean = shutdown && tasks.isEmpty()

    /**
     * Not supported by this executor.
     *
     * @throws UnsupportedOperationException always
     */
    override fun awaitTermination(timeout: Long, unit: TimeUnit?): Boolean = throw UnsupportedOperationException()

    /** Guard used by the operations that must not be used after shutdown. */
    private fun rejectIfShutdown() {
        if (isShutdown) {
            throw RejectedExecutionException()
        }
    }
}