ZmqError::BufferFull unhandled #145

Closed
njeisecke opened this issue Oct 29, 2021 · 2 comments · Fixed by #146

Comments

@njeisecke

Occasionally a panic is raised with a backtrace like this:

[/Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/zeromq-0.2.2/src/pub.rs:185] e = BufferFull(
    "Sink is full",
)
thread 'main' panicked at 'not yet implemented', /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/zeromq-0.2.2/src/pub.rs:186:29
stack backtrace:
   0: rust_begin_unwind
             at /rustc/09c42c45858d5f3aedfa670698275303a3d19afa/library/std/src/panicking.rs:517:5
   1: core::panicking::panic_fmt
             at /rustc/09c42c45858d5f3aedfa670698275303a3d19afa/library/core/src/panicking.rs:101:14
   2: core::panicking::panic
             at /rustc/09c42c45858d5f3aedfa670698275303a3d19afa/library/core/src/panicking.rs:50:5
   3: <zeromq::pub::PubSocket as zeromq::SocketSend>::send::{{closure}}
             at /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/zeromq-0.2.2/src/pub.rs:186:29
   4: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/09c42c45858d5f3aedfa670698275303a3d19afa/library/core/src/future/mod.rs:80:19
   5: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/09c42c45858d5f3aedfa670698275303a3d19afa/library/core/src/future/future.rs:119:9
   6: lg::main::{{closure}}
             at ./src/main.rs:546:38
   7: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/09c42c45858d5f3aedfa670698275303a3d19afa/library/core/src/future/mod.rs:80:19
   8: tokio::park::thread::CachedParkThread::block_on::{{closure}}
             at /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.12.0/src/park/thread.rs:263:54
   9: tokio::coop::with_budget::{{closure}}
             at /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.12.0/src/coop.rs:106:9
  10: std::thread::local::LocalKey<T>::try_with
             at /rustc/09c42c45858d5f3aedfa670698275303a3d19afa/library/std/src/thread/local.rs:399:16
  11: std::thread::local::LocalKey<T>::with
             at /rustc/09c42c45858d5f3aedfa670698275303a3d19afa/library/std/src/thread/local.rs:375:9
  12: tokio::coop::with_budget
             at /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.12.0/src/coop.rs:99:5
  13: tokio::coop::budget
             at /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.12.0/src/coop.rs:76:5
  14: tokio::park::thread::CachedParkThread::block_on
             at /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.12.0/src/park/thread.rs:263:31
  15: tokio::runtime::enter::Enter::block_on
             at /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.12.0/src/runtime/enter.rs:151:13
  16: tokio::runtime::thread_pool::ThreadPool::block_on
             at /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.12.0/src/runtime/thread_pool/mod.rs:77:9
  17: tokio::runtime::Runtime::block_on
             at /Users/njeiseck/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.12.0/src/runtime/mod.rs:463:43
  18: lg::main
             at ./src/main.rs:368:5
  19: core::ops::function::FnOnce::call_once
             at /rustc/09c42c45858d5f3aedfa670698275303a3d19afa/library/core/src/ops/function.rs:227:5

Obviously ZmqError::BufferFull is not handled yet:

                    let res = subscriber
                        .send_queue
                        .as_mut()
                        .try_send(Message::Message(message.clone()));
                    match res {
                        Ok(()) => {}
                        Err(ZmqError::Codec(CodecError::Io(e))) => {
                            if e.kind() == ErrorKind::BrokenPipe {
                                dead_peers.push(subscriber.key().clone());
                            } else {
                                dbg!(e);
                            }
                        }
                        Err(e) => {
                            dbg!(e);
                            todo!() // oops <<<
                        }
                    }

This is where the error is returned:

impl TrySend for ZmqFramedWrite {
    fn try_send(mut self: Pin<&mut Self>, item: Message) -> ZmqResult<()> {
        let waker = futures::task::noop_waker();
        let mut cx = futures::task::Context::from_waker(&waker);
        match self.as_mut().poll_ready(&mut cx) {
            Poll::Ready(Ok(())) => {
                self.as_mut().start_send(item)?;
                let _ = self.as_mut().poll_flush(&mut cx); // ignore result just hope that it flush eventually
                Ok(())
            }
            Poll::Ready(Err(e)) => Err(e.into()),
            Poll::Pending => Err(ZmqError::BufferFull("Sink is full")),
        }
    }
}

I've no clue what "the right thing to do" could be in this situation. Would it make sense to handle this like a broken pipe?

@Alexei-Kornienko
Collaborator

It seems that in this case one of the subscribers cannot keep up with the rate at which messages are produced.
I think the right way to handle it is to ignore this error. This particular message will be lost for that subscriber, but in theory it can still read the older messages from its buffer, and once there is space in it again it will receive new messages. This complies with the spec: https://rfc.zeromq.org/spec/29/ says that the socket "SHALL silently drop the message if the queue for a subscriber is full."
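
To illustrate the drop-on-full behavior described above, here is a minimal self-contained sketch. It does not use the zmq.rs types: `TrySendError`, `SubscriberQueue`, and `publish` are hypothetical stand-ins for `ZmqError`, the per-subscriber `send_queue`, and the loop in `pub.rs`. The actual fix in the crate may look different.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for the errors a non-blocking send can report.
#[derive(Debug, PartialEq)]
enum TrySendError {
    BufferFull,
    BrokenPipe,
}

/// Hypothetical bounded per-subscriber queue (not the zmq.rs type).
struct SubscriberQueue {
    capacity: usize,
    connected: bool,
    pending: VecDeque<String>,
}

impl SubscriberQueue {
    fn new(capacity: usize) -> Self {
        SubscriberQueue {
            capacity,
            connected: true,
            pending: VecDeque::new(),
        }
    }

    /// Enqueues without blocking; fails if the peer is gone or the queue is full.
    fn try_send(&mut self, msg: String) -> Result<(), TrySendError> {
        if !self.connected {
            return Err(TrySendError::BrokenPipe);
        }
        if self.pending.len() >= self.capacity {
            return Err(TrySendError::BufferFull);
        }
        self.pending.push_back(msg);
        Ok(())
    }
}

/// Sends `msg` to every subscriber and returns how many subscribers missed it
/// because their queue was full. Disconnected subscribers are removed.
fn publish(subscribers: &mut HashMap<u32, SubscriberQueue>, msg: &str) -> usize {
    let mut dropped = 0;
    let mut dead_peers = Vec::new();
    for (key, queue) in subscribers.iter_mut() {
        match queue.try_send(msg.to_string()) {
            Ok(()) => {}
            // Slow subscriber: silently drop this message for this peer only.
            Err(TrySendError::BufferFull) => dropped += 1,
            // Peer is gone: remember it so it can be removed after the loop.
            Err(TrySendError::BrokenPipe) => dead_peers.push(*key),
        }
    }
    for key in dead_peers {
        subscribers.remove(&key);
    }
    dropped
}
```

The point of the sketch is that a full queue is treated differently from a broken pipe: the slow subscriber stays registered and only misses the messages that arrive while its queue is full, whereas a disconnected peer is removed.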

njeisecke pushed a commit to njeisecke/zmq.rs that referenced this issue Oct 29, 2021
…mq specification)

This is a situation which can occur when subscribers read too slow.

Fixes zeromq#145.
njeisecke pushed a commit to njeisecke/zmq.rs that referenced this issue Oct 29, 2021
This is a situation which can occur when subscribers read too slow.

Complies to the zmq specification (https://rfc.zeromq.org/spec/29/).

Fixes zeromq#145.
@njeisecke
Author

Thanks for your quick response. I've created a pull request.
