-
Notifications
You must be signed in to change notification settings - Fork 7
/
LogOrchestrator.kt
95 lines (79 loc) · 3.1 KB
/
LogOrchestrator.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package io.embrace.android.embracesdk.internal.logs
import io.embrace.android.embracesdk.comms.delivery.DeliveryService
import io.embrace.android.embracesdk.internal.clock.Clock
import io.embrace.android.embracesdk.worker.ScheduledWorker
import java.lang.Long.min
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
internal class LogOrchestrator(
private val logOrchestratorScheduledWorker: ScheduledWorker,
private val clock: Clock,
private val sink: LogSink,
private val deliveryService: DeliveryService
) {
@Volatile
private var lastLogTime: AtomicLong = AtomicLong(0)
@Volatile
private var firstLogInBatchTime: AtomicLong = AtomicLong(0)
@Volatile
private var scheduledCheckFuture: ScheduledFuture<*>? = null
init {
sink.callOnLogsStored(::onLogsAdded)
}
private fun onLogsAdded() {
lastLogTime.set(clock.now())
firstLogInBatchTime.compareAndSet(0, lastLogTime.get())
if (!sendLogsIfNeeded()) {
// If [firstLogInBatchTime] was cleared by a concurrent call to [sendLogsIfNeeded]
// then update it to the time of this log
firstLogInBatchTime.compareAndSet(0, lastLogTime.get())
scheduleCheck()
}
}
/**
* Returns true if logs were sent, false otherwise
*/
@Synchronized
private fun sendLogsIfNeeded(): Boolean {
val now = clock.now()
val shouldSendLogs = isMaxLogsPerBatchReached() ||
isMaxInactivityTimeReached(now) ||
isMaxBatchTimeReached(now)
if (!shouldSendLogs) {
return false
}
scheduledCheckFuture?.cancel(false)
scheduledCheckFuture = null
firstLogInBatchTime.set(0)
val storedLogs = sink.flushLogs(MAX_LOGS_PER_BATCH)
if (storedLogs.isNotEmpty()) {
deliveryService.sendLogs(LogPayload(logs = storedLogs))
}
return true
}
private fun scheduleCheck() {
val now = clock.now()
val nextBatchCheck = MAX_BATCH_TIME - (now - firstLogInBatchTime.get())
val nextInactivityCheck = MAX_INACTIVITY_TIME - (now - lastLogTime.get())
scheduledCheckFuture?.cancel(false)
scheduledCheckFuture = logOrchestratorScheduledWorker.schedule<Unit>(
::sendLogsIfNeeded,
min(nextBatchCheck, nextInactivityCheck),
TimeUnit.MILLISECONDS
)
}
private fun isMaxLogsPerBatchReached(): Boolean =
sink.completedLogs().size >= MAX_LOGS_PER_BATCH
private fun isMaxInactivityTimeReached(now: Long): Boolean =
now - lastLogTime.get() > MAX_INACTIVITY_TIME
private fun isMaxBatchTimeReached(now: Long): Boolean {
val firstLogInBatchTime = firstLogInBatchTime.get()
return firstLogInBatchTime != 0L && now - firstLogInBatchTime > MAX_BATCH_TIME
}
companion object {
private const val MAX_LOGS_PER_BATCH = 50
private const val MAX_BATCH_TIME = 5000L // In milliseconds
private const val MAX_INACTIVITY_TIME = 2000L // In milliseconds
}
}