forked from lichess-org/lila
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Duct.scala
95 lines (75 loc) · 2.46 KB
/
Duct.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package lila.hub
import scala.collection.immutable.Queue
import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.concurrent.stm._
import lila.base.LilaException
/*
* Sequential like an actor, but for async functions,
* and using an STM backend instead of akka actor.
*/
trait Duct {
// implement async behaviour here
protected val process: Duct.ReceiveAsync
def !(msg: Any): Unit = atomic { implicit txn =>
stateRef.transform {
_.map(_ enqueue msg) orElse {
Txn.afterCommit { _ => run(msg) }
Duct.emptyQueue
}
}
}
def queueSize = stateRef.single().??(_.size)
/*
* Idle: None
* Busy: Some(Queue.empty)
* Busy with backlog: Some(Queue.nonEmpty)
*/
private[this] val stateRef: Ref[Option[Queue[Any]]] = Ref(None)
private[this] def run(msg: Any): Unit =
process.applyOrElse(msg, Duct.fallback) onComplete { _ => postRun }
private[this] def postRun = atomic { implicit txn =>
stateRef.transform {
_.flatMap(_.dequeueOption) map {
case (msg, newQueue) =>
Txn.afterCommit { _ => run(msg) }
newQueue
}
}
}
}
object Duct {
type ReceiveAsync = PartialFunction[Any, Fu[Any]]
private val emptyQueue: Option[Queue[Any]] = Some(Queue.empty)
private val fallback = { msg: Any =>
lila.log("Duct").warn(s"unhandled msg: $msg")
funit
}
case class Timeout(duration: FiniteDuration) extends lila.base.LilaException {
val message = s"FutureSequencer timed out after $duration"
}
/* Convenience functions to build upon Ducts */
object extra {
case class LazyFu[A](f: () => Fu[A]) {
def apply(timeout: Option[FiniteDuration])(implicit system: akka.actor.ActorSystem) =
timeout.foldLeft(f()) { (fu, dur) =>
fu.withTimeout(
duration = dur,
error = Timeout(dur)
)
}
}
def lazyFu = new Duct {
val process: Duct.ReceiveAsync = { case LazyFu(f) => f() }
}
def lazyFu(timeout: FiniteDuration)(implicit system: akka.actor.ActorSystem) = new Duct {
val process: Duct.ReceiveAsync = { case lf: LazyFu[_] => lf(timeout.some) }
}
case class LazyPromise[A](f: LazyFu[A], promise: Promise[A])
def lazyPromise(timeout: Option[FiniteDuration])(implicit system: akka.actor.ActorSystem) = new Duct {
val process: Duct.ReceiveAsync = {
case LazyPromise(lf, promise) => promise.completeWith { lf(timeout)(system) }.future
}
}
}
}