Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow writes and read to not block SpanSinkImpl and only synchronize flush calls #547

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ package io.embrace.android.embracesdk.internal.spans

import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.trace.data.SpanData
import java.util.Queue
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

internal class SpanSinkImpl : SpanSink {
private val completedSpans: MutableList<EmbraceSpanData> = mutableListOf()
private val completedSpans: Queue<EmbraceSpanData> = ConcurrentLinkedQueue()
private val spansToFlush = AtomicReference<List<EmbraceSpanData>>(listOf())

override fun storeCompletedSpans(spans: List<SpanData>): CompletableResultCode {
try {
synchronized(completedSpans) {
completedSpans += spans.map { EmbraceSpanData(spanData = it) }
}
completedSpans += spans.map { EmbraceSpanData(spanData = it) }
} catch (t: Throwable) {
return CompletableResultCode.ofFailure()
}
Expand All @@ -19,16 +21,15 @@ internal class SpanSinkImpl : SpanSink {
}

override fun completedSpans(): List<EmbraceSpanData> {
synchronized(completedSpans) {
return completedSpans.toList()
}
val spansToReturn = completedSpans.size
return completedSpans.take(spansToReturn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? Instead of just returning completedSpans.toList()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we're not synchronizing this and and write, we can have the completedSpans queue being modified while we are taking a snapshot. Just to be predictable especially for the flush use case, we are doing this so we aren't trying to flush events that were added while the flush was taking place.

Once the flush is triggered, everything that comes after should not be part of what's being flushed.

}

override fun flushSpans(): List<EmbraceSpanData> {
synchronized(completedSpans) {
val flushedSpans = completedSpans.toList()
completedSpans.clear()
return flushedSpans
synchronized(spansToFlush) {
spansToFlush.set(completedSpans())
completedSpans.removeAll(spansToFlush.get().toSet())
return spansToFlush.get()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package io.embrace.android.embracesdk.internal.spans

import io.mockk.mockk
import io.embrace.android.embracesdk.concurrency.SingleThreadTestScheduledExecutor
import io.embrace.android.embracesdk.fakes.FakeSpanData
import io.mockk.every
import io.mockk.spyk
import io.mockk.unmockkObject
import io.opentelemetry.sdk.common.CompletableResultCode
import org.junit.Assert.assertEquals
import org.junit.Assert.assertSame
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

internal class SpanSinkImplTests {
private lateinit var spanSink: SpanSink

@Before
fun setup() {
spanSink = SpanSinkImpl()
spanSink = spyk(SpanSinkImpl())
}

@Test
Expand All @@ -23,8 +30,8 @@ internal class SpanSinkImplTests {
}

@Test
fun `flushing clears completed spans and current session span`() {
spanSink.storeCompletedSpans(listOf(mockk(relaxed = true), mockk(relaxed = true)))
fun `flushing clears completed spans`() {
spanSink.storeCompletedSpans(listOf(FakeSpanData(), FakeSpanData()))
val snapshot = spanSink.completedSpans()
assertEquals(2, snapshot.size)

Expand All @@ -35,4 +42,48 @@ internal class SpanSinkImplTests {
}
assertEquals(0, spanSink.completedSpans().size)
}

@Test
fun `flushing does not block writing and does not clear the spans added before the flush determines what to flush`() {
spanSink.storeCompletedSpans(listOf(FakeSpanData(name = "fake1"), FakeSpanData(name = "fake2")))
val unblockCompletedSpansLatch = CountDownLatch(1)
val unblockFlushLatch = CountDownLatch(1)
val checkLock = CountDownLatch(1)
val flushedCount = AtomicInteger(-1)

// Artificially block the completedSpans() - and thus flushSpans() - from completing
every { spanSink.completedSpans() } answers {
val spans = callOriginal()
unblockFlushLatch.countDown()
unblockCompletedSpansLatch.await(1, TimeUnit.SECONDS)
spans
}

// Produces this order of operations:
// 1. thread1 flushes spanSink and is about to return 2 spans but execution is paused
// 2. thread2 adds a new span to spanSink and then unblocks thread1
// 3. thread1 will return 2 spans despite spanSink already containing the extra span added by thread2
// 4. thread1 will clear the two spans that it has flushed and returns, unblocking the check
// 5. spanSink should have 1 span in it after the flush only removing the spans that it has flushed

val thread1 = SingleThreadTestScheduledExecutor()
thread1.submit {
val flushedSpans = spanSink.flushSpans()
flushedCount.set(flushedSpans.size)
checkLock.countDown()
}

unblockFlushLatch.await(1, TimeUnit.SECONDS)
val thread2 = SingleThreadTestScheduledExecutor()
thread2.submit {
spanSink.storeCompletedSpans(listOf(FakeSpanData(name = "fake3")))
unblockCompletedSpansLatch.countDown()
}

checkLock.await(1, TimeUnit.SECONDS)
assertEquals(2, flushedCount.get())

unmockkObject(spanSink)
assertEquals(1, spanSink.completedSpans().size)
}
}