Skip to content

Commit

Permalink
start pulling code out
Browse files Browse the repository at this point in the history
  • Loading branch information
haileys committed Jun 21, 2024
1 parent 59c156e commit 0358d4d
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 1 deletion.
4 changes: 4 additions & 0 deletions bark-protocol/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ impl ClockDelta {
pub struct TimestampDelta(i64);

impl TimestampDelta {
pub fn zero() -> TimestampDelta {
TimestampDelta(0)
}

pub fn from_clock_delta_lossy(delta: ClockDelta) -> TimestampDelta {
TimestampDelta((delta.0 * i64::from(SAMPLE_RATE.0)) / 1_000_000)
}
Expand Down
4 changes: 3 additions & 1 deletion bark/src/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use crate::socket::{ProtocolSocket, Socket, SocketOpt};
use crate::{time, stats, thread};
use crate::RunError;

mod queue;
mod stream;

pub struct Receiver {
stats: ReceiverStats,
stream: Option<Stream>,
Expand All @@ -39,7 +42,6 @@ impl Stream {
pub fn new(header: &AudioPacketHeader) -> Self {
let queue = PacketQueue::new(header);


Stream {
sid: header.sid,
latency: Aggregate::new(),
Expand Down
68 changes: 68 additions & 0 deletions bark/src/receive/queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::sync::{Arc, Condvar, Mutex};

use bark_core::receive::queue::PacketQueue;
use bark_protocol::packet::Audio;

pub struct QueueSender {
shared: Arc<Shared>,
}

pub struct QueueReceiver {
shared: Arc<Shared>,
}

struct Shared {
queue: Mutex<Option<PacketQueue>>,
notify: Condvar,
}

pub fn channel(queue: PacketQueue) -> (QueueSender, QueueReceiver) {
let shared = Arc::new(Shared {
queue: Mutex::new(Some(queue)),
notify: Condvar::new(),
});

let tx = QueueSender { shared: shared.clone() };
let rx = QueueReceiver { shared: shared.clone() };

(tx, rx)
}

#[derive(Debug, Clone, Copy)]
pub struct Disconnected;

impl QueueSender {
pub fn send(&self, packet: Audio) -> Result<usize, Disconnected> {
let mut queue = self.shared.queue.lock().unwrap();

let Some(queue) = queue.as_mut() else {
return Err(Disconnected);
};

queue.insert_packet(packet);

self.shared.notify.notify_all();

Ok(queue.len())
}
}

impl QueueReceiver {
pub fn recv(&self) -> Result<Option<Audio>, Disconnected> {
let mut queue_lock = self.shared.queue.lock().unwrap();

loop {
let Some(queue) = queue_lock.as_mut() else {
return Err(Disconnected);
};

if queue.len() > 0 {
return Ok(queue.pop_front());
}

// if queue is empty we'll block until notified
queue_lock = self.shared.notify.wait(queue_lock).unwrap();
continue;
}
}
}
128 changes: 128 additions & 0 deletions bark/src/receive/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::{thread, time::Duration};

use bark_core::{audio::Frame, receive::{pipeline::Pipeline, queue::PacketQueue, timing::Timing}};
use bark_protocol::{time::{ClockDelta, SampleDuration, Timestamp, TimestampDelta}, types::{stats::receiver::StreamStatus, AudioPacketHeader, SessionId}, FRAMES_PER_PACKET};
use bytemuck::Zeroable;

use crate::{audio::Output, time};

use super::{queue::{self, Disconnected, QueueReceiver, QueueSender}, Aggregate};

pub struct Stream {
tx: QueueSender,
sid: SessionId,
}

impl Stream {
pub fn new(header: &AudioPacketHeader, output: Output) -> Self {
let queue = PacketQueue::new(header);
let (tx, rx) = queue::channel(queue);

let state = StreamState {
clock_delta: Aggregate::new(),
queue: rx,
pipeline: Pipeline::new(header),
output,
};

thread::spawn(move || {
run_stream(state);
});

Stream {
tx,
sid: header.sid,
}
}
}

struct StreamState {
clock_delta: Aggregate<ClockDelta>,
queue: QueueReceiver,
pipeline: Pipeline,
output: Output,
}

pub struct StreamStats {
status: StreamStatus,
audio_latency: TimestampDelta,
output_latency: SampleDuration,
}

impl Default for StreamStats {
fn default() -> Self {
StreamStats {
status: StreamStatus::Seek,
audio_latency: TimestampDelta::zero(),
output_latency: SampleDuration::zero(),
}
}
}

fn run_stream(mut stream: StreamState) {
let mut stats = StreamStats::default();

loop {
// get next packet from queue, or None if missing (packet loss)
let packet = match stream.queue.recv() {
Ok(rx) => rx,
Err(_) => { return; } // disconnected
};

// pass packet through decode pipeline
let mut buffer = [Frame::zeroed(); FRAMES_PER_PACKET * 2];
let frames = stream.pipeline.process(packet.as_ref(), &mut buffer);
let buffer = &buffer[0..frames];

// get current output delay
let delay = stream.output.delay().unwrap();
stats.output_latency = delay;

// calculate presentation timestamp based on output delay
let pts = time::now();
let pts = Timestamp::from_micros_lossy(pts);
let pts = pts.add(delay);

// calculate stream timing from packet timing info if present
let header_pts = packet.as_ref()
.map(|packet| packet.header().pts)
.map(Timestamp::from_micros_lossy);

let stream_pts = header_pts
.and_then(|header_pts| adjust_pts(&stream, header_pts));

let timing = stream_pts.map(|stream_pts| Timing {
real: pts,
play: stream_pts,
});

// adjust resampler rate based on stream timing info
if let Some(timing) = timing {
stream.pipeline.set_timing(timing);

if stream.pipeline.slew() {
stats.status = StreamStatus::Slew;
} else {
stats.status = StreamStatus::Sync;
}

stats.audio_latency = timing.real.delta(timing.play);
}

// send audio to ALSA
match stream.output.write(buffer) {
Ok(()) => {}
Err(e) => {
log::error!("error playing audio: {e}");
break;
}
}
}
}

/// Adjust pts from remote time to local time
fn adjust_pts(stream: &StreamState, pts: Timestamp) -> Option<Timestamp> {
stream.clock_delta.median().map(|delta| {
pts.adjust(TimestampDelta::from_clock_delta_lossy(delta))
})
}

0 comments on commit 0358d4d

Please sign in to comment.