package io.embrace.android.embracesdk.internal.spans import io.embrace.android.embracesdk.arch.schema.EmbraceAttributeKey import io.embrace.android.embracesdk.arch.schema.FixedAttribute import io.embrace.android.embracesdk.internal.clock.millisToNanos import io.embrace.android.embracesdk.internal.clock.nanosToMillis import io.embrace.android.embracesdk.internal.clock.normalizeTimestampAsMillis import io.embrace.android.embracesdk.internal.payload.Span import io.embrace.android.embracesdk.internal.payload.toNewPayload import io.embrace.android.embracesdk.spans.EmbraceSpan import io.embrace.android.embracesdk.spans.EmbraceSpanEvent import io.embrace.android.embracesdk.spans.EmbraceSpanEvent.Companion.inputsValid import io.embrace.android.embracesdk.spans.ErrorCode import io.embrace.android.embracesdk.spans.PersistableEmbraceSpan import io.opentelemetry.api.common.Attributes import io.opentelemetry.sdk.common.Clock import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference internal class EmbraceSpanImpl( private val spanBuilder: EmbraceSpanBuilder, private val openTelemetryClock: Clock, private val spanRepository: SpanRepository ) : PersistableEmbraceSpan { private val startedSpan: AtomicReference = AtomicReference(null) private var spanStartTimeMs: Long? = null private var spanEndTimeMs: Long? = null private var status = Span.Status.UNSET private val events = ConcurrentLinkedQueue() private val schemaAttributes = spanBuilder.fixedAttributes.associate { it.toEmbraceKeyValuePair() }.toMutableMap() private val attributes = ConcurrentHashMap() // size for ConcurrentLinkedQueues is not a constant operation, so it could be subject to race conditions // do the bookkeeping separately so we don't have to worry about this private val eventCount = AtomicInteger(0) override val parent: EmbraceSpan? = spanBuilder.parent override val traceId: String? get() = startedSpan.get()?.spanContext?.traceId override val spanId: String? get() = startedSpan.get()?.spanContext?.spanId override val isRecording: Boolean get() = startedSpan.get()?.isRecording == true override fun start(startTimeMs: Long?): Boolean { return if (startedSpan.get() != null) { false } else { var successful: Boolean val attemptedStartTimeMs = startTimeMs?.normalizeTimestampAsMillis() ?: openTelemetryClock.now().nanosToMillis() synchronized(startedSpan) { startedSpan.set(spanBuilder.startSpan(attemptedStartTimeMs)) successful = startedSpan.get() != null } if (successful) { spanStartTimeMs = attemptedStartTimeMs spanRepository.trackStartedSpan(this) } return successful } } override fun stop(errorCode: ErrorCode?, endTimeMs: Long?): Boolean { return if (!isRecording) { false } else { var successful = false val attemptedEndTimeMs = endTimeMs?.normalizeTimestampAsMillis() ?: openTelemetryClock.now().nanosToMillis() synchronized(startedSpan) { startedSpan.get()?.let { spanToStop -> allAttributes().forEach { attribute -> spanToStop.setAttribute(attribute.key, attribute.value) } events.forEach { event -> val eventAttributes = if (event.attributes.isNotEmpty()) { Attributes.builder().fromMap(event.attributes).build() } else { Attributes.empty() } spanToStop.addEvent( event.name, eventAttributes, event.timestampNanos, TimeUnit.NANOSECONDS ) } spanToStop.endSpan(errorCode, attemptedEndTimeMs) successful = !isRecording } } if (successful) { status = if (errorCode != null) { Span.Status.ERROR } else { Span.Status.OK } spanEndTimeMs = attemptedEndTimeMs spanId?.let { spanRepository.trackedSpanStopped(it) } } return successful } } override fun addEvent(name: String, timestampMs: Long?, attributes: Map?): Boolean { if (eventCount.get() < MAX_EVENT_COUNT && inputsValid(name, attributes)) { val newEvent = EmbraceSpanEvent.create( name = name, timestampMs = timestampMs?.normalizeTimestampAsMillis() ?: openTelemetryClock.now().nanosToMillis(), attributes = attributes ) synchronized(eventCount) { if (eventCount.get() < MAX_EVENT_COUNT && isRecording) { events.add(newEvent) eventCount.incrementAndGet() return true } } } return false } override fun addAttribute(key: String, value: String): Boolean { if (attributes.size < MAX_ATTRIBUTE_COUNT && attributeValid(key, value)) { synchronized(attributes) { if (attributes.size < MAX_ATTRIBUTE_COUNT && isRecording) { attributes[key] = value return true } } } return false } override fun snapshot(): Span? { return if (canSnapshot()) { Span( traceId = traceId, spanId = spanId, parentSpanId = parent?.spanId, name = spanBuilder.spanName, startTimeUnixNano = spanStartTimeMs?.millisToNanos(), endTimeUnixNano = spanEndTimeMs?.millisToNanos(), status = status, events = events.map(EmbraceSpanEvent::toNewPayload), attributes = allAttributes().toNewPayload() ) } else { null } } override fun hasEmbraceAttribute(fixedAttribute: FixedAttribute): Boolean = allAttributes().hasFixedAttribute(fixedAttribute) override fun getAttribute(key: EmbraceAttributeKey): String? = allAttributes()[key.name] override fun removeCustomAttribute(key: String): Boolean = attributes.remove(key) != null private fun allAttributes(): Map = attributes + schemaAttributes private fun canSnapshot(): Boolean = spanId != null && spanStartTimeMs != null internal fun wrappedSpan(): io.opentelemetry.api.trace.Span? = startedSpan.get() companion object { internal const val MAX_NAME_LENGTH = 50 internal const val MAX_EVENT_COUNT = 10 internal const val MAX_ATTRIBUTE_COUNT = 50 internal const val MAX_ATTRIBUTE_KEY_LENGTH = 50 internal const val MAX_ATTRIBUTE_VALUE_LENGTH = 200 internal fun attributeValid(key: String, value: String) = key.length <= MAX_ATTRIBUTE_KEY_LENGTH && value.length <= MAX_ATTRIBUTE_VALUE_LENGTH internal fun EmbraceSpan.setFixedAttribute(fixedAttribute: FixedAttribute): EmbraceSpan { addAttribute(fixedAttribute.key.name, fixedAttribute.value) return this } } }