Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feature "async" #68

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2fb2709
Add feature "async"
RamType0 Nov 6, 2021
516018f
Rename internal method
RamType0 Nov 6, 2021
44169dc
Add push_async,pop_async,peek_async
RamType0 Nov 7, 2021
d8016b2
Move AsyncChunkError from rtrb to rtrb::chunks
RamType0 Nov 7, 2021
cdf5fbd
Rename ReadWriteReactor to AsyncReactor
RamType0 Nov 7, 2021
7caa714
Define some "Async" types
RamType0 Nov 7, 2021
35ad6e9
Add `#[inline]` attribute for DummyReactor methods
RamType0 Nov 7, 2021
401a3ad
Implement `Send` for `WriteChunkUninit` and `ReadChunk`
RamType0 Nov 7, 2021
9e444b8
Add tests for "async" feature
RamType0 Nov 7, 2021
e58e883
Replace unnecessary `Ordering::Acquire` with `Ordering::Relaxed`
RamType0 Nov 8, 2021
ffc0ded
Rename `AsyncReactor` to `MutexReactor`
RamType0 Nov 8, 2021
962c437
Refactor `MutexReactor`
RamType0 Nov 8, 2021
7b0930f
Separate `async_rtrb` to modules
RamType0 Nov 9, 2021
f6db6d0
Replace MutexReactor with CommitterWaitFreeReactor
RamType0 Nov 19, 2021
c2f0430
Remove dependency which is no longer needed
RamType0 Nov 19, 2021
ae3b774
Fix unregistering opponent's waker
RamType0 Nov 19, 2021
01a4180
Fix incorrect memory ordering
RamType0 Nov 19, 2021
b6dbaee
Make `Reactor` accessible
RamType0 Nov 20, 2021
72a8052
Fix warning and doc test failing
RamType0 Nov 20, 2021
022e0ee
Fix incorrect memory ordering
RamType0 Nov 22, 2021
e6e5b85
Remove unintended std dependency
RamType0 Nov 22, 2021
9e37118
Fix `WAKING` count corruption when concurrent committing
RamType0 Dec 3, 2021
f34cb3a
Fix missing `registered = true`
RamType0 Dec 3, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ exclude = [
[features]
default = ["std"]
std = []
async = []

[dependencies]
cache-padded = "1.1"

[dev-dependencies]
rand = "0.7"
criterion = "0.3"
tokio = {version = "1.13.0", features = ["rt","macros","time"]}

[lib]
bench = false # Don't disturb criterion command line parsing
Expand Down
307 changes: 307 additions & 0 deletions src/async_rtrb/async_reactor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
use core::{cell::{Cell, UnsafeCell}, fmt::Debug, sync::atomic::{AtomicUsize, Ordering}, task::Waker};

use crate::{Consumer, Producer,reactor::Reactor};

pub trait AsyncReactor:Reactor{
#[must_use]
fn register_read_slots_available<T>(consumer:&&mut Consumer<T,Self>,waker:&Waker,required_slots:usize)->AsyncReactorRegisterResult;
#[must_use]
fn register_write_slots_available<T>(producer:&&mut Producer<T,Self>,waker:&Waker,required_slots:usize)->AsyncReactorRegisterResult;

fn unregister_read_slots_available(&self);
fn unregister_write_slots_available(&self);
}
#[derive(Debug)]
pub enum AsyncReactorRegisterResult{
Registered,
AlreadyAvailable,
TooFewSlotsAndAbandoned(usize),
WillDeadlock(usize,usize),
}

#[derive(Debug,Default)]
pub struct CommitterWaitFreeReactor{
state:AtomicUsize,
required_slots:Cell<usize>,
producer_waker:UnsafeCell<Option<Waker>>,
consumer_waker:UnsafeCell<Option<Waker>>,
cached_abandoned:Cell<bool>,
}

const WAITING:usize = 0;
const REGISTERING:usize = 0b00001;
const ABANDONED:usize = 0b00010;
const UNREGISTERED_PRODUCER:usize = 0b00100;
const UNREGISTERED_CONSUMER:usize = 0b01000;
const WAKING:usize = 0b10000;
const MAX_WAKINGS:usize = usize::MAX >> WAKING.trailing_zeros();
impl Reactor for CommitterWaitFreeReactor{

fn pushed<T>(producer:&Producer<T,Self>) {
let buffer = &producer.buffer;
let reactor = &buffer.reactor;
if reactor.cached_abandoned.get(){
return;
}
match reactor.state.fetch_add(WAKING,Ordering::AcqRel){
UNREGISTERED_CONSUMER => {
unsafe{(*reactor.consumer_waker.get()) = None};
reactor.state.fetch_sub(WAKING | UNREGISTERED_CONSUMER,Ordering::Release);
},
s => {
if s & !UNREGISTERED_PRODUCER == 0 {
let available_slots = producer.buffer.distance(buffer.head.load(Ordering::Acquire), producer.tail.get());
if reactor.required_slots.get() <= available_slots{
if let Some(waker) = unsafe{(*reactor.consumer_waker.get()).take()}{
waker.wake();
}
}
reactor.state.fetch_sub(WAKING | (s & UNREGISTERED_PRODUCER),Ordering::Release);
}
else if s & (REGISTERING | ABANDONED) == 0 {
debug_assert!(s >> WAKING.trailing_zeros() == 1);
reactor.state.fetch_sub(WAKING,Ordering::Release);
}
else if s & ABANDONED != 0{
producer.head.set(buffer.head.load(Ordering::Relaxed));
reactor.cached_abandoned.set(true);
}
else if s >= !(WAKING-1){
panic!("Commited more than {} times while opponent thread is registering. This might be a bug or opponent thread has been aborted.",MAX_WAKINGS)
}
},
}
}

fn popped<T>(consumer:&Consumer<T,Self>) {
let buffer = &consumer.buffer;
let reactor = &buffer.reactor;
if reactor.cached_abandoned.get(){
return;
}
match reactor.state.fetch_add(WAKING,Ordering::AcqRel){
UNREGISTERED_PRODUCER => {
unsafe{(*reactor.producer_waker.get()) = None};
reactor.state.fetch_sub(WAKING | UNREGISTERED_PRODUCER,Ordering::Release);
},
s => {
if s & !UNREGISTERED_CONSUMER == 0 {
let available_slots = buffer.capacity - consumer.buffer.distance(consumer.head.get(), buffer.tail.load(Ordering::Acquire));
if reactor.required_slots.get() <= available_slots{
if let Some(waker) = unsafe{(*reactor.producer_waker.get()).take()}{
waker.wake();
}
}
reactor.state.fetch_sub(WAKING | (s & UNREGISTERED_CONSUMER),Ordering::Release);
}
else if s & (REGISTERING | ABANDONED) == 0 {
debug_assert!(s >> WAKING.trailing_zeros() == 1);
reactor.state.fetch_sub(WAKING,Ordering::Release);
}
else if s & ABANDONED != 0{
consumer.tail.set(buffer.tail.load(Ordering::Relaxed));
reactor.cached_abandoned.set(true);
}
else if s >= !(WAKING-1){
panic!("Commited more than {} times while opponent thread is registering. This might be a bug or opponent thread has been aborted.",MAX_WAKINGS)
}
},
}
}

fn dropping_producer<T>(producer:&Producer<T,Self>) {
let reactor = &producer.buffer.reactor;
match reactor.state.fetch_or(ABANDONED,Ordering::AcqRel){
WAITING=> {
if let Some(waker) = unsafe{(*reactor.consumer_waker.get()).take()}{
waker.wake();
}
},
_ => (),
}
}

fn dropping_consumer<T>(consumer:&Consumer<T,Self>) {
let reactor = &consumer.buffer.reactor;
match reactor.state.fetch_or(ABANDONED,Ordering::AcqRel){
WAITING=> {
if let Some(waker) = unsafe{(*reactor.producer_waker.get()).take()}{
waker.wake();
}
},
_ => (),
}
}
}

impl AsyncReactor for CommitterWaitFreeReactor{
fn register_read_slots_available<T>(consumer:&&mut Consumer<T,Self>,waker:&Waker,required_slots:usize)->AsyncReactorRegisterResult {
let buffer = consumer.buffer.as_ref();
let reactor = &buffer.reactor;
let cached_abandoned = &reactor.cached_abandoned;
let head = consumer.head.get();
if cached_abandoned.get(){
let available_slots = buffer.distance(head, consumer.tail.get());
return AsyncReactorRegisterResult::TooFewSlotsAndAbandoned(available_slots);
}
let tail = &buffer.tail;
let state = &reactor.state;
let mut expected = WAITING;
loop{
match state.compare_exchange_weak(expected, REGISTERING, Ordering::Acquire, Ordering::Acquire){
Ok(_) => {
if expected & UNREGISTERED_PRODUCER != 0{
unsafe{*reactor.producer_waker.get() = None};
}
if expected & UNREGISTERED_CONSUMER !=0{
unsafe{*reactor.consumer_waker.get() = None};
}
let read_slots = buffer.distance(head, tail.load(Ordering::Acquire));
let available_slots = read_slots;
if required_slots <= available_slots{
state.fetch_and(ABANDONED, Ordering::Release);
return AsyncReactorRegisterResult::AlreadyAvailable;
}
if unsafe{(*reactor.producer_waker.get()).is_some()}{
state.fetch_and(ABANDONED, Ordering::Release);
let write_slots = buffer.capacity- read_slots;
let write_insufficient = reactor.required_slots.get() - write_slots;
return AsyncReactorRegisterResult::WillDeadlock(write_insufficient,available_slots);
}
unsafe{*reactor.consumer_waker.get() = Some(waker.clone())};
reactor.required_slots.set(required_slots);
let mut expected = REGISTERING;
loop{
match state.compare_exchange_weak(expected, WAITING, Ordering::AcqRel, Ordering::Acquire){
Ok(_) => {
return AsyncReactorRegisterResult::Registered;
},
Err(actual) => {
if actual & ABANDONED != 0{
break;
}
let available_slots = buffer.distance(head, tail.load(Ordering::Acquire));
if required_slots <= available_slots{
unsafe{*reactor.consumer_waker.get() = None};
state.fetch_and(ABANDONED, Ordering::Release);
return AsyncReactorRegisterResult::AlreadyAvailable;
}
expected = actual;
},
}
}
break;
},
Err(actual) => {
if actual & ABANDONED !=0{
break;
}
let available_slots = buffer.distance(head, tail.load(Ordering::Acquire));
if required_slots <= available_slots{
return AsyncReactorRegisterResult::AlreadyAvailable;
}
expected = actual & (UNREGISTERED_PRODUCER | UNREGISTERED_CONSUMER);
},
}
}
// When acquired `ABANDONED`
let tail = tail.load(Ordering::Relaxed);
consumer.tail.set(tail);
cached_abandoned.set(true);
let available_slots = buffer.distance(head, tail);
if available_slots < required_slots{
return AsyncReactorRegisterResult::TooFewSlotsAndAbandoned(available_slots);
}else{
return AsyncReactorRegisterResult::AlreadyAvailable;
}
}

fn register_write_slots_available<T>(producer:&&mut Producer<T,Self>,waker:&Waker,required_slots:usize)->AsyncReactorRegisterResult {
let buffer = producer.buffer.as_ref();
let capacity = buffer.capacity;
let reactor = &buffer.reactor;
let cached_abandoned = &reactor.cached_abandoned;
let tail = producer.tail.get();
if cached_abandoned.get(){
let available_slots = capacity - buffer.distance(producer.head.get(), tail);
return AsyncReactorRegisterResult::TooFewSlotsAndAbandoned(available_slots);
}
let head = &buffer.head;
let state = &reactor.state;
let mut expected = WAITING;
loop{
match state.compare_exchange_weak(expected, REGISTERING, Ordering::Acquire, Ordering::Acquire){
Ok(_) => {
if expected & UNREGISTERED_PRODUCER != 0{
unsafe{*reactor.producer_waker.get() = None};
}
if expected & UNREGISTERED_CONSUMER !=0{
unsafe{*reactor.consumer_waker.get() = None};
}
let read_slots = buffer.distance(head.load(Ordering::Acquire), tail);
let available_slots = capacity - read_slots;
if required_slots <= available_slots{
state.fetch_and(ABANDONED, Ordering::Release);
return AsyncReactorRegisterResult::AlreadyAvailable;
}
if unsafe{(*reactor.consumer_waker.get()).is_some()}{
state.fetch_and(ABANDONED, Ordering::Release);
let read_insufficient = reactor.required_slots.get() - read_slots;
return AsyncReactorRegisterResult::WillDeadlock(read_insufficient,available_slots);
}
unsafe{*reactor.producer_waker.get() = Some(waker.clone())};
reactor.required_slots.set(required_slots);
let mut expected = REGISTERING;
loop{
match state.compare_exchange_weak(expected, WAITING, Ordering::AcqRel, Ordering::Acquire){
Ok(_) => {
return AsyncReactorRegisterResult::Registered;
},
Err(actual) => {
if actual & ABANDONED != 0{
break;
}
let available_slots = capacity - buffer.distance(head.load(Ordering::Acquire), tail);
if required_slots <= available_slots{
unsafe{*reactor.consumer_waker.get() = None};
state.fetch_and(ABANDONED, Ordering::Release);
return AsyncReactorRegisterResult::AlreadyAvailable;
}
expected = actual;
},
}
}
break;
},
Err(actual) => {
if actual & ABANDONED !=0{
break;
}
let available_slots = capacity - buffer.distance(head.load(Ordering::Acquire), tail);
if required_slots <= available_slots{
return AsyncReactorRegisterResult::AlreadyAvailable;
}
expected = actual & (UNREGISTERED_PRODUCER | UNREGISTERED_CONSUMER);
},
}
}
// When acquired `ABANDONED`
let head = head.load(Ordering::Relaxed);
producer.head.set(head);
cached_abandoned.set(true);
let available_slots = capacity - buffer.distance(head, tail);
if available_slots < required_slots{
return AsyncReactorRegisterResult::TooFewSlotsAndAbandoned(available_slots);
}else{
return AsyncReactorRegisterResult::AlreadyAvailable;
}
}

fn unregister_read_slots_available(&self) {
self.state.fetch_or(UNREGISTERED_CONSUMER, Ordering::Release);
}

fn unregister_write_slots_available(&self) {
self.state.fetch_or(UNREGISTERED_PRODUCER, Ordering::Release);
}
}
Loading