Skip to content

Commit

Permalink
use Frame count in receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
haileys committed Dec 29, 2023
1 parent 69eb04d commit 45fe663
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 58 deletions.
4 changes: 4 additions & 0 deletions bark-core/src/audio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ pub type Sample = f32;
#[repr(C)]
pub struct Frame(pub Sample, pub Sample);

#[derive(Copy, Clone, Debug)]
#[repr(transparent)]
pub struct FrameCount(pub usize);

pub fn from_interleaved(samples: &[Sample]) -> &[Frame] {
// ensure samples contains whole frames only
assert_eq!(0, samples.len() % usize::from(CHANNELS));
Expand Down
18 changes: 11 additions & 7 deletions bark-core/src/decode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ pub mod pcm;

use core::fmt::Display;

use bark_protocol::packet::Audio;
use thiserror::Error;

use bark_protocol::FRAMES_PER_PACKET;
use bark_protocol::packet::Audio;
use bark_protocol::types::{AudioPacketHeader, AudioPacketFormat};
use bark_protocol::SAMPLES_PER_PACKET;

use crate::audio::Frame;

#[derive(Debug, Error)]
pub enum NewDecoderError {
Expand All @@ -19,8 +21,10 @@ pub enum NewDecoderError {

#[derive(Debug, Error)]
pub enum DecodeError {
#[error("wrong length: {length}, expected: {expected}")]
#[error("wrong byte length: {length}, expected: {expected}")]
WrongLength { length: usize, expected: usize },
#[error("wrong frame count: {frames}, expected: {expected}")]
WrongFrameCount { frames: usize, expected: usize },
#[error("opus codec error: {0}")]
Opus(#[from] ::opus::Error),
}
Expand All @@ -29,7 +33,7 @@ pub struct Decoder {
decode: DecodeFormat,
}

pub type SampleBuffer = [f32; SAMPLES_PER_PACKET];
pub type FrameBuffer = [Frame; FRAMES_PER_PACKET];

impl Decoder {
pub fn new(header: &AudioPacketHeader) -> Result<Self, NewDecoderError> {
Expand All @@ -47,14 +51,14 @@ impl Decoder {
&self.decode as &dyn Display
}

pub fn decode(&mut self, packet: Option<&Audio>, out: &mut SampleBuffer) -> Result<(), DecodeError> {
pub fn decode(&mut self, packet: Option<&Audio>, out: &mut FrameBuffer) -> Result<(), DecodeError> {
let bytes = packet.map(|packet| packet.buffer_bytes());
self.decode.decode_packet(bytes, out)
}
}

trait Decode: Display {
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut SampleBuffer) -> Result<(), DecodeError>;
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut FrameBuffer) -> Result<(), DecodeError>;
}

enum DecodeFormat {
Expand All @@ -64,7 +68,7 @@ enum DecodeFormat {
}

impl Decode for DecodeFormat {
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut SampleBuffer) -> Result<(), DecodeError> {
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut FrameBuffer) -> Result<(), DecodeError> {
match self {
DecodeFormat::S16LE(dec) => dec.decode_packet(bytes, out),
DecodeFormat::F32LE(dec) => dec.decode_packet(bytes, out),
Expand Down
18 changes: 10 additions & 8 deletions bark-core/src/decode/opus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use core::fmt::{self, Display};

use bark_protocol::SAMPLE_RATE;

use super::{Decode, DecodeError, SampleBuffer};
use crate::audio;

use super::{Decode, DecodeError, FrameBuffer};

pub struct OpusDecoder {
opus: opus::Decoder,
Expand All @@ -26,16 +28,16 @@ impl Display for OpusDecoder {
}

impl Decode for OpusDecoder {
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut SampleBuffer) -> Result<(), DecodeError> {
let expected = out.len() / 2;
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut FrameBuffer) -> Result<(), DecodeError> {
let expected = out.len();

let length = match bytes {
Some(bytes) => self.opus.decode_float(bytes, out, false)?,
None => self.opus.decode_float(&[], out, true)?,
let frames = match bytes {
Some(bytes) => self.opus.decode_float(bytes, audio::to_interleaved_mut(out), false)?,
None => self.opus.decode_float(&[], audio::to_interleaved_mut(out), true)?,
};

if expected != length {
return Err(DecodeError::WrongLength { length, expected });
if expected != frames {
return Err(DecodeError::WrongFrameCount { frames, expected });
}

Ok(())
Expand Down
18 changes: 11 additions & 7 deletions bark-core/src/decode/pcm.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use core::fmt::{self, Display};

use super::{Decode, DecodeError, SampleBuffer};
use crate::audio;

use super::{Decode, DecodeError, FrameBuffer};

pub struct S16LEDecoder;

Expand All @@ -11,7 +13,7 @@ impl Display for S16LEDecoder {
}

impl Decode for S16LEDecoder {
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut SampleBuffer) -> Result<(), DecodeError> {
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut FrameBuffer) -> Result<(), DecodeError> {
decode_packed(bytes, out, |bytes| {
let input = i16::from_le_bytes(bytes);
let scale = i16::MAX as f32;
Expand All @@ -29,26 +31,28 @@ impl Display for F32LEDecoder {
}

impl Decode for F32LEDecoder {
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut SampleBuffer) -> Result<(), DecodeError> {
fn decode_packet(&mut self, bytes: Option<&[u8]>, out: &mut FrameBuffer) -> Result<(), DecodeError> {
decode_packed(bytes, out, f32::from_le_bytes)
}
}

fn decode_packed<const N: usize>(
bytes: Option<&[u8]>,
out: &mut SampleBuffer,
out: &mut FrameBuffer,
func: impl Fn([u8; N]) -> f32,
) -> Result<(), DecodeError> {
let out_samples = audio::to_interleaved_mut(out);

let Some(bytes) = bytes else {
// PCM codecs have no packet loss correction
// just zero fill and return
out.fill(0.0);
out_samples.fill(0.0);
return Ok(());
};

check_length(bytes, out.len() * N)?;
check_length(bytes, out_samples.len() * N)?;

for (input, output) in bytes.chunks_exact(N).zip(out) {
for (input, output) in bytes.chunks_exact(N).zip(out_samples) {
// when array_chunks stabilises we can use that instead
// but for now use try_into to turn a &[u8] (guaranteed len == width)
// into a [u8; width]
Expand Down
27 changes: 11 additions & 16 deletions bark-core/src/receive/resample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ffi::{c_void, c_int, CStr};
use std::fmt::Debug;
use std::ptr;

use bark_protocol::time::SampleDuration;
use crate::audio::{Frame, FrameCount};

use self::ffi::speex_resampler_strerror;

Expand Down Expand Up @@ -46,10 +46,8 @@ pub struct Resampler {
unsafe impl Send for Resampler {}

pub struct ProcessResult {
/// per-channel
pub input_read: SampleDuration,
/// per-channel
pub output_written: SampleDuration,
pub input_read: FrameCount,
pub output_written: FrameCount,
}

impl Resampler {
Expand Down Expand Up @@ -93,30 +91,27 @@ impl Resampler {
Ok(())
}

pub fn process_interleaved(&mut self, input: &[f32], output: &mut [f32])
pub fn process(&mut self, input: &[Frame], output: &mut [Frame])
-> Result<ProcessResult, SpeexError>
{
// speex API takes frame count:
let input_len = input.len() / usize::from(bark_protocol::CHANNELS);
let output_len = output.len() / usize::from(bark_protocol::CHANNELS);

// usize could technically be 64 bit, speex only takes u32 sizes,
// we don't want to panic or truncate, so let's just pick a reasonable
// length and cap input and output since the API allows us to.
// i'm going to say a reasonable length for a single call is 1<<20.
let max_reasonable_len = 1 << 20;
let input_len = std::cmp::min(input_len, max_reasonable_len);
let output_len = std::cmp::min(output_len, max_reasonable_len);
let input_len = std::cmp::min(input.len(), max_reasonable_len);
let output_len = std::cmp::min(output.len(), max_reasonable_len);

let mut input_len = u32::try_from(input_len).unwrap();
let mut output_len = u32::try_from(output_len).unwrap();

let err = unsafe {
ffi::speex_resampler_process_interleaved_float(
self.ptr.0,
input.as_ptr(),
input.as_ptr().cast(),
// speex API takes frame count already:
&mut input_len,
output.as_mut_ptr(),
output.as_mut_ptr().cast(),
&mut output_len,
)
};
Expand All @@ -126,8 +121,8 @@ impl Resampler {
}

Ok(ProcessResult {
input_read: SampleDuration::from_frame_count(u64::from(input_len)),
output_written: SampleDuration::from_frame_count(u64::from(output_len)),
input_read: FrameCount(usize::try_from(input_len).unwrap()),
output_written: FrameCount(usize::try_from(output_len).unwrap()),
})
}
}
Expand Down
12 changes: 5 additions & 7 deletions bark/src/audio/output.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alsa::Direction;
use alsa::pcm::PCM;
use bark_protocol::CHANNELS;
use bark_core::audio::{Frame, self};
use bark_protocol::time::SampleDuration;
use nix::errno::Errno;
use thiserror::Error;
Expand All @@ -23,7 +23,7 @@ impl Output {
Ok(Output { pcm })
}

pub fn write(&self, mut audio: &[f32]) -> Result<(), WriteAudioError> {
pub fn write(&self, mut audio: &[Frame]) -> Result<(), WriteAudioError> {
while audio.len() > 0 {
let n = self.write_partial(audio)?;
audio = &audio[n..];
Expand All @@ -32,7 +32,7 @@ impl Output {
Ok(())
}

fn write_partial(&self, audio: &[f32]) -> Result<usize, WriteAudioError> {
fn write_partial(&self, audio: &[Frame]) -> Result<usize, WriteAudioError> {
let io = unsafe {
// the checked versions of this function call
// snd_pcm_hw_params_current which mallocs under the hood
Expand All @@ -41,10 +41,8 @@ impl Output {

loop {
// try to write audio
let err = match io.writei(audio) {
Ok(n) => {
return Ok(n * CHANNELS.0 as usize);
}
let err = match io.writei(audio::to_interleaved(audio)) {
Ok(n) => { return Ok(n) },
Err(e) => e,
};

Expand Down
27 changes: 14 additions & 13 deletions bark/src/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use std::array;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use bark_core::decode::{Decoder, SampleBuffer};
use bark_core::audio::Frame;
use bark_core::decode::{Decoder, FrameBuffer};
use bytemuck::Zeroable;
use structopt::StructOpt;

use bark_core::receive::queue::PacketQueue;
use bark_core::receive::resample::Resampler;

use bark_protocol::{SampleRate, SAMPLES_PER_PACKET, FRAMES_PER_PACKET};
use bark_protocol::{SampleRate, FRAMES_PER_PACKET};
use bark_protocol::time::{Timestamp, SampleDuration, TimestampDelta, ClockDelta};
use bark_protocol::types::{SessionId, ReceiverId, TimePhase, AudioPacketHeader};
use bark_protocol::types::stats::receiver::{ReceiverStats, StreamStatus};
Expand Down Expand Up @@ -163,12 +164,12 @@ impl Receiver {
}
}

pub fn write_audio(&mut self, buffer: &mut [f32], pts: Timestamp) -> SampleDuration {
pub fn write_audio(&mut self, buffer: &mut [Frame], pts: Timestamp) -> usize {
// get stream start timing information:
let Some(stream) = self.stream.as_mut() else {
// stream hasn't started, just fill buffer with silence and return
buffer[0..SAMPLES_PER_PACKET].fill(0f32);
return SampleDuration::ONE_PACKET;
buffer[0..FRAMES_PER_PACKET].fill(Frame::zeroed());
return FRAMES_PER_PACKET;
};

// get next packet from queue, or None if missing (packet loss)
Expand Down Expand Up @@ -203,29 +204,29 @@ impl Receiver {
}

// decode packet
let mut decode_buffer: SampleBuffer = array::from_fn(|_| 0.0);
let mut decode_buffer: FrameBuffer = array::from_fn(|_| Frame::zeroed());
if let Some(decoder) = stream.decoder.as_mut() {
match decoder.decode(packet.as_ref(), &mut decode_buffer) {
Ok(()) => {}
Err(e) => {
log::warn!("error in decoder, skipping packet: {e}");
decode_buffer.fill(0.0);
decode_buffer.fill(Frame::zeroed());
}
}
}

// resample decoded audio
let resample = stream.resampler.process_interleaved(&decode_buffer, buffer)
let resample = stream.resampler.process(&decode_buffer, buffer)
.expect("resample error!");

assert_eq!(resample.input_read.as_buffer_offset(), decode_buffer.len());
assert_eq!(resample.input_read.0, decode_buffer.len());

// report stats and return
self.stats.set_buffer_length(
SampleDuration::from_frame_count(
(FRAMES_PER_PACKET * stream.queue.len()).try_into().unwrap()));

resample.output_written
resample.output_written.0
}
}

Expand Down Expand Up @@ -381,14 +382,14 @@ pub fn run(opt: ReceiveOpt) -> Result<(), RunError> {

// this should be large enough for `write_audio` to process an
// entire packet with:
let mut buffer = [0f32; SAMPLES_PER_PACKET * 2];
let duration = state.recv.write_audio(&mut buffer, pts);
let mut buffer = [Frame::zeroed(); FRAMES_PER_PACKET * 2];
let count = state.recv.write_audio(&mut buffer, pts);

// drop lock before calling `Output::write` (blocking!)
drop(state);

// send audio to ALSA
match output.write(&buffer[0..duration.as_buffer_offset()]) {
match output.write(&buffer[0..count]) {
Ok(()) => {}
Err(e) => {
log::error!("error playing audio: {e}");
Expand Down

0 comments on commit 45fe663

Please sign in to comment.