Skip to content

Commit

Permalink
Merge pull request #544 from embrace-io/lucas/fix_race_condition_logs
Browse files Browse the repository at this point in the history
[EMBR-2802] Fix race condition on LogSink and LogOrchestrator.
  • Loading branch information
lucaslabari committed Mar 11, 2024
2 parents b261d20 + 9bd44a7 commit c1735c9
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.embrace.android.embracesdk.worker.ScheduledWorker
import java.lang.Long.min
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

internal class LogOrchestrator(
private val logOrchestratorScheduledWorker: ScheduledWorker,
Expand All @@ -14,10 +15,10 @@ internal class LogOrchestrator(
private val deliveryService: DeliveryService
) {
@Volatile
private var lastLogTime: Long = 0
private var lastLogTime: AtomicLong = AtomicLong(0)

@Volatile
private var firstLogInBatchTime: Long = 0
private var firstLogInBatchTime: AtomicLong = AtomicLong(0)

@Volatile
private var scheduledCheckFuture: ScheduledFuture<*>? = null
Expand All @@ -27,16 +28,12 @@ internal class LogOrchestrator(
}

private fun onLogsAdded() {
lastLogTime = clock.now()
if (firstLogInBatchTime == 0L) {
firstLogInBatchTime = lastLogTime
}
lastLogTime.set(clock.now())
firstLogInBatchTime.compareAndSet(0, lastLogTime.get())
if (!sendLogsIfNeeded()) {
// If [firstLogInBatchTime] was cleared by a concurrent call to [sendLogsIfNeeded]
// then update it to the time of this log
if (firstLogInBatchTime == 0L) {
firstLogInBatchTime = lastLogTime
}
firstLogInBatchTime.compareAndSet(0, lastLogTime.get())
scheduleCheck()
}
}
Expand All @@ -47,20 +44,19 @@ internal class LogOrchestrator(
@Synchronized
private fun sendLogsIfNeeded(): Boolean {
val now = clock.now()
val shouldSendLogs = sink.completedLogs().size >= MAX_LOGS_PER_BATCH ||
now - lastLogTime > MAX_INACTIVITY_TIME ||
(firstLogInBatchTime != 0L && now - firstLogInBatchTime > MAX_BATCH_TIME)
val shouldSendLogs = isMaxLogsPerBatchReached() ||
isMaxInactivityTimeReached(now) ||
isMaxBatchTimeReached(now)

if (!shouldSendLogs) {
// None of the conditions to send the logs is met
return false
}

scheduledCheckFuture?.cancel(false)
scheduledCheckFuture = null
firstLogInBatchTime = 0
firstLogInBatchTime.set(0)

val storedLogs = sink.flushLogs()
val storedLogs = sink.flushLogs(MAX_LOGS_PER_BATCH)

if (storedLogs.isNotEmpty()) {
deliveryService.sendLogs(LogPayload(logs = storedLogs))
Expand All @@ -71,8 +67,8 @@ internal class LogOrchestrator(

private fun scheduleCheck() {
val now = clock.now()
val nextBatchCheck = MAX_BATCH_TIME - (now - firstLogInBatchTime)
val nextInactivityCheck = MAX_INACTIVITY_TIME - (now - lastLogTime)
val nextBatchCheck = MAX_BATCH_TIME - (now - firstLogInBatchTime.get())
val nextInactivityCheck = MAX_INACTIVITY_TIME - (now - lastLogTime.get())
scheduledCheckFuture?.cancel(false)
scheduledCheckFuture = logOrchestratorScheduledWorker.schedule<Unit>(
::sendLogsIfNeeded,
Expand All @@ -81,6 +77,16 @@ internal class LogOrchestrator(
)
}

/**
 * Reports whether the sink currently holds at least [MAX_LOGS_PER_BATCH] completed logs.
 */
private fun isMaxLogsPerBatchReached(): Boolean {
    val pendingLogCount = sink.completedLogs().size
    return pendingLogCount >= MAX_LOGS_PER_BATCH
}

/**
 * Reports whether the gap between the last recorded log time and [now] is strictly greater
 * than [MAX_INACTIVITY_TIME].
 *
 * @param now the current clock reading, in the same time base as [lastLogTime]
 */
private fun isMaxInactivityTimeReached(now: Long): Boolean {
    val idleTime = now - lastLogTime.get()
    return idleTime > MAX_INACTIVITY_TIME
}

/**
 * Reports whether the current batch has been open for strictly longer than [MAX_BATCH_TIME].
 *
 * A stored start time of 0 is treated as "no batch start recorded" and always yields false.
 *
 * @param now the current clock reading, in the same time base as [firstLogInBatchTime]
 */
private fun isMaxBatchTimeReached(now: Long): Boolean {
    // Read the atomic once so the zero check and the subtraction see the same value.
    val batchStart = firstLogInBatchTime.get()
    if (batchStart == 0L) {
        return false
    }
    return now - batchStart > MAX_BATCH_TIME
}
companion object {
private const val MAX_LOGS_PER_BATCH = 50
private const val MAX_BATCH_TIME = 5000L // In milliseconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ internal interface LogSink {
/**
* Returns and clears the currently stored Logs. Implementations of this method must make sure the clearing and returning is
* atomic, i.e. logs cannot be added during this operation.
* @param max The maximum number of logs to flush. If null, all logs are flushed.
*/
fun flushLogs(): List<EmbraceLogRecordData>
fun flushLogs(max: Int? = null): List<EmbraceLogRecordData>

/**
* Registers a callback to be called when logs are stored.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package io.embrace.android.embracesdk.internal.logs

import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.logs.data.LogRecordData
import java.util.concurrent.ConcurrentLinkedQueue

internal class LogSinkImpl : LogSink {
private val storedLogs: MutableList<EmbraceLogRecordData> = mutableListOf()
private val storedLogs: ConcurrentLinkedQueue<EmbraceLogRecordData> = ConcurrentLinkedQueue()
private var onLogsStored: (() -> Unit)? = null
private val flushLock = Any()

override fun storeLogs(logs: List<LogRecordData>): CompletableResultCode {
try {
synchronized(storedLogs) {
storedLogs += logs.map { EmbraceLogRecordData(logRecordData = it) }
}
storedLogs.addAll(logs.map { EmbraceLogRecordData(logRecordData = it) })
onLogsStored?.invoke()
} catch (t: Throwable) {
return CompletableResultCode.ofFailure()
Expand All @@ -21,15 +21,16 @@ internal class LogSinkImpl : LogSink {
}

override fun completedLogs(): List<EmbraceLogRecordData> {
synchronized(storedLogs) {
return storedLogs.toList()
}
return storedLogs.toList()
}

override fun flushLogs(): List<EmbraceLogRecordData> {
synchronized(storedLogs) {
val flushedLogs = storedLogs.toList()
storedLogs.clear()
/**
 * Removes logs from the head of the queue, oldest first, and returns them.
 *
 * Flushes are serialized through [flushLock], so two concurrent flushes never return the
 * same entry. Entries are removed one at a time with `poll()`, so only the entries that are
 * actually returned leave the queue. Removing by equality (`removeAll`) would also delete any
 * later entry that compares equal to a flushed one, silently dropping logs.
 *
 * @param max The maximum number of logs to flush. If null, the number of logs present when
 * the flush starts is used as the limit. A non-positive value returns an empty list.
 * @return the flushed logs in insertion order; empty if nothing was stored.
 */
override fun flushLogs(max: Int?): List<EmbraceLogRecordData> {
    synchronized(flushLock) {
        // Snapshot the size for the unbounded case so that producers adding concurrently
        // cannot keep this loop running indefinitely.
        val limit = max ?: storedLogs.size
        val flushedLogs = mutableListOf<EmbraceLogRecordData>()
        while (flushedLogs.size < limit) {
            // poll() returns null once the queue is drained.
            val log = storedLogs.poll() ?: break
            flushedLogs.add(log)
        }
        return flushedLogs
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.embrace.android.embracesdk.internal.logs

import io.embrace.android.embracesdk.FakeDeliveryService
import io.embrace.android.embracesdk.concurrency.BlockingScheduledExecutorService
import io.embrace.android.embracesdk.concurrency.SingleThreadTestScheduledExecutor
import io.embrace.android.embracesdk.fakes.FakeClock
import io.embrace.android.embracesdk.fakes.FakeLogRecordData
import io.embrace.android.embracesdk.worker.ScheduledWorker
Expand All @@ -12,6 +13,9 @@ import org.junit.Assert.assertNotNull
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

internal class LogOrchestratorTest {

Expand Down Expand Up @@ -89,6 +93,34 @@ internal class LogOrchestratorTest {
verifyPayload(4)
}

/**
 * Stores 49 logs up front, then has 49 single-thread executors each store one more log at
 * roughly the same time, and verifies that exactly one payload containing 50 logs was sent.
 */
@Test
fun `simulate race condition`() {
    val fakeLog = FakeLogRecordData()
    val fakeLogs = mutableListOf<LogRecordData>()
    val executors = mutableListOf<ScheduledExecutorService>()
    val latch = CountDownLatch(49)
    repeat(49) {
        fakeLogs.add(fakeLog)
        executors.add(SingleThreadTestScheduledExecutor())
    }
    try {
        logSink.storeLogs(fakeLogs)
        executors.forEach { executor ->
            executor.schedule(
                {
                    logSink.storeLogs(listOf(fakeLog))
                    latch.countDown()
                },
                10L,
                TimeUnit.MILLISECONDS
            )
        }

        // Fail explicitly on timeout instead of asserting against a half-finished run.
        assertTrue(
            "Timed out waiting for concurrent stores to finish",
            latch.await(1000L, TimeUnit.MILLISECONDS)
        )

        assertEquals("Too many payloads sent", 1, deliveryService.lastSentLogPayloads.size)
        assertEquals("Too many logs in payload", 50, deliveryService.lastSentLogPayloads[0].logs.size)
    } finally {
        // Release the worker threads so they do not outlive the test.
        executors.forEach { it.shutdownNow() }
    }
}

private fun verifyPayload(numberOfLogs: Int) {
assertNotNull(deliveryService.lastSentLogPayloads)
assertEquals(1, deliveryService.lastSentLogPayloads.size)
Expand Down

0 comments on commit c1735c9

Please sign in to comment.