Skip to content

Commit

Permalink
Allow writes and read to not block SpanSinkImpl and only synchronize …
Browse files Browse the repository at this point in the history
…flush calls (#547)

## Goal

First step to persisting active spans is to sort out the concurrency story and ensuring that we don't lose data if the app crashes in a state where recorded spans are only in memory.

This change stores the flushed spans in a separate list that we can serialize before clearing the completedSpans queue.

## Testing

Added test to test race condition caused by concurrent access.
  • Loading branch information
bidetofevil committed Mar 12, 2024
2 parents 033a88b + f2bbde3 commit 7d88675
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package io.embrace.android.embracesdk.internal.spans

import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.trace.data.SpanData
import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

internal class SpanSinkImpl : SpanSink {
private val completedSpans: MutableList<EmbraceSpanData> = mutableListOf()
private val completedSpans: Queue<EmbraceSpanData> = ConcurrentLinkedQueue()
private val spansToFlush = AtomicReference<List<EmbraceSpanData>>(listOf())

override fun storeCompletedSpans(spans: List<SpanData>): CompletableResultCode {
try {
synchronized(completedSpans) {
completedSpans += spans.map { EmbraceSpanData(spanData = it) }
}
completedSpans += spans.map { EmbraceSpanData(spanData = it) }
} catch (t: Throwable) {
return CompletableResultCode.ofFailure()
}
Expand All @@ -19,16 +21,15 @@ internal class SpanSinkImpl : SpanSink {
}

override fun completedSpans(): List<EmbraceSpanData> {
synchronized(completedSpans) {
return completedSpans.toList()
}
val spansToReturn = completedSpans.size
return completedSpans.take(spansToReturn)
}

override fun flushSpans(): List<EmbraceSpanData> {
synchronized(completedSpans) {
val flushedSpans = completedSpans.toList()
completedSpans.clear()
return flushedSpans
synchronized(spansToFlush) {
spansToFlush.set(completedSpans())
completedSpans.removeAll(spansToFlush.get().toSet())
return spansToFlush.get()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package io.embrace.android.embracesdk.internal.spans

import io.mockk.mockk
import io.embrace.android.embracesdk.concurrency.SingleThreadTestScheduledExecutor
import io.embrace.android.embracesdk.fakes.FakeSpanData
import io.mockk.every
import io.mockk.spyk
import io.mockk.unmockkObject
import io.opentelemetry.sdk.common.CompletableResultCode
import org.junit.Assert.assertEquals
import org.junit.Assert.assertSame
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

internal class SpanSinkImplTests {
private lateinit var spanSink: SpanSink

@Before
fun setup() {
spanSink = SpanSinkImpl()
spanSink = spyk(SpanSinkImpl())
}

@Test
Expand All @@ -23,8 +30,8 @@ internal class SpanSinkImplTests {
}

@Test
fun `flushing clears completed spans and current session span`() {
spanSink.storeCompletedSpans(listOf(mockk(relaxed = true), mockk(relaxed = true)))
fun `flushing clears completed spans`() {
spanSink.storeCompletedSpans(listOf(FakeSpanData(), FakeSpanData()))
val snapshot = spanSink.completedSpans()
assertEquals(2, snapshot.size)

Expand All @@ -35,4 +42,48 @@ internal class SpanSinkImplTests {
}
assertEquals(0, spanSink.completedSpans().size)
}

@Test
fun `flushing does not block writing and does not clear the spans added before the flush determines what to flush`() {
spanSink.storeCompletedSpans(listOf(FakeSpanData(name = "fake1"), FakeSpanData(name = "fake2")))
val unblockCompletedSpansLatch = CountDownLatch(1)
val unblockFlushLatch = CountDownLatch(1)
val checkLock = CountDownLatch(1)
val flushedCount = AtomicInteger(-1)

// Artificially block the completedSpans() - and thus flushSpans() - from completing
every { spanSink.completedSpans() } answers {
val spans = callOriginal()
unblockFlushLatch.countDown()
unblockCompletedSpansLatch.await(1, TimeUnit.SECONDS)
spans
}

// Produces this order of operations:
// 1. thread1 flushes spanSink and is about to return 2 spans but execution is paused
// 2. thread2 adds a new span to spanSink and then unblocks thread1
// 3. thread1 will return 2 spans despite spanSink already containing the extra span added by thread2
// 4. thread1 will clear the two spans that it has flushed and returns, unblocking the check
// 5. spanSink should have 1 span in it after the flush only removing the spans that it has flushed

val thread1 = SingleThreadTestScheduledExecutor()
thread1.submit {
val flushedSpans = spanSink.flushSpans()
flushedCount.set(flushedSpans.size)
checkLock.countDown()
}

unblockFlushLatch.await(1, TimeUnit.SECONDS)
val thread2 = SingleThreadTestScheduledExecutor()
thread2.submit {
spanSink.storeCompletedSpans(listOf(FakeSpanData(name = "fake3")))
unblockCompletedSpansLatch.countDown()
}

checkLock.await(1, TimeUnit.SECONDS)
assertEquals(2, flushedCount.get())

unmockkObject(spanSink)
assertEquals(1, spanSink.completedSpans().size)
}
}

0 comments on commit 7d88675

Please sign in to comment.