Skip to content

Commit

Permalink
AudioServer+LibAudio: Make mixing queue-based instead of buffer-based.
Browse files Browse the repository at this point in the history
Each client connection now sets up an ASBufferQueue, which is basically a
queue of ABuffers. This allows us to immediately start streaming the next
pending buffer whenever our current buffer runs out of samples.

This makes the majority of the skippiness go away for me. :^)

Also get rid of the old PlayBuffer API, since we don't need it anymore.
  • Loading branch information
awesomekling committed Jul 28, 2019
1 parent 66db6f4 commit be31e22
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 113 deletions.
10 changes: 0 additions & 10 deletions Libraries/LibAudio/AClientConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,6 @@ void AClientConnection::handshake()
set_my_client_id(response.greeting.your_client_id);
}

void AClientConnection::play(const ABuffer& buffer, bool block)
{
const_cast<ABuffer&>(buffer).shared_buffer().share_with(server_pid());
ASAPI_ClientMessage request;
request.type = ASAPI_ClientMessage::Type::PlayBuffer;
request.play_buffer.buffer_id = buffer.shared_buffer_id();
sync_request(request, block ? ASAPI_ServerMessage::Type::FinishedPlayingBuffer : ASAPI_ServerMessage::Type::PlayingBuffer);
}

void AClientConnection::enqueue(const ABuffer& buffer)
{
for (;;) {
Expand All @@ -36,7 +27,6 @@ void AClientConnection::enqueue(const ABuffer& buffer)
auto response = sync_request(request, ASAPI_ServerMessage::Type::EnqueueBufferResponse);
if (response.success)
break;
dbg() << "EnqueueBuffer failed, retrying...";
sleep(1);
}
}
1 change: 0 additions & 1 deletion Libraries/LibAudio/AClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@ class AClientConnection : public IPC::Client::Connection<ASAPI_ServerMessage, AS
AClientConnection();

virtual void handshake() override;
void play(const ABuffer&, bool block);
void enqueue(const ABuffer&);
};
1 change: 0 additions & 1 deletion Libraries/LibAudio/ASAPI.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ struct ASAPI_ClientMessage {
enum class Type {
Invalid,
Greeting,
PlayBuffer,
EnqueueBuffer,
};

Expand Down
41 changes: 6 additions & 35 deletions Servers/AudioServer/ASClientConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,44 +38,26 @@ bool ASClientConnection::handle_message(const ASAPI_ClientMessage& message, cons
case ASAPI_ClientMessage::Type::Greeting:
set_client_pid(message.greeting.client_pid);
break;
case ASAPI_ClientMessage::Type::PlayBuffer: {
auto shared_buffer = SharedBuffer::create_from_shared_buffer_id(message.play_buffer.buffer_id);
if (!shared_buffer) {
did_misbehave();
return false;
}

// we no longer need the buffer, so acknowledge that it's playing
ASAPI_ServerMessage reply;
reply.type = ASAPI_ServerMessage::Type::PlayingBuffer;
reply.playing_buffer.buffer_id = message.play_buffer.buffer_id;
post_message(reply);

m_mixer.queue(*this, ABuffer::create_with_shared_buffer(*shared_buffer));
break;
}
case ASAPI_ClientMessage::Type::EnqueueBuffer: {
auto shared_buffer = SharedBuffer::create_from_shared_buffer_id(message.play_buffer.buffer_id);
if (!shared_buffer) {
did_misbehave();
return false;
}

static const int max_in_queue = 2;

ASAPI_ServerMessage reply;
reply.type = ASAPI_ServerMessage::Type::EnqueueBufferResponse;
reply.playing_buffer.buffer_id = message.play_buffer.buffer_id;
if (m_buffer_queue.size() >= max_in_queue) {

if (!m_queue)
m_queue = m_mixer.create_queue(*this);

if (m_queue->is_full()) {
reply.success = false;
} else {
m_buffer_queue.enqueue(ABuffer::create_with_shared_buffer(*shared_buffer));
m_queue->enqueue(ABuffer::create_with_shared_buffer(*shared_buffer));
}
post_message(reply);

if (m_playing_queued_buffer_id == -1)
play_next_in_queue();

break;
}
case ASAPI_ClientMessage::Type::Invalid:
Expand All @@ -89,19 +71,8 @@ bool ASClientConnection::handle_message(const ASAPI_ClientMessage& message, cons

void ASClientConnection::did_finish_playing_buffer(Badge<ASMixer>, int buffer_id)
{
if (m_playing_queued_buffer_id == buffer_id)
play_next_in_queue();

ASAPI_ServerMessage reply;
reply.type = ASAPI_ServerMessage::Type::FinishedPlayingBuffer;
reply.playing_buffer.buffer_id = buffer_id;
post_message(reply);
}

void ASClientConnection::play_next_in_queue()
{
dbg() << "Playing next in queue (" << m_buffer_queue.size() << " queued)";
auto buffer = m_buffer_queue.dequeue();
m_playing_queued_buffer_id = buffer->shared_buffer_id();
m_mixer.queue(*this, move(buffer));
}
7 changes: 2 additions & 5 deletions Servers/AudioServer/ASClientConnection.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#pragma once

#include <AK/Queue.h>
#include <LibAudio/ASAPI.h>
#include <LibCore/CoreIPCServer.h>

class ABuffer;
class ASBufferQueue;
class ASMixer;

class ASClientConnection final : public IPC::Server::Connection<ASAPI_ServerMessage, ASAPI_ClientMessage> {
Expand All @@ -18,9 +18,6 @@ class ASClientConnection final : public IPC::Server::Connection<ASAPI_ServerMess
void did_finish_playing_buffer(Badge<ASMixer>, int buffer_id);

private:
void play_next_in_queue();

ASMixer& m_mixer;
Queue<NonnullRefPtr<ABuffer>> m_buffer_queue;
int m_playing_queued_buffer_id { -1 };
RefPtr<ASBufferQueue> m_queue;
};
81 changes: 30 additions & 51 deletions Servers/AudioServer/ASMixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,60 @@ ASMixer::ASMixer()
ASMixer* mixer = (ASMixer*)context;
mixer->mix();
return 0;
}, this);
},
this);
}

void ASMixer::queue(ASClientConnection& client, const ABuffer& buffer)
NonnullRefPtr<ASBufferQueue> ASMixer::create_queue(ASClientConnection& client)
{
ASSERT(buffer.size_in_bytes());
CLocker lock(m_lock);
m_pending_mixing.append(ASMixerBuffer(buffer, client));
LOCKER(m_lock);
auto queue = adopt(*new ASBufferQueue(client));
m_pending_mixing.append(*queue);
return queue;
}

void ASMixer::mix()
{
Vector<ASMixerBuffer> active_mix_buffers;
decltype(m_pending_mixing) active_mix_queues;

for (;;) {
{
CLocker lock(m_lock);
active_mix_buffers.append(move(m_pending_mixing));
LOCKER(m_lock);
active_mix_queues.append(move(m_pending_mixing));
}

// ### use a wakeup of some kind rather than this garbage
if (active_mix_buffers.size() == 0) {
if (active_mix_queues.size() == 0) {
// nothing to mix yet
usleep(10000);
continue;
}

int max_size = 0;

for (auto& buffer : active_mix_buffers) {
if (buffer.done)
continue;
ASSERT(buffer.buffer->size_in_bytes()); // zero sized buffer? how?
max_size = max(max_size, buffer.buffer->size_in_bytes() - buffer.pos);
}

// ### clear up 'done' buffers more aggressively
if (max_size == 0) {
active_mix_buffers.clear();
continue;
}

max_size = min(1023, max_size);

Vector<ASample, 1024> mixed_buffer;
mixed_buffer.resize(max_size);
ASample mixed_buffer[1024];
auto mixed_buffer_length = (int)(sizeof(mixed_buffer) / sizeof(ASample));

// Mix the buffers together into the output
for (auto& buffer : active_mix_buffers) {
if (buffer.done)
for (auto& queue : active_mix_queues) {
if (!queue->client()) {
queue->clear();
continue;
auto* samples = buffer.buffer->samples();
auto sample_count = buffer.buffer->sample_count();

for (int i = 0; i < max_size && buffer.pos < sample_count; ++buffer.pos, ++i) {
auto& mixed_sample = mixed_buffer[i];
mixed_sample += samples[buffer.pos];
}

// clear it later
if (buffer.pos == sample_count) {
if (buffer.m_client)
buffer.m_client->did_finish_playing_buffer({}, buffer.buffer->shared_buffer_id());
buffer.done = true;
for (int i = 0; i < mixed_buffer_length; ++i) {
auto& mixed_sample = mixed_buffer[i];
ASample sample;
if (!queue->get_next_sample(sample))
break;
mixed_sample += sample;
}
}

// output the mixed stuff to the device
// max_size is 0 indexed, so add 1.
const int output_buffer_byte_size = (max_size + 1) * 2 * 2;
ASSERT(output_buffer_byte_size == 4096);
u8 raw_buffer[4096];
auto buffer = ByteBuffer::wrap(raw_buffer, sizeof(raw_buffer));
BufferStream stream(buffer);

for (int i = 0; i < mixed_buffer.size(); ++i) {
for (int i = 0; i < mixed_buffer_length; ++i) {
auto& mixed_sample = mixed_buffer[i];
mixed_sample.clip();

Expand All @@ -102,20 +80,21 @@ void ASMixer::mix()
ASSERT(!stream.at_end()); // we should have enough space for both channels in one buffer!
out_sample = mixed_sample.right * std::numeric_limits<i16>::max();
stream << out_sample;

ASSERT(!stream.at_end());
}

if (stream.offset() != 0) {
buffer.trim(stream.offset());
m_device.write(buffer);
mixed_buffer.resize(0);
}
}
}

ASMixer::ASMixerBuffer::ASMixerBuffer(const NonnullRefPtr<ABuffer>& buf, ASClientConnection& client)
: buffer(buf)
, m_client(client.make_weak_ptr())
ASBufferQueue::ASBufferQueue(ASClientConnection& client)
: m_client(client.make_weak_ptr())
{
}

void ASBufferQueue::enqueue(NonnullRefPtr<ABuffer>&& buffer)
{
m_queue.enqueue(move(buffer));
}
51 changes: 41 additions & 10 deletions Servers/AudioServer/ASMixer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <AK/ByteBuffer.h>
#include <AK/NonnullRefPtrVector.h>
#include <AK/Queue.h>
#include <AK/RefCounted.h>
#include <AK/WeakPtr.h>
#include <LibAudio/ABuffer.h>
Expand All @@ -10,22 +11,52 @@

class ASClientConnection;

class ASBufferQueue : public RefCounted<ASBufferQueue> {
public:
explicit ASBufferQueue(ASClientConnection&);
~ASBufferQueue() {}

bool is_full() const { return m_queue.size() >= 3; }
void enqueue(NonnullRefPtr<ABuffer>&&);

bool get_next_sample(ASample& sample)
{
while (!m_current && !m_queue.is_empty())
m_current = m_queue.dequeue();
if (!m_current)
return false;
sample = m_current->samples()[m_position++];
if (m_position >= m_current->sample_count()) {
m_current = nullptr;
m_position = 0;
}
return true;
}

ASClientConnection* client() { return m_client.ptr(); }
void clear()
{
m_queue.clear();
m_position = 0;
}

private:
RefPtr<ABuffer> m_current;
Queue<NonnullRefPtr<ABuffer>> m_queue;
int m_position { 0 };
int m_playing_queued_buffer_id { -1 };
WeakPtr<ASClientConnection> m_client;
};

class ASMixer : public RefCounted<ASMixer> {
public:
ASMixer();

void queue(ASClientConnection&, const ABuffer&);
NonnullRefPtr<ASBufferQueue> create_queue(ASClientConnection&);

private:
struct ASMixerBuffer {
ASMixerBuffer(const NonnullRefPtr<ABuffer>&, ASClientConnection&);
NonnullRefPtr<ABuffer> buffer;
int pos { 0 };
bool done { false };
WeakPtr<ASClientConnection> m_client;
};

Vector<ASMixerBuffer> m_pending_mixing;
Vector<NonnullRefPtr<ASBufferQueue>> m_pending_mixing;

CFile m_device;
CLock m_lock;

Expand Down

0 comments on commit be31e22

Please sign in to comment.