-
Notifications
You must be signed in to change notification settings - Fork 8
/
EmbraceDeliveryService.kt
181 lines (162 loc) · 7.48 KB
/
EmbraceDeliveryService.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package io.embrace.android.embracesdk.comms.delivery
import io.embrace.android.embracesdk.comms.api.ApiService
import io.embrace.android.embracesdk.internal.compression.ConditionalGzipOutputStream
import io.embrace.android.embracesdk.internal.payload.Envelope
import io.embrace.android.embracesdk.internal.payload.LogPayload
import io.embrace.android.embracesdk.internal.payload.toFailedSpan
import io.embrace.android.embracesdk.internal.serialization.PlatformSerializer
import io.embrace.android.embracesdk.internal.utils.Provider
import io.embrace.android.embracesdk.logging.EmbLogger
import io.embrace.android.embracesdk.ndk.NativeCrashService
import io.embrace.android.embracesdk.payload.EventMessage
import io.embrace.android.embracesdk.payload.NativeCrashData
import io.embrace.android.embracesdk.payload.NetworkEvent
import io.embrace.android.embracesdk.payload.SessionMessage
import io.embrace.android.embracesdk.payload.isV2Payload
import io.embrace.android.embracesdk.session.id.SessionIdTracker
import io.embrace.android.embracesdk.session.orchestrator.SessionSnapshotType
import io.embrace.android.embracesdk.worker.BackgroundWorker
import io.embrace.android.embracesdk.worker.TaskPriority
import java.util.concurrent.TimeUnit
/**
 * [DeliveryService] implementation that persists payloads through a [DeliveryCacheManager]
 * and hands them to an [ApiService] for sending.
 *
 * Sessions and crashes are written to the cache before a send is attempted. Logs, network
 * calls and moments are passed straight to the [ApiService].
 */
internal class EmbraceDeliveryService(
    private val cacheManager: DeliveryCacheManager,
    private val apiService: ApiService,
    private val backgroundWorker: BackgroundWorker,
    private val serializer: PlatformSerializer,
    private val logger: EmbLogger
) : DeliveryService {

    companion object {
        // Seconds to wait on the session request when the snapshot type is JVM_CRASH
        private const val SEND_SESSION_TIMEOUT = 1L

        // Seconds to wait before timing out when sending a crash
        private const val CRASH_TIMEOUT = 1L
    }

    /**
     * Caches the supplied session message and, unless [snapshotType] is
     * [SessionSnapshotType.PERIODIC_CACHE], passes it to the [ApiService] for sending.
     *
     * The payload that is sent is the cached copy when it can be loaded; otherwise the
     * in-memory [sessionMessage] is serialized directly so the data is not dropped. The
     * cached session is deleted inside the callback given to the [ApiService], whether or
     * not that callback reports success.
     *
     * For [SessionSnapshotType.JVM_CRASH] this call waits up to [SEND_SESSION_TIMEOUT]
     * seconds on the returned future. Any [Exception] raised after the initial cache write
     * is caught and reported at info level rather than propagated.
     */
    override fun sendSession(sessionMessage: SessionMessage, snapshotType: SessionSnapshotType) {
        cacheManager.saveSession(sessionMessage, snapshotType)
        if (snapshotType == SessionSnapshotType.PERIODIC_CACHE) {
            // Periodic snapshots are only persisted, never sent.
            return
        }

        try {
            val sessionId = sessionMessage.session.sessionId
            val action = cacheManager.loadSessionAsAction(sessionId) ?: { stream ->
                // fallback if initial caching failed for whatever reason, so we don't drop
                // the data
                ConditionalGzipOutputStream(stream).use {
                    serializer.toJson(sessionMessage, SessionMessage::class.java, it)
                }
            }
            val future = apiService.sendSession(sessionMessage.isV2Payload(), action) { successful ->
                if (!successful) {
                    val message =
                        "Session deleted without request being sent: ID $sessionId, timestamp ${sessionMessage.session.startTime}"
                    logger.logWarning(message, SessionPurgeException(message))
                }
                cacheManager.deleteSession(sessionId)
            }
            if (snapshotType == SessionSnapshotType.JVM_CRASH) {
                // Block briefly so the request has a chance to complete.
                future?.get(SEND_SESSION_TIMEOUT, TimeUnit.SECONDS)
            }
        } catch (ex: Exception) {
            logger.logInfo(
                "Failed to send session end message. Embrace will store the " +
                    "session message and attempt to deliver it at a future date."
            )
        }
    }

    /** Passes a single log event to [ApiService.sendLog]. */
    override fun sendLog(eventMessage: EventMessage) {
        apiService.sendLog(eventMessage)
    }

    /** Passes a log envelope to [ApiService.sendLogEnvelope]. */
    override fun sendLogs(logEnvelope: Envelope<LogPayload>) {
        apiService.sendLogEnvelope(logEnvelope)
    }

    /** Passes a log envelope to [ApiService.saveLogEnvelope]. */
    override fun saveLogs(logEnvelope: Envelope<LogPayload>) {
        apiService.saveLogEnvelope(logEnvelope)
    }

    /** Passes a network event to [ApiService.sendNetworkCall]. */
    override fun sendNetworkCall(networkEvent: NetworkEvent) {
        apiService.sendNetworkCall(networkEvent)
    }

    /**
     * Caches the crash and then sends it through the [ApiService].
     *
     * When [processTerminating] is true the call waits up to [CRASH_TIMEOUT] seconds on the
     * returned future. This method never propagates a failure from caching, sending or
     * waiting; such failures are logged as warnings instead.
     */
    override fun sendCrash(crash: EventMessage, processTerminating: Boolean) {
        runCatching {
            cacheManager.saveCrash(crash)
            val future = apiService.sendCrash(crash)
            if (processTerminating) {
                future.get(CRASH_TIMEOUT, TimeUnit.SECONDS)
            }
        }.onFailure { error ->
            // Previously discarded silently; surface it so delivery problems are diagnosable.
            logger.logWarning("Failed to send crash.", error)
        }
    }

    /**
     * Sends any cached crash immediately, then submits a high-priority background task that
     * processes every cached session other than the currently active one:
     *
     * 1. span snapshots without a matching completed span are converted to failed spans;
     * 2. if the [NativeCrashService] returns native crash data, its crash ID is attached to
     *    the cached session it references;
     * 3. the cached sessions are sent.
     */
    override fun sendCachedSessions(
        nativeCrashServiceProvider: Provider<NativeCrashService?>,
        sessionIdTracker: SessionIdTracker
    ) {
        sendCachedCrash()
        backgroundWorker.submit(TaskPriority.HIGH) {
            val cachedSessions = cacheManager.getAllCachedSessionIds()
            // Read the active session ID once instead of once per cached session.
            val activeSessionId = sessionIdTracker.getActiveSessionId()
            val allSessions = cachedSessions.filter { it.sessionId != activeSessionId }

            allSessions.map { it.sessionId }.forEach { sessionId ->
                cacheManager.transformSession(sessionId = sessionId) { sessionMessage ->
                    convertSnapshotsToFailedSpans(sessionMessage)
                }
            }

            nativeCrashServiceProvider()?.let { service ->
                val nativeCrashData = service.getAndSendNativeCrash()
                if (nativeCrashData != null) {
                    addCrashDataToCachedSession(nativeCrashData)
                }
            }
            sendCachedSessions(allSessions)
        }
    }

    /**
     * Returns a copy of [sessionMessage] in which every span snapshot whose span ID is not
     * already present among the completed spans has been appended to the span list as a
     * failed span, and the snapshot list has been emptied.
     *
     * The failed spans are created with the session's end time, or 0 when it has none.
     */
    private fun convertSnapshotsToFailedSpans(sessionMessage: SessionMessage): SessionMessage {
        val completedSpanIds = sessionMessage.spans?.map { it.spanId }?.toSet() ?: emptySet()
        val spansToFail = sessionMessage.spanSnapshots
            ?.filterNot { completedSpanIds.contains(it.spanId) }
            ?.map { it.toFailedSpan(sessionMessage.session.endTime ?: 0L) }
            ?: emptyList()
        val completedSpans = (sessionMessage.spans ?: emptyList()) + spansToFail
        return sessionMessage.copy(spans = completedSpans, spanSnapshots = emptyList())
    }

    /** Loads the cached crash, if there is one, and sends it through the [ApiService]. */
    private fun sendCachedCrash() {
        val crash = cacheManager.loadCrash()
        crash?.let {
            apiService.sendCrash(it)
        }
    }

    /** Rewrites the cached session referenced by [nativeCrashData] so it carries the crash ID. */
    private fun addCrashDataToCachedSession(nativeCrashData: NativeCrashData) {
        cacheManager.transformSession(nativeCrashData.sessionId) { sessionMessage ->
            attachCrashToSession(nativeCrashData, sessionMessage)
        }
    }

    /**
     * Returns a copy of [sessionMessage] whose session has its crash report ID set to the
     * native crash ID from [nativeCrashData].
     */
    private fun attachCrashToSession(
        nativeCrashData: NativeCrashData,
        sessionMessage: SessionMessage
    ): SessionMessage {
        val session = sessionMessage.session.copy(crashReportId = nativeCrashData.nativeCrashId)
        return sessionMessage.copy(session = session)
    }

    /**
     * Sends each of the given cached sessions through the [ApiService].
     *
     * A session whose payload cannot be loaded is logged as an error and skipped. A failure
     * while handling one session is logged and does not stop the remaining sessions from
     * being processed. The cached session is deleted inside the callback given to the
     * [ApiService], whether or not that callback reports success.
     */
    private fun sendCachedSessions(cachedSessions: List<CachedSession>) {
        cachedSessions.forEach { cachedSession ->
            try {
                val sessionId = cachedSession.sessionId
                val action = cacheManager.loadSessionAsAction(sessionId)
                if (action != null) {
                    // temporarily assume all sessions are v1. Future changeset
                    // will encode this information in the filename.
                    apiService.sendSession(false, action) { successful ->
                        if (!successful) {
                            val message = "Cached session deleted without request being sent. File name: ${cachedSession.filename}"
                            logger.logWarning(message, SessionPurgeException(message))
                        }
                        cacheManager.deleteSession(sessionId)
                    }
                } else {
                    logger.logError("Session $sessionId not found")
                }
            } catch (ex: Throwable) {
                logger.logError("Could not send cached session", ex)
            }
        }
    }

    /** Passes a moment event to [ApiService.sendEvent]. */
    override fun sendMoment(eventMessage: EventMessage) {
        apiService.sendEvent(eventMessage)
    }
}