Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add recv_many for mpsc channels #6010

Merged
merged 37 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f13ea3e
Method recv_many returns a Vec<T> for mpsc channels
Jul 1, 2023
0133425
Update recv_many to populate a passed-in buffer.
Sep 9, 2023
7d0d8db
fix rustfmt issues
Sep 15, 2023
487f887
fix returning the result of a binding from a block
Sep 15, 2023
7086b34
fix rustfmt braces around continue
Sep 15, 2023
9a170b0
Update from bencher and merge benchmarks into sync_mpsc.rs
Sep 16, 2023
01b2270
fix Cargo.toml to reflect deletion of old benchmark
Sep 16, 2023
8c8b06f
Simplified recv_many implementation
Sep 25, 2023
dc81be3
Fix test send_recv_unbounded for arbitrarty BLOCK_CAP values
Sep 25, 2023
8f5b00c
Fix for rustfmt
Sep 25, 2023
3c1dc33
switch to break to exit loop
Sep 26, 2023
757bc33
resolve return value 0 for recv_many
Sep 27, 2023
0948857
Change to recv_many to no longer clear the output buffer
Sep 27, 2023
1dc2dbf
Filled buffer handling now fail-safe, not fail-fast
Sep 28, 2023
6bf430c
improved doctest example
Sep 30, 2023
cdc3665
fixed nested if; resolve documentation issues for recv_many
Sep 30, 2023
4c1321f
Update tokio/src/sync/mpsc/unbounded.rs
aschweig Oct 1, 2023
27ae226
Update tokio/src/sync/mpsc/unbounded.rs
aschweig Oct 1, 2023
9a1d9bc
Updated doctests to be similar; clarified documentation
Oct 1, 2023
7fca954
accomodate that with_capacity(n) gives capacity of at least n
Oct 1, 2023
fe44b32
Improved documentation for `recv_many` and added test
Oct 1, 2023
aac0eda
Update tokio/src/sync/mpsc/bounded.rs
aschweig Oct 1, 2023
088537c
Update tokio/src/sync/mpsc/bounded.rs
aschweig Oct 1, 2023
effb0e5
change recv_many doctests to use drop not scope
Oct 1, 2023
1fb08df
adopt improved wording on buffer capacity behavior
Oct 1, 2023
bd1ef3a
Only reserve if no unused capacity and there is 1+ value(s) to receive
Oct 1, 2023
d31fd17
Updated `recv_many` to accept a new argument `limit`
Oct 2, 2023
3cd2e46
rename to_go to remaining
Oct 2, 2023
326fa53
Change behavior of recv_many(buf, 0) to return 0 immediately.
Oct 8, 2023
e8950ce
Fix doc issues
Oct 8, 2023
ead4ca6
Update tokio/src/sync/mpsc/chan.rs
aschweig Oct 11, 2023
2fcb715
Update tokio/src/sync/mpsc/chan.rs
aschweig Oct 11, 2023
ba46b7c
check the coop budget in limit==0 case
Oct 11, 2023
b1d1cae
clean up some recv_many tests
Oct 11, 2023
f162ca7
Update tokio/src/sync/mpsc/bounded.rs
aschweig Oct 11, 2023
9f39dc7
Reflow doc comments; adjust comment in chan.rs
Oct 11, 2023
27e39d9
Merge branch 'master' into recv_many
Darksonn Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update from bencher and merge benchmarks into sync_mpsc.rs
Previous benchmarks in sync_mpsc_recv_many.rs included a
contrived example that is removed.
  • Loading branch information
Aaron Schweiger authored and aschweig committed Oct 9, 2023
commit 9a170b0856ec9fcec921e4e5a827d4d059015b35
130 changes: 130 additions & 0 deletions benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,33 @@ fn contention_bounded(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn contention_bounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("bounded_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);

for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).await.unwrap();
}
});
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer).await;
}
})
})
});
}

fn contention_bounded_full(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

Expand All @@ -98,6 +125,33 @@ fn contention_bounded_full(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn contention_bounded_full_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("bounded_full_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(100);

for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).await.unwrap();
}
});
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer).await;
}
})
})
});
}

fn contention_unbounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

Expand All @@ -123,6 +177,33 @@ fn contention_unbounded(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn contention_unbounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("unbounded_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::unbounded_channel::<usize>();

for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).unwrap();
}
});
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer).await;
}
})
})
});
}

fn uncontented_bounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

Expand All @@ -143,6 +224,28 @@ fn uncontented_bounded(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn uncontented_bounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("bounded_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);

for i in 0..5000 {
tx.send(i).await.unwrap();
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer).await;
}
})
})
});
}

fn uncontented_unbounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

Expand All @@ -163,6 +266,28 @@ fn uncontented_unbounded(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn uncontented_unbounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("unbounded_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::unbounded_channel::<usize>();

for i in 0..5000 {
tx.send(i).unwrap();
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer).await;
}
})
})
});
}

fn bench_create_medium(c: &mut Criterion) {
let mut group = c.benchmark_group("create_medium");
create_medium::<1>(&mut group);
Expand All @@ -181,15 +306,20 @@ fn bench_send(c: &mut Criterion) {
fn bench_contention(c: &mut Criterion) {
let mut group = c.benchmark_group("contention");
contention_bounded(&mut group);
contention_bounded_recv_many(&mut group);
contention_bounded_full(&mut group);
contention_bounded_full_recv_many(&mut group);
contention_unbounded(&mut group);
contention_unbounded_recv_many(&mut group);
group.finish();
}

fn bench_uncontented(c: &mut Criterion) {
let mut group = c.benchmark_group("uncontented");
uncontented_bounded(&mut group);
uncontented_bounded_recv_many(&mut group);
uncontented_unbounded(&mut group);
uncontented_unbounded_recv_many(&mut group);
group.finish();
}

Expand Down
Loading