Skip to content

Commit

Permalink
Merge pull request fereidani#4 from adrian-budau/main
Browse files Browse the repository at this point in the history
Fix some sync sender/receiver undefined behavior.
  • Loading branch information
fereidani committed Oct 16, 2022
2 parents d8f816a + 773a4e7 commit f6b94ac
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 22 deletions.
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl<T> Sender<T> {
}
// send directly to wait list

let mut sig = SyncSignal::new(&mut data as *mut T, std::thread::current());
let sig = SyncSignal::new(&mut data as *mut T, std::thread::current());
internal.push_send(sig.as_signal());
drop(internal);
if !sig.wait() {
Expand Down Expand Up @@ -223,7 +223,7 @@ impl<T> Sender<T> {
}
// send directly to wait list

let mut sig = SyncSignal::new(&mut data as *mut T, std::thread::current());
let sig = SyncSignal::new(&mut data as *mut T, std::thread::current());
let cancelable_sig = sig.as_signal();
internal.push_send(cancelable_sig);
drop(internal);
Expand Down Expand Up @@ -274,7 +274,7 @@ impl<T> Sender<T> {
}
// send directly to wait list
let mut d = data.take().unwrap();
let mut sig = SyncSignal::new(&mut d as *mut T, std::thread::current());
let sig = SyncSignal::new(&mut d as *mut T, std::thread::current());
internal.push_send(sig.as_signal());
drop(internal);
if !sig.wait_timeout(deadline) {
Expand Down Expand Up @@ -434,7 +434,7 @@ impl<T> Receiver<T> {
}
// no active waiter so push to queue
let mut ret = MaybeUninit::<T>::uninit();
let mut sig = SyncSignal::new(ret.as_mut_ptr(), std::thread::current());
let sig = SyncSignal::new(ret.as_mut_ptr(), std::thread::current());
internal.push_recv(sig.as_signal());
drop(internal);

Expand Down Expand Up @@ -470,7 +470,7 @@ impl<T> Receiver<T> {
}
// no active waiter so push to queue
let mut ret = MaybeUninit::<T>::uninit();
let mut sig = SyncSignal::new(ret.as_mut_ptr() as *mut T, std::thread::current());
let sig = SyncSignal::new(ret.as_mut_ptr() as *mut T, std::thread::current());
internal.push_recv(sig.as_signal());
drop(internal);
if !sig.wait_timeout(deadline) {
Expand Down
47 changes: 30 additions & 17 deletions src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,34 +185,47 @@ impl<T> SyncSignal<T> {
}

// writes data to pointer, shall not be called more than once
// has to be done through a pointer because by the end of this scope
// the object might have been destroyes by the owning receiver
#[inline(always)]
pub unsafe fn send(&self, d: T) {
pub unsafe fn send(this: *const Self, d: T) {
if std::mem::size_of::<T>() > 0 {
std::ptr::write(self.ptr, d);
std::ptr::write((*this).ptr, d);
}
if !self.state.unlock() {
self.state.force_unlock();
self.thread.unpark();
if !(*this).state.unlock() {
// Clone the thread because this.thread might be destroyed
// sometime during unpark when the other thread wakes up
let thread = (*this).thread.clone();
(*this).state.force_unlock();
thread.unpark();
}
}

// read data from pointer, shall not be called more than once
// has to be done through a pointer because by the end of this scope
// the object might have been destroyed by the owning receiver
#[inline(always)]
pub unsafe fn recv(&self) -> T {
let d = read_ptr(self.ptr);
if !self.state.unlock() {
self.state.force_unlock();
self.thread.unpark();
pub unsafe fn recv(this: *const Self) -> T {
let d = read_ptr((*this).ptr);
if !(*this).state.unlock() {
// Same as send
let thread = (*this).thread.clone();
(*this).state.force_unlock();
thread.unpark();
}
d
}

// terminates operation and notifies the waiter , shall not be called more than once
// has to be done through a pointer because by the end of this scope
// the object might have been destroyed by the owner
#[inline(always)]
pub unsafe fn terminate(&self) {
if !self.state.terminate() {
self.state.force_terminate();
self.thread.unpark();
pub unsafe fn terminate(this: *const Self) {
if !(*this).state.terminate() {
// Same as send and recv
let thread = (*this).thread.clone();
(*this).state.force_terminate();
thread.unpark();
}
}

Expand Down Expand Up @@ -284,21 +297,21 @@ impl<T> Signal<T> {

pub unsafe fn send(&self, d: T) {
match self {
Signal::Sync(sig) => (**sig).send(d),
Signal::Sync(sig) => SyncSignal::send(*sig, d),
Signal::Async(sig) => (**sig).send(d),
}
}

pub unsafe fn recv(&mut self) -> T {
match self {
Signal::Sync(sig) => (**sig).recv(),
Signal::Sync(sig) => SyncSignal::recv(*sig),
Signal::Async(sig) => (**sig).recv(),
}
}

pub unsafe fn terminate(&self) {
match self {
Signal::Sync(sig) => (**sig).terminate(),
Signal::Sync(sig) => SyncSignal::terminate(*sig),
Signal::Async(sig) => (**sig).terminate(),
}
}
Expand Down

0 comments on commit f6b94ac

Please sign in to comment.