From 6be2a293f4a31d147652af3c9b7edb6501765c24 Mon Sep 17 00:00:00 2001 From: Lucas Date: Mon, 11 Mar 2024 13:06:48 -0300 Subject: [PATCH 1/2] Fix race condition on LogSink and LogOrchestrator. --- .../internal/logs/LogOrchestrator.kt | 40 +++++++++++-------- .../embracesdk/internal/logs/LogSink.kt | 3 +- .../embracesdk/internal/logs/LogSinkImpl.kt | 23 ++++++----- .../internal/logs/LogOrchestratorTest.kt | 32 +++++++++++++++ 4 files changed, 69 insertions(+), 29 deletions(-) diff --git a/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogOrchestrator.kt b/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogOrchestrator.kt index ac162e97f..9eece6706 100644 --- a/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogOrchestrator.kt +++ b/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogOrchestrator.kt @@ -6,6 +6,7 @@ import io.embrace.android.embracesdk.worker.ScheduledWorker import java.lang.Long.min import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong internal class LogOrchestrator( private val logOrchestratorScheduledWorker: ScheduledWorker, @@ -14,10 +15,10 @@ internal class LogOrchestrator( private val deliveryService: DeliveryService ) { @Volatile - private var lastLogTime: Long = 0 + private var lastLogTime: AtomicLong = AtomicLong(0) @Volatile - private var firstLogInBatchTime: Long = 0 + private var firstLogInBatchTime: AtomicLong = AtomicLong(0) @Volatile private var scheduledCheckFuture: ScheduledFuture<*>? = null @@ -27,16 +28,12 @@ internal class LogOrchestrator( } private fun onLogsAdded() { - lastLogTime = clock.now() - if (firstLogInBatchTime == 0L) { - firstLogInBatchTime = lastLogTime - } + lastLogTime.set(clock.now()) + firstLogInBatchTime.compareAndSet(0, lastLogTime.get()) if (!sendLogsIfNeeded()) { // If [firstLogInBatchTime] was cleared by a concurrent call to [sendLogsIfNeeded] // then update it to the time of this log - if (firstLogInBatchTime == 0L) { - firstLogInBatchTime = lastLogTime - } + firstLogInBatchTime.compareAndSet(0, lastLogTime.get()) scheduleCheck() } } @@ -47,20 +44,19 @@ internal class LogOrchestrator( @Synchronized private fun sendLogsIfNeeded(): Boolean { val now = clock.now() - val shouldSendLogs = sink.completedLogs().size >= MAX_LOGS_PER_BATCH || - now - lastLogTime > MAX_INACTIVITY_TIME || - (firstLogInBatchTime != 0L && now - firstLogInBatchTime > MAX_BATCH_TIME) + val shouldSendLogs = isMaxLogsPerBatchReached() || + isMaxInactivityTimeReached(now) || + isMaxBatchTimeReached(now) if (!shouldSendLogs) { - // None of the conditions to send the logs is met return false } scheduledCheckFuture?.cancel(false) scheduledCheckFuture = null - firstLogInBatchTime = 0 + firstLogInBatchTime.set(0) - val storedLogs = sink.flushLogs() + val storedLogs = sink.flushLogs(MAX_LOGS_PER_BATCH) if (storedLogs.isNotEmpty()) { deliveryService.sendLogs(LogPayload(logs = storedLogs)) @@ -71,8 +67,8 @@ internal class LogOrchestrator( private fun scheduleCheck() { val now = clock.now() - val nextBatchCheck = MAX_BATCH_TIME - (now - firstLogInBatchTime) - val nextInactivityCheck = MAX_INACTIVITY_TIME - (now - lastLogTime) + val nextBatchCheck = MAX_BATCH_TIME - (now - firstLogInBatchTime.get()) + val nextInactivityCheck = MAX_INACTIVITY_TIME - (now - lastLogTime.get()) scheduledCheckFuture?.cancel(false) scheduledCheckFuture = logOrchestratorScheduledWorker.schedule( ::sendLogsIfNeeded, @@ -81,6 +77,16 @@ internal class LogOrchestrator( ) } + private fun isMaxLogsPerBatchReached(): Boolean = + sink.completedLogs().size >= MAX_LOGS_PER_BATCH + + private fun isMaxInactivityTimeReached(now: Long): Boolean = + now - lastLogTime.get() > MAX_INACTIVITY_TIME + + private fun isMaxBatchTimeReached(now: Long): Boolean { + val firstLogInBatchTime = firstLogInBatchTime.get() + return firstLogInBatchTime != 0L && now - firstLogInBatchTime > MAX_BATCH_TIME + } companion object { private const val MAX_LOGS_PER_BATCH = 50 private const val MAX_BATCH_TIME = 5000L // In milliseconds diff --git a/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSink.kt b/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSink.kt index a6c6b7f1a..1b2f495c5 100644 --- a/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSink.kt +++ b/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSink.kt @@ -22,8 +22,9 @@ internal interface LogSink { /** * Returns and clears the currently stored Logs. Implementations of this method must make sure the clearing and returning is * atomic, i.e. logs cannot be added during this operation. + * @param max The maximum number of logs to flush. If null, all logs are flushed. */ - fun flushLogs(): List + fun flushLogs(max: Int? = null): List /** * Registers a callback to be called when logs are stored. diff --git a/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSinkImpl.kt b/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSinkImpl.kt index 4ba56f789..7ff29f644 100644 --- a/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSinkImpl.kt +++ b/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSinkImpl.kt @@ -2,16 +2,16 @@ package io.embrace.android.embracesdk.internal.logs import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.logs.data.LogRecordData +import java.util.concurrent.ConcurrentLinkedQueue internal class LogSinkImpl : LogSink { - private val storedLogs: MutableList = mutableListOf() + private val storedLogs: ConcurrentLinkedQueue = ConcurrentLinkedQueue() private var onLogsStored: (() -> Unit)? = null + private val flushLock = Any() override fun storeLogs(logs: List): CompletableResultCode { try { - synchronized(storedLogs) { - storedLogs += logs.map { EmbraceLogRecordData(logRecordData = it) } - } + storedLogs.addAll(logs.map { EmbraceLogRecordData(logRecordData = it) }) onLogsStored?.invoke() } catch (t: Throwable) { return CompletableResultCode.ofFailure() @@ -21,15 +21,16 @@ internal class LogSinkImpl : LogSink { } override fun completedLogs(): List { - synchronized(storedLogs) { - return storedLogs.toList() - } + return storedLogs.toList() } - override fun flushLogs(): List { - synchronized(storedLogs) { - val flushedLogs = storedLogs.toList() - storedLogs.clear() + override fun flushLogs(max: Int?): List { + synchronized(flushLock) { + val maxIndex = max?.let { + minOf(storedLogs.size, it) + } ?: storedLogs.size + val flushedLogs = storedLogs.toList().subList(0, maxIndex) + storedLogs.removeAll(flushedLogs.toSet()) return flushedLogs } } diff --git a/embrace-android-sdk/src/test/java/io/embrace/android/embracesdk/internal/logs/LogOrchestratorTest.kt b/embrace-android-sdk/src/test/java/io/embrace/android/embracesdk/internal/logs/LogOrchestratorTest.kt index 79038077d..80c103203 100644 --- a/embrace-android-sdk/src/test/java/io/embrace/android/embracesdk/internal/logs/LogOrchestratorTest.kt +++ b/embrace-android-sdk/src/test/java/io/embrace/android/embracesdk/internal/logs/LogOrchestratorTest.kt @@ -2,6 +2,7 @@ package io.embrace.android.embracesdk.internal.logs import io.embrace.android.embracesdk.FakeDeliveryService import io.embrace.android.embracesdk.concurrency.BlockingScheduledExecutorService +import io.embrace.android.embracesdk.concurrency.SingleThreadTestScheduledExecutor import io.embrace.android.embracesdk.fakes.FakeClock import io.embrace.android.embracesdk.fakes.FakeLogRecordData import io.embrace.android.embracesdk.worker.ScheduledWorker @@ -12,6 +13,9 @@ import org.junit.Assert.assertNotNull import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test +import java.util.concurrent.CountDownLatch +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit internal class LogOrchestratorTest { @@ -89,6 +93,34 @@ internal class LogOrchestratorTest { verifyPayload(4) } + @Test + fun `simulate race condition`() { + val fakeLog = FakeLogRecordData() + val fakeLogs = mutableListOf() + val threads = mutableListOf() + val latch = CountDownLatch(49) + repeat(49) { + fakeLogs.add(fakeLog) + threads.add(SingleThreadTestScheduledExecutor()) + } + logSink.storeLogs(fakeLogs) + threads.forEach { thread -> + thread.schedule( + { + logSink.storeLogs(listOf(fakeLog)) + latch.countDown() + }, + 10L, + TimeUnit.MILLISECONDS + ) + } + + latch.await(1000L, TimeUnit.MILLISECONDS) + + assertEquals("Too many payloads sent", 1, deliveryService.lastSentLogPayloads.size) + assertEquals("Too many logs in payload", 50, deliveryService.lastSentLogPayloads[0].logs.size) + } + private fun verifyPayload(numberOfLogs: Int) { assertNotNull(deliveryService.lastSentLogPayloads) assertEquals(1, deliveryService.lastSentLogPayloads.size) From 9bd44a7060a017386a24a6ab60f202887d7dd1e6 Mon Sep 17 00:00:00 2001 From: Lucas Date: Mon, 11 Mar 2024 13:45:22 -0300 Subject: [PATCH 2/2] Improve method call to take a sublist of logs. --- .../io/embrace/android/embracesdk/internal/logs/LogSinkImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSinkImpl.kt b/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSinkImpl.kt index 7ff29f644..dac133daa 100644 --- a/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSinkImpl.kt +++ b/embrace-android-sdk/src/main/java/io/embrace/android/embracesdk/internal/logs/LogSinkImpl.kt @@ -29,7 +29,7 @@ internal class LogSinkImpl : LogSink { val maxIndex = max?.let { minOf(storedLogs.size, it) } ?: storedLogs.size - val flushedLogs = storedLogs.toList().subList(0, maxIndex) + val flushedLogs = storedLogs.take(maxIndex) storedLogs.removeAll(flushedLogs.toSet()) return flushedLogs }