Skip to content

Commit

Permalink
0.1.0-pre1 release
Browse files Browse the repository at this point in the history
  • Loading branch information
fereidani committed Oct 16, 2022
1 parent 8b42bd1 commit 250289f
Show file tree
Hide file tree
Showing 8 changed files with 653 additions and 189 deletions.
14 changes: 10 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
[package]
name = "kanal"
version = "0.1.0-beta2"
version = "0.1.0-pre1"
edition = "2021"
authors = ["Khashayar Fereidani"]
description = "Fastest sync and async channel that Rust deserves"
description = "The fast sync and async channel that Rust deserves"
repository = "https://github.com/fereidani/kanal"
keywords = ["channel", "mpsc", "mpmc", "csp", "async"]
keywords = ["channel", "mpsc", "mpmc", "csp", "async", "golang"]
categories = ["concurrency", "data-structures","asynchronous"]
license = "MIT"
readme = "README.md"

[dependencies]
lock_api = "0.4.7"
lock_api = "0.4"
pin-project-lite = "0.2"

[dev-dependencies]
tokio = { version = "1", features =["rt-multi-thread", "test-util", "macros"] }
crossbeam = "0.8"

[features]
default = ["async","async_short_sync"]
async=[]
async_short_sync=[]
sync=[]
14 changes: 6 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@ This library focuses on bringing both sync and async API together to unify messa
**Performance is the main goal of Kanal.**

### Benchmark Results
Results are normalized based on kanal sync results, so 60x means the test for that framework takes 60 times slower than kanal.

Machine: `AMD Ryzen Threadripper 2950X 16-Core Processor`<br />
Rust: `rustc 1.62.0`<br />
Go: `go version go1.18.3 linux/amd64`<br />
OS (`uname -a`): `Linux 5.13.0-35-generic #40~20.04.1-Ubuntu SMP Mon Mar 7 09:18:32 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux`<br />
Date: July 15, 2022
Rust: `rustc rustc 1.64.0`<br />
Go: `go version go1.19.2 linux/amd64`<br />
OS (`uname -a`): `Linux 5.13.0-35-generic #40~20.04.1-Ubuntu SMP Mon Mar 7 09:18:32 UTC 2022 x86_64`<br />
Date: Oct 16, 2022

[Benchmark codes](https://github.com/fereidani/rust-channel-benchmarks)

![Benchmark bounded channel with size 0](https://i.imgur.com/NOP91jD.png)
![Benchmark bounded channel with size 1](https://i.imgur.com/MpsuWIi.png)
![Benchmark bounded channel with size n](https://i.imgur.com/9ebey2h.png)
![Benchmark unbounded channel](https://i.imgur.com/WgrFRtK.png)
![Benchmarks](https://i.imgur.com/WgrFRtK.png)
44 changes: 35 additions & 9 deletions src/internal.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
use std::{
collections::{LinkedList, VecDeque},
sync::Arc,
};
use std::{collections::VecDeque, sync::Arc};

use crate::mutex::Mutex;
use crate::mutex::{Mutex, MutexGuard};
//use std::sync::{Mutex, MutexGuard};
//use spin::mutex::Mutex;

use crate::signal::Signal;

pub type Internal<T> = Arc<Mutex<ChannelInternal<T>>>;

#[inline(always)]
pub fn acquire_internal<T>(internal: &'_ Internal<T>) -> MutexGuard<'_, ChannelInternal<T>> {
internal.lock()
}

/// Internal of channel that holds queues, wait lists and general state of channel,
/// it's shared among senders and receivers with an atomic counter and a mutex
pub struct ChannelInternal<T> {
// KEEP THE ORDER
pub queue: VecDeque<T>,
// receiver will wait until queue becomes full
pub recv_wait: LinkedList<Signal<T>>,
pub recv_wait: VecDeque<Signal<T>>,
// sender will wait until queue becomes empty
pub send_wait: LinkedList<Signal<T>>,
pub send_wait: VecDeque<Signal<T>>,
pub capacity: usize,
pub recv_count: u32,
pub send_count: u32,
Expand All @@ -34,8 +38,8 @@ impl<T> ChannelInternal<T> {

let ret = Self {
queue: VecDeque::with_capacity(capacity),
recv_wait: LinkedList::new(),
send_wait: LinkedList::new(),
recv_wait: VecDeque::new(),
send_wait: VecDeque::new(),
recv_count: 1,
send_count: 1,
capacity: abstract_capacity,
Expand Down Expand Up @@ -77,6 +81,28 @@ impl<T> ChannelInternal<T> {
pub fn push_recv(&mut self, s: Signal<T>) {
self.recv_wait.push_back(s);
}

#[inline(always)]
pub fn cancel_send_signal(&mut self, sig: Signal<T>) -> bool {
for (i, send) in self.send_wait.iter().enumerate() {
if sig == *send {
self.send_wait.remove(i);
return true;
}
}
false
}

#[inline(always)]
pub fn cancel_recv_signal(&mut self, sig: Signal<T>) -> bool {
for (i, recv) in self.recv_wait.iter().enumerate() {
if sig == *recv {
self.recv_wait.remove(i);
return true;
}
}
false
}
}

impl<T> Drop for ChannelInternal<T> {
Expand Down
2 changes: 1 addition & 1 deletion src/kanal_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod tests {
Sender,
};

const MESSAGES: usize = 100000;
const MESSAGES: usize = 1000000;
const THREADS: usize = 8;

fn new<T>(cap: Option<usize>) -> (Sender<T>, Receiver<T>) {
Expand Down
Loading

0 comments on commit 250289f

Please sign in to comment.