Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio_util::io::StreamReader uses incorrect generic parameter for Sink impl #6642

Closed
eric-seppanen opened this issue Jun 14, 2024 · 0 comments · Fixed by #6647
Closed

tokio_util::io::StreamReader uses incorrect generic parameter for Sink impl #6642

eric-seppanen opened this issue Jun 14, 2024 · 0 comments · Fixed by #6647
Labels
A-tokio-util Area: The tokio-util crate C-bug Category: This is a bug. M-io Module: tokio/io

Comments

@eric-seppanen
Copy link
Contributor

eric-seppanen commented Jun 14, 2024

Version

tokio v1.38.0
tokio-stream v0.1.15
tokio-util v0.7.11

Platform

Linux 6.2.0-39-generic #40~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Nov 16 10:53:04 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

Description

I think that the generic parameters are used incorrectly where tokio-util's StreamReader implements Sink:

impl<S: Sink<T, Error = E>, E, T> Sink<T> for StreamReader<S, E> {
type Error = E;

It is written as though StreamReader's second parameter is its error type, which is not correct-- the second parameter is its buffer type.

This breaks the ability to use StreamReader and SinkWriter together. In this example the error message makes it clear that the types have become confused:

use std::io::{self, Cursor};

use futures_sink::Sink;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::Stream;
use tokio_util::io::{SinkWriter, StreamReader};

pub struct Foo;

impl Foo {
    pub fn into_async_rw(self) -> impl AsyncRead + AsyncWrite {
        // This is broken because StreamReader doesn't correctly pass through the Sink impl types.
        SinkWriter::new(StreamReader::new(self))
    }
}

impl Stream for Foo {
    type Item = io::Result<Cursor<Vec<u8>>>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        todo!()
    }
}

impl<'a> Sink<&'a [u8]> for Foo {
    type Error = io::Error;

    fn poll_ready(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }

    fn start_send(self: std::pin::Pin<&mut Self>, _item: &'a [u8]) -> Result<(), Self::Error> {
        todo!()
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }

    fn poll_close(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }
}
error[E0271]: type mismatch resolving `<Foo as Sink<&[u8]>>::Error == Cursor<Vec<u8>>`
  --> src/lib.rs:11:35
   |
11 |     pub fn into_async_rw(self) -> impl AsyncRead + AsyncWrite {
   |                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^ type mismatch resolving `<Foo as Sink<&[u8]>>::Error == Cursor<Vec<u8>>`
   |
note: expected this to be `std::io::Error`
  --> src/lib.rs:28:18
   |
28 |     type Error = io::Error;
   |                  ^^^^^^^^^
   = note: expected struct `std::io::Error`
              found struct `std::io::Cursor<Vec<u8>>`
   = note: required for `StreamReader<Foo, std::io::Cursor<Vec<u8>>>` to implement `for<'a> futures_sink::Sink<&'a [u8]>`
   = note: required for `SinkWriter<StreamReader<Foo, std::io::Cursor<Vec<u8>>>>` to implement `AsyncWrite`

error[E0277]: the trait bound `std::io::Error: From<std::io::Cursor<Vec<u8>>>` is not satisfied
  --> src/lib.rs:11:35
   |
11 |     pub fn into_async_rw(self) -> impl AsyncRead + AsyncWrite {
   |                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `From<std::io::Cursor<Vec<u8>>>` is not implemented for `std::io::Error`, which is required by `SinkWriter<StreamReader<Foo, std::io::Cursor<Vec<u8>>>>: AsyncWrite`
   |
   = help: the following other types implement trait `From<T>`:
             <std::io::Error as From<tokio_stream::Elapsed>>
             <std::io::Error as From<tokio::time::error::Elapsed>>
             <std::io::Error as From<TryReserveError>>
             <std::io::Error as From<NulError>>
             <std::io::Error as From<IntoInnerError<W>>>
             <std::io::Error as From<ErrorKind>>
   = note: required for `std::io::Cursor<Vec<u8>>` to implement `Into<std::io::Error>`
   = note: required for `SinkWriter<StreamReader<Foo, std::io::Cursor<Vec<u8>>>>` to implement `AsyncWrite`

@eric-seppanen eric-seppanen added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Jun 14, 2024
@Darksonn Darksonn added A-tokio-util Area: The tokio-util crate M-io Module: tokio/io and removed A-tokio Area: The main tokio crate labels Jun 15, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio-util Area: The tokio-util crate C-bug Category: This is a bug. M-io Module: tokio/io
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants