Skip to content

Commit

Permalink
Merge branch 'main' into dom/merkle-field-mask
Browse files Browse the repository at this point in the history
  • Loading branch information
domodwyer committed Sep 26, 2023
2 parents 7b221b2 + fe28568 commit 89e8fe1
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 56 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions ingester_query_client/src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use async_trait::async_trait;
use futures::StreamExt;
use std::{fmt::Debug, sync::Arc, task::Poll};
use trace::span::{Span, SpanRecorder};
use trace::span::{Span, SpanEvent, SpanRecorder};

use crate::{
error::DynError,
Expand Down Expand Up @@ -68,7 +68,7 @@ where
}) => {
tracker
.span_recorder
.event("ingester response stream starts");
.event(SpanEvent::new("ingester response stream starts"));

Ok(QueryResponse {
metadata,
Expand All @@ -79,7 +79,7 @@ where
Poll::Ready(Some(Ok(_))) => {
tracker
.span_recorder
.event("ingester response stream response");
.event(SpanEvent::new("ingester response stream response"));
}
Poll::Ready(Some(Err(e))) => {
tracker.span_recorder.error(e.to_string());
Expand Down Expand Up @@ -114,7 +114,7 @@ struct CancelationTracker {
impl Drop for CancelationTracker {
fn drop(&mut self) {
if !self.done {
self.span_recorder.event("cancelled");
self.span_recorder.event(SpanEvent::new("cancelled"));
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions iox_query/src/exec/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use query_functions::{register_scalar_functions, selectors::register_selector_ag
use std::{fmt, num::NonZeroUsize, sync::Arc};
use trace::{
ctx::SpanContext,
span::{MetaValue, Span, SpanExt, SpanRecorder},
span::{MetaValue, Span, SpanEvent, SpanExt, SpanRecorder},
};

// Reuse DataFusion error and Result types for this module
Expand Down Expand Up @@ -378,7 +378,7 @@ impl IOxSessionContext {
debug!(text=%logical_plan.display_indent_schema(), "create_physical_plan: initial plan");
let physical_plan = ctx.inner.state().create_physical_plan(logical_plan).await?;

ctx.recorder.event("physical plan");
ctx.recorder.event(SpanEvent::new("physical plan"));
debug!(text=%displayable(physical_plan.as_ref()).indent(false), "create_physical_plan: plan to run");
Ok(physical_plan)
}
Expand Down Expand Up @@ -671,7 +671,7 @@ impl IOxSessionContext {

/// Record an event on the span recorder
pub fn record_event(&mut self, name: &'static str) {
self.recorder.event(name);
self.recorder.event(SpanEvent::new(name));
}

/// Record an event on the span recorder
Expand Down
21 changes: 14 additions & 7 deletions querier/src/ingester/v1/circuit_breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use observability_deps::tracing::{info, warn};
use parking_lot::Mutex;
use pin_project::{pin_project, pinned_drop};
use rand::rngs::mock::StepRng;
use trace::{ctx::SpanContext, span::SpanRecorder};
use trace::{
ctx::SpanContext,
span::{SpanEvent, SpanRecorder},
};

use super::flight_client::{Error as FlightClientError, IngesterFlightClient, QueryData};

Expand Down Expand Up @@ -314,7 +317,8 @@ impl IngesterFlightClient for CircuitBreakerFlightClient {
ingester_address = ingester_addr.as_ref(),
"Circuit open, not contacting ingester",
);
span_recorder.event("Circuit open, not contacting ingester");
span_recorder
.event(SpanEvent::new("Circuit open, not contacting ingester"));
(true, None, *gen)
}
Circuit::HalfOpen {
Expand All @@ -331,13 +335,15 @@ impl IngesterFlightClient for CircuitBreakerFlightClient {
ingester_address = ingester_addr.as_ref(),
"Circuit half-open and this is a test request",
);
span_recorder.event("Circuit half-open and this is a test request");
span_recorder.event(SpanEvent::new(
"Circuit half-open and this is a test request",
));
} else {
info!(
ingester_address = ingester_addr.as_ref(),
"Circuit half-open but a test request is already running, not contacting ingester",
);
span_recorder.event("Circuit half-open but a test request is already running, not contacting ingester");
span_recorder.event(SpanEvent::new("Circuit half-open but a test request is already running, not contacting ingester"));
}

(
Expand Down Expand Up @@ -417,8 +423,9 @@ impl IngesterFlightClient for CircuitBreakerFlightClient {
ingester_address = ingester_addr.as_ref(),
"Error contacting ingester, circuit opened"
);
span_recorder
.event("Error contacting ingester, circuit opened");
span_recorder.event(SpanEvent::new(
"Error contacting ingester, circuit opened",
));

(
Backoff::new_with_rng(
Expand Down Expand Up @@ -469,7 +476,7 @@ impl IngesterFlightClient for CircuitBreakerFlightClient {
Circuit::HalfOpen { metrics, gen, .. } => {
if start_gen == *gen {
info!(ingester_address = ingester_addr.as_ref(), "Circuit closed",);
span_recorder.event("Circuit closed");
span_recorder.event(SpanEvent::new("Circuit closed"));

metrics.set_closed();
*o = Circuit::Closed {
Expand Down
7 changes: 5 additions & 2 deletions querier/src/ingester/v1/invalidate_on_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::sync::Arc;

use async_trait::async_trait;
use ingester_query_grpc::IngesterQueryRequest;
use trace::{ctx::SpanContext, span::SpanRecorder};
use trace::{
ctx::SpanContext,
span::{SpanEvent, SpanRecorder},
};

use super::flight_client::{Error as FlightClientError, IngesterFlightClient, QueryData};

Expand Down Expand Up @@ -51,7 +54,7 @@ impl IngesterFlightClient for InvalidateOnErrorFlightClient {

if is_err {
self.inner.invalidate_connection(ingester_addr).await;
span_recorder.event("invalidate connection");
span_recorder.event(SpanEvent::new("invalidate connection"));
}

res
Expand Down
57 changes: 41 additions & 16 deletions trace/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,26 @@ impl Span {
}

/// Record an event on this `Span`
pub fn event(&mut self, meta: impl Into<Cow<'static, str>>) {
let event = SpanEvent {
time: Utc::now(),
msg: meta.into(),
};
self.events.push(event)
pub fn event(&mut self, event: SpanEvent) {
self.events.push(event);
}

/// Record success on this `Span` setting the status if it isn't already set
pub fn ok(&mut self, meta: impl Into<Cow<'static, str>>) {
self.event(meta);
if self.status == SpanStatus::Unknown {
self.status = SpanStatus::Ok;
}
pub fn ok(&mut self, msg: impl Into<Cow<'static, str>>) {
self.event(SpanEvent::new(msg));
self.status(SpanStatus::Ok);
}

/// Record an error on this `Span` setting the status if it isn't already set
pub fn error(&mut self, meta: impl Into<Cow<'static, str>>) {
self.event(meta);
pub fn error(&mut self, msg: impl Into<Cow<'static, str>>) {
self.event(SpanEvent::new(msg));
self.status(SpanStatus::Err);
}

/// Set status of `Span`
pub fn status(&mut self, status: SpanStatus) {
if self.status == SpanStatus::Unknown {
self.status = SpanStatus::Err;
self.status = status;
}
}

Expand Down Expand Up @@ -110,6 +109,25 @@ pub struct SpanEvent {
pub time: DateTime<Utc>,

pub msg: Cow<'static, str>,

pub metadata: HashMap<Cow<'static, str>, MetaValue>,
}

impl SpanEvent {
/// Create new event.
pub fn new(msg: impl Into<Cow<'static, str>>) -> Self {
Self {
time: Utc::now(),
msg: msg.into(),
// assume no metadata by default
metadata: HashMap::with_capacity(0),
}
}

/// Set meta data.
pub fn set_metadata(&mut self, key: impl Into<Cow<'static, str>>, value: impl Into<MetaValue>) {
self.metadata.insert(key.into(), value.into());
}
}

/// Values that can be stored in a Span's metadata and events
Expand Down Expand Up @@ -183,9 +201,9 @@ impl SpanRecorder {
}

/// Record an event on the contained `Span` if any
pub fn event(&mut self, meta: impl Into<Cow<'static, str>>) {
pub fn event(&mut self, event: SpanEvent) {
if let Some(span) = self.span.as_mut() {
span.event(meta)
span.event(event);
}
}

Expand All @@ -203,6 +221,13 @@ impl SpanRecorder {
}
}

/// Set status of contained `Span` if any
pub fn status(&mut self, status: SpanStatus) {
if let Some(span) = self.span.as_mut() {
span.status(status);
}
}

/// Take the contents of this recorder returning a new recorder
///
/// From this point on `self` will behave as if it were created with no span
Expand Down
14 changes: 11 additions & 3 deletions trace_exporters/src/jaeger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,13 @@ mod tests {
use crate::thrift::agent::{AgentSyncHandler, AgentSyncProcessor};
use chrono::{TimeZone, Utc};
use iox_time::SystemProvider;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use thrift::server::TProcessor;
use thrift::transport::TBufferChannel;
use trace::ctx::{SpanContext, SpanId, TraceId};
use trace::span::{SpanEvent, SpanStatus};
use trace::span::{MetaValue, SpanEvent, SpanStatus};

struct TestHandler {
batches: Arc<Mutex<Vec<jaeger::Batch>>>,
Expand Down Expand Up @@ -382,9 +384,11 @@ mod tests {
span.events = vec![SpanEvent {
time: Utc.timestamp_nanos(200000),
msg: "hello".into(),
metadata: HashMap::from([(Cow::from("evt_md"), MetaValue::Int(42))]),
}];
span.start = Some(Utc.timestamp_nanos(100000));
span.end = Some(Utc.timestamp_nanos(300000));
span.metadata = HashMap::from([(Cow::from("span_md"), MetaValue::Int(1337))]);

exporter.export(vec![span.clone(), span.clone()]).await;
exporter.export(vec![span.clone()]).await;
Expand Down Expand Up @@ -452,14 +456,18 @@ mod tests {
let logs = b1_s0.logs.as_ref().unwrap();
assert_eq!(logs.len(), 1);
assert_eq!(logs[0].timestamp, 200);
assert_eq!(logs[0].fields.len(), 1);
assert_eq!(logs[0].fields.len(), 2);
assert_eq!(logs[0].fields[0].key.as_str(), "event");
assert_eq!(logs[0].fields[0].v_str.as_ref().unwrap().as_str(), "hello");
assert_eq!(logs[0].fields[1].key.as_str(), "evt_md");
assert_eq!(logs[0].fields[1].v_long.unwrap(), 42);

let tags = b1_s0.tags.as_ref().unwrap();
assert_eq!(tags.len(), 1);
assert_eq!(tags.len(), 2);
assert_eq!(tags[0].key.as_str(), "ok");
assert!(tags[0].v_bool.unwrap());
assert_eq!(tags[1].key.as_str(), "span_md");
assert_eq!(tags[1].v_long.unwrap(), 1337);
}

#[test]
Expand Down
24 changes: 16 additions & 8 deletions trace_exporters/src/jaeger/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ impl TryFrom<Span> for jaeger::Span {

let tags = match s.metadata.is_empty() {
true => None,
false => Some(
s.metadata
.into_iter()
.map(|(name, value)| tag_from_meta(name.to_string(), value))
.collect(),
),
false => {
let mut md = s.metadata.into_iter().collect::<Vec<_>>();
md.sort_by(|(k1, _v1), (k2, _v2)| k1.cmp(k2));
Some(
md.into_iter()
.map(|(name, value)| tag_from_meta(name.to_string(), value))
.collect(),
)
}
};

let logs = match s.events.is_empty() {
Expand Down Expand Up @@ -115,19 +118,24 @@ impl TryFrom<SpanEvent> for jaeger::Log {
type Error = String;

fn try_from(event: SpanEvent) -> Result<Self, Self::Error> {
let mut md = event.metadata.into_iter().collect::<Vec<_>>();
md.sort_by(|(k1, _v1), (k2, _v2)| k1.cmp(k2));

Ok(Self {
timestamp: event.time.timestamp_nanos_opt().ok_or_else(|| {
format!("timestamp cannot be represented as nanos: {}", event.time)
})? / 1000,
fields: vec![jaeger::Tag {
fields: std::iter::once(jaeger::Tag {
key: "event".to_string(),
v_type: jaeger::TagType::String,
v_str: Some(event.msg.to_string()),
v_double: None,
v_bool: None,
v_long: None,
v_binary: None,
}],
})
.chain(md.into_iter().map(|(k, v)| tag_from_meta(k.to_string(), v)))
.collect(),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions trace_http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
bytes = "1.5"
trace = { path = "../trace" }
futures = "0.3"
hashbrown = { workspace = true }
Expand Down
Loading

0 comments on commit 89e8fe1

Please sign in to comment.