Skip to content

Commit

Permalink
Flush and save log requests on crash (#728)
Browse files Browse the repository at this point in the history
* Added log orchestrator to crash service. Extracted interface and moved implementation to LogOrchestratorImpl

* Allow the delivery service to store a log envelope as pending request instead of trying to send it.
  • Loading branch information
leandro-godon committed Apr 16, 2024
1 parent 0d7b1cc commit 59bd399
Show file tree
Hide file tree
Showing 28 changed files with 277 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.embrace.android.embracesdk.gating.GatingService
import io.embrace.android.embracesdk.internal.ApkToolsConfig
import io.embrace.android.embracesdk.internal.clock.Clock
import io.embrace.android.embracesdk.internal.crash.CrashFileMarker
import io.embrace.android.embracesdk.internal.logs.LogOrchestrator
import io.embrace.android.embracesdk.internal.utils.Uuid.getEmbUuid
import io.embrace.android.embracesdk.logging.InternalEmbraceLogger
import io.embrace.android.embracesdk.ndk.NdkService
Expand All @@ -29,6 +30,7 @@ import io.embrace.android.embracesdk.session.properties.SessionPropertiesService
*/
internal class EmbraceCrashService(
configService: ConfigService,
private val logOrchestrator: LogOrchestrator,
private val sessionOrchestrator: SessionOrchestrator,
private val sessionPropertiesService: SessionPropertiesService,
private val metadataService: MetadataService,
Expand Down Expand Up @@ -122,6 +124,9 @@ internal class EmbraceCrashService(
// attempt to send the crash and if it fails, we will send it again on the next launch.
deliveryService.sendCrash(crashEvent, true)

// Attempt to send any logs that are still waiting in the sink
logOrchestrator.flush(true)

// End, cache and send the session
sessionOrchestrator.endSessionWithCrash(crash.crashId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ internal interface ApiService {
*/
fun sendLogEnvelope(logEnvelope: Envelope<LogPayload>)

/**
* Saves a list of OTel Logs to disk to be sent on restart.
*
* @param logEnvelope containing the logs
*/
fun saveLogEnvelope(logEnvelope: Envelope<LogPayload>)

/**
* Sends an Application Exit Info (AEI) blob message to the API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ internal class EmbraceApiService(
post(logEnvelope, mapper::logsEnvelopeRequest, parameterizedType)
}

override fun saveLogEnvelope(logEnvelope: Envelope<LogPayload>) {
val parameterizedType = Types.newParameterizedType(Envelope::class.java, LogPayload::class.java)
val request: ApiRequest = mapper.logsEnvelopeRequest(logEnvelope)
val action: SerializationAction = { stream ->
ConditionalGzipOutputStream(stream).use {
serializer.toJson(logEnvelope, parameterizedType, it)
}
}
pendingApiCallsSender.savePendingApiCall(request, action, sync = true)
}

override fun sendAEIBlob(blobMessage: BlobMessage) {
post(blobMessage, mapper::aeiBlobRequest)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ internal interface DeliveryCacheManager {
fun savePayload(action: SerializationAction): String
fun loadPayloadAsAction(name: String): SerializationAction
fun deletePayload(name: String)
fun savePendingApiCalls(pendingApiCalls: PendingApiCalls)
fun savePendingApiCalls(pendingApiCalls: PendingApiCalls, sync: Boolean = false)
fun loadPendingApiCalls(): PendingApiCalls
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal interface DeliveryService {
fun sendCachedSessions(ndkService: NdkService?, sessionIdTracker: SessionIdTracker)
fun sendLog(eventMessage: EventMessage)
fun sendLogs(logEnvelope: Envelope<LogPayload>)
fun saveLogs(logEnvelope: Envelope<LogPayload>)
fun sendNetworkCall(networkEvent: NetworkEvent)
fun sendCrash(crash: EventMessage, processTerminating: Boolean)
fun sendMoment(eventMessage: EventMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,17 @@ internal class EmbraceDeliveryCacheManager(
/**
* Saves the [PendingApiCalls] map to a file named [PENDING_API_CALLS_FILE_NAME].
*/
override fun savePendingApiCalls(pendingApiCalls: PendingApiCalls) {
backgroundWorker.submit {
override fun savePendingApiCalls(pendingApiCalls: PendingApiCalls, sync: Boolean) {
if (sync) {
cacheService.cacheObject(PENDING_API_CALLS_FILE_NAME, pendingApiCalls, PendingApiCalls::class.java)
} else {
backgroundWorker.submit {
cacheService.cacheObject(
PENDING_API_CALLS_FILE_NAME,
pendingApiCalls,
PendingApiCalls::class.java
)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ internal class EmbraceDeliveryService(
apiService.sendLogEnvelope(logEnvelope)
}

override fun saveLogs(logEnvelope: Envelope<LogPayload>) {
apiService.saveLogEnvelope(logEnvelope)
}

override fun sendNetworkCall(networkEvent: NetworkEvent) {
apiService.sendNetworkCall(networkEvent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ internal class EmbracePendingApiCallsSender(
this.sendMethod = sendMethod
}

override fun savePendingApiCall(request: ApiRequest, action: SerializationAction) {
override fun savePendingApiCall(request: ApiRequest, action: SerializationAction, sync: Boolean) {
// Save the payload to disk.
val cachedPayloadName = cacheManager.savePayload(action)

// Save the pending api calls to disk.
val pendingApiCall = PendingApiCall(request, cachedPayloadName, clock.now())
pendingApiCalls.add(pendingApiCall)
cacheManager.savePendingApiCalls(pendingApiCalls)
cacheManager.savePendingApiCalls(pendingApiCalls, sync)
}

override fun scheduleRetry(response: ApiResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal interface PendingApiCallsSender {
/**
* Save an API call to be sent later.
*/
fun savePendingApiCall(request: ApiRequest, action: SerializationAction)
fun savePendingApiCall(request: ApiRequest, action: SerializationAction, sync: Boolean = false)

/**
* Schedules the retry of all pending API calls.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ internal class CrashModuleImpl(
sessionModule: SessionModule,
anrModule: AnrModule,
dataContainerModule: DataContainerModule,
androidServicesModule: AndroidServicesModule
androidServicesModule: AndroidServicesModule,
logModule: CustomerLogModule
) : CrashModule {

private val crashMarker: CrashFileMarker by singleton {
Expand All @@ -42,6 +43,7 @@ internal class CrashModuleImpl(
override val crashService: CrashService by singleton {
EmbraceCrashService(
essentialServiceModule.configService,
logModule.logOrchestrator,
sessionModule.sessionOrchestrator,
sessionModule.sessionPropertiesService,
essentialServiceModule.metadataService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.embrace.android.embracesdk.event.LogMessageService
import io.embrace.android.embracesdk.internal.logs.CompositeLogService
import io.embrace.android.embracesdk.internal.logs.EmbraceLogService
import io.embrace.android.embracesdk.internal.logs.LogOrchestrator
import io.embrace.android.embracesdk.internal.logs.LogOrchestratorImpl
import io.embrace.android.embracesdk.internal.logs.LogService
import io.embrace.android.embracesdk.network.logging.EmbraceNetworkCaptureService
import io.embrace.android.embracesdk.network.logging.EmbraceNetworkLoggingService
Expand Down Expand Up @@ -94,7 +95,7 @@ internal class CustomerLogModuleImpl(
}

override val logOrchestrator: LogOrchestrator by singleton {
LogOrchestrator(
LogOrchestratorImpl(
workerThreadModule.scheduledWorker(WorkerName.REMOTE_LOGGING),
initModule.clock,
openTelemetryModule.logSink,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,8 @@ internal class ModuleInitBootstrapper(
sessionModule,
anrModule,
dataContainerModule,
androidServicesModule
androidServicesModule,
customerLogModule
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@ import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

internal class LogOrchestrator(
internal interface LogOrchestrator {

/**
* Flushes immediately any log still in the sink
*/
fun flush(saveOnly: Boolean)
}

internal class LogOrchestratorImpl(
private val logOrchestratorScheduledWorker: ScheduledWorker,
private val clock: Clock,
private val sink: LogSink,
private val deliveryService: DeliveryService,
private val logEnvelopeSource: LogEnvelopeSource,
) {
) : LogOrchestrator {
@Volatile
private var lastLogTime: AtomicLong = AtomicLong(0)

Expand All @@ -29,6 +37,21 @@ internal class LogOrchestrator(
sink.callOnLogsStored(::onLogsAdded)
}

override fun flush(saveOnly: Boolean) {
scheduledCheckFuture?.cancel(false)
scheduledCheckFuture = null
firstLogInBatchTime.set(0)

val envelope = logEnvelopeSource.getEnvelope()
if (!envelope.data.logs.isNullOrEmpty()) {
if (saveOnly) {
deliveryService.saveLogs(envelope)
} else {
deliveryService.sendLogs(envelope)
}
}
}

private fun onLogsAdded() {
lastLogTime.set(clock.now())
firstLogInBatchTime.compareAndSet(0, lastLogTime.get())
Expand All @@ -53,21 +76,10 @@ internal class LogOrchestrator(
if (!shouldSendLogs) {
return false
}
flush()
flush(false)
return true
}

fun flush() {
scheduledCheckFuture?.cancel(false)
scheduledCheckFuture = null
firstLogInBatchTime.set(0)

val envelope = logEnvelopeSource.getEnvelope()
if (!envelope.data.logs.isNullOrEmpty()) {
deliveryService.sendLogs(envelope)
}
}

private fun scheduleCheck() {
val now = clock.now()
val nextBatchCheck = MAX_BATCH_TIME - (now - firstLogInBatchTime.get())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.embrace.android.embracesdk.internal.logs

import io.embrace.android.embracesdk.internal.logs.LogOrchestrator.Companion.MAX_LOGS_PER_BATCH
import io.embrace.android.embracesdk.internal.logs.LogOrchestratorImpl.Companion.MAX_LOGS_PER_BATCH
import io.embrace.android.embracesdk.internal.payload.Log
import io.embrace.android.embracesdk.internal.payload.toNewPayload
import io.embrace.android.embracesdk.utils.threadSafeTake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ internal typealias CrashModuleSupplier = (
sessionModule: SessionModule,
anrModule: AnrModule,
dataContainerModule: DataContainerModule,
androidServicesModule: AndroidServicesModule
androidServicesModule: AndroidServicesModule,
logModule: CustomerLogModule
) -> CrashModule

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.embrace.android.embracesdk.fakes.FakeAnrService
import io.embrace.android.embracesdk.fakes.FakeClock
import io.embrace.android.embracesdk.fakes.FakeConfigService
import io.embrace.android.embracesdk.fakes.FakeEventService
import io.embrace.android.embracesdk.fakes.FakeLogOrchestrator
import io.embrace.android.embracesdk.fakes.FakeMetadataService
import io.embrace.android.embracesdk.fakes.FakePreferenceService
import io.embrace.android.embracesdk.fakes.FakeSessionIdTracker
Expand All @@ -34,12 +35,14 @@ import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertSame
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test

internal class EmbraceCrashServiceTest {

private lateinit var embraceCrashService: EmbraceCrashService
private lateinit var logOrchestrator: FakeLogOrchestrator
private lateinit var sessionOrchestrator: FakeSessionOrchestrator
private lateinit var sessionPropertiesService: SessionPropertiesService
private lateinit var metadataService: FakeMetadataService
Expand All @@ -64,6 +67,7 @@ internal class EmbraceCrashServiceTest {
mockkStatic(Crash::class)
mockkObject(CrashFactory)

logOrchestrator = FakeLogOrchestrator()
sessionOrchestrator = FakeSessionOrchestrator()
sessionPropertiesService = FakeSessionPropertiesService()
metadataService = FakeMetadataService()
Expand Down Expand Up @@ -97,6 +101,7 @@ internal class EmbraceCrashServiceTest {

embraceCrashService = EmbraceCrashService(
configService,
logOrchestrator,
sessionOrchestrator,
sessionPropertiesService,
metadataService,
Expand All @@ -116,6 +121,19 @@ internal class EmbraceCrashServiceTest {
metadataService.setAppForeground()
}

@Test
fun `test SessionOrchestrator and LogOrchestrator are called when handleCrash is called`() {
crash = CrashFactory.ofThrowable(logger, testException, null, 1)
setupForHandleCrash(false)

embraceCrashService.handleCrash(Thread.currentThread(), testException)

assertEquals(1, anrService.forceAnrTrackingStopOnCrashCount)
assertNotNull(deliveryService.lastSentCrash)
assertTrue(logOrchestrator.flushCalled)
assertNotNull(sessionOrchestrator.crashId)
}

@Test
fun `test ApiClient and SessionService are called when handleCrash is called with JSException`() {
setupForHandleCrash(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import io.embrace.android.embracesdk.worker.BackgroundWorker
import io.mockk.clearAllMocks
import io.mockk.every
import io.mockk.spyk
import io.mockk.verify
import org.junit.After
import org.junit.Assert.assertArrayEquals
import org.junit.Assert.assertEquals
Expand Down Expand Up @@ -296,6 +297,64 @@ internal class EmbraceDeliveryCacheManagerTest {
assertNull(cachedCalls.pollNextPendingApiCall())
}

@Test
fun `save pending api calls with sync on runs on the main thread`() {
val spyWorker = spyk(worker)
deliveryCacheManager = EmbraceDeliveryCacheManager(
cacheService,
spyWorker,
logger
)

val pendingApiCalls = PendingApiCalls()
val request1 = ApiRequest(
url = EmbraceUrl.create("https://test.url/sessions"),
httpMethod = HttpMethod.POST,
appId = "test_app_id_1",
deviceId = "test_device_id",
eventId = "request_1",
contentEncoding = "gzip"
)
val pendingApiCall1 = PendingApiCall(request1, "payload_1.json", fakeClock.now())
pendingApiCalls.add(pendingApiCall1)

val request2 = ApiRequest(
url = EmbraceUrl.create("https://test.url/events"),
httpMethod = HttpMethod.POST,
appId = "test_app_id",
deviceId = "test_device_id",
eventId = "request_2",
contentEncoding = "gzip"
)
fakeClock.tickSecond()
val pendingApiCall2 = PendingApiCall(request2, "payload_2.json", fakeClock.now())
pendingApiCalls.add(pendingApiCall2)

val request3 = ApiRequest(
url = EmbraceUrl.create("https://test.url/logging"),
httpMethod = HttpMethod.POST,
appId = "test_app_id",
deviceId = "test_device_id",
eventId = "request_3",
contentEncoding = "gzip"
)
fakeClock.tickSecond()
val pendingApiCall3 = PendingApiCall(request3, "payload_3.json", fakeClock.now())
pendingApiCalls.add(pendingApiCall3)

deliveryCacheManager.savePendingApiCalls(pendingApiCalls, true)

// Verify that the caching was not done in a background worker
verify(exactly = 0) { spyWorker.submit(any(), any()) }

val cachedCalls = deliveryCacheManager.loadPendingApiCalls()

assertEquals(pendingApiCall1, cachedCalls.pollNextPendingApiCall())
assertEquals(pendingApiCall2, cachedCalls.pollNextPendingApiCall())
assertEquals(pendingApiCall3, cachedCalls.pollNextPendingApiCall())
assertNull(cachedCalls.pollNextPendingApiCall())
}

/**
* The current version is storing [PendingApiCalls] in a file, but previous versions
* were storing a list of [PendingApiCall]. This test checks that the current
Expand Down
Loading

0 comments on commit 59bd399

Please sign in to comment.