Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous I/O is not implemented correctly #2

Closed
sippykup opened this issue Aug 24, 2010 · 4 comments
Closed

Asynchronous I/O is not implemented correctly #2

sippykup opened this issue Aug 24, 2010 · 4 comments

Comments

@sippykup
Copy link

WebSocket's send() method is wrong. The write() operation on an asynchronous SocketChannel will only send as many bytes as can be written to the socket's buffer without blocking (which is what makes it asynchronous). So you can't simply assume that you can just call write() once and you're done. There may still be data in the ByteBuffer that needs to be sent. You need to call remaining() on the ByteBuffer to check this.

I'm attaching a rather large patch that implements the asynchronous I/O the right way. So far, it's working for me.

@sippykup
Copy link
Author

Sorry, never used GitHub before. How the hell do I submit a patch? Just paste it into a comment? Guess that's what I'll do. Sorry if I'm missing something.

@sippykup
Copy link
Author

diff -r ./net/tootallnate/websocket/WebSocketClient.java ../../../TooTallNate-Java-WebSocket-bdf470d/src/net/tootallnate/websocket/WebSocketClient.java
14a15

import java.util.concurrent.LinkedBlockingQueue;
122c123

< this.conn = new WebSocket(client, this);

  this.conn = new WebSocket(client, new LinkedBlockingQueue<ByteBuffer>(), this);

diff -r ./net/tootallnate/websocket/WebSocket.java ../../../TooTallNate-Java-WebSocket-bdf470d/src/net/tootallnate/websocket/WebSocket.java
5a6,7
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
8a11
import java.util.concurrent.BlockingQueue;
78a82,90
/**

  • Queue of buffers that need to be sent to the client.
    /
    private BlockingQueue bufferQueue;
    /
    *

  • Lock object to ensure that data is sent from the bufferQueue in

  • the proper order
    */
    private Object bufferQueueMutex = new Object();
    86a99,100

  • @param bufferQueue The Queue that we should use to buffer data that

  •                 hasn't been sent to the client yet.
    

    90c104,106
    < WebSocket(SocketChannel socketChannel, WebSocketListener listener) {

    WebSocket(SocketChannel socketChannel, BlockingQueue bufferQueue,
    WebSocketListener listener)
    {
    91a108
    this.bufferQueue = bufferQueue;
    136c153,157
    < public void send(String text) throws IOException {

    /**

  • @return True if all of the text was sent to the client by this thread.

  •  False if some of the text had to be buffered to be sent later.
    

    */
    public boolean send(String text) throws IOException {
    145a167
    b.rewind();
    147,149c169,184
    < // Write the ByteBuffer to the socket
    < b.rewind();

    < this.socketChannel.write(b);

    // See if we have any backlog that needs to be sent first
    if (handleWrite()) {
    // Write the ByteBuffer to the socket
    this.socketChannel.write(b);
    }

    // If we didn't get it all sent, add it to the buffer of buffers
    if (b.remaining() > 0) {
    if (!this.bufferQueue.offer(b)) {
    throw new IOException("Buffers are full, message could not be sent to" +
    this.socketChannel.socket().getRemoteSocketAddress());
    }
    return false;
    }

    return true;
    151a187,210
    boolean hasBufferedData() {
    return !this.bufferQueue.isEmpty();
    }

    /**

  • @return True if all data has been sent to the client, false if there

  •  is still some buffered.
    

    */
    boolean handleWrite() throws IOException {
    synchronized (this.bufferQueueMutex) {
    ByteBuffer buffer = this.bufferQueue.peek();
    while (buffer != null) {
    this.socketChannel.write(buffer);
    if (buffer.remaining() > 0) {
    return false; // Didn't finish this buffer. There's more to send.
    } else {
    this.bufferQueue.poll(); // Buffer finished. Remove it.
    buffer = this.bufferQueue.peek();
    }
    }
    return true;
    }
    }

257a317

diff -r ./net/tootallnate/websocket/WebSocketServer.java ../../../TooTallNate-Java-WebSocket-bdf470d/src/net/tootallnate/websocket/WebSocketServer.java
13a14
import java.util.concurrent.BlockingQueue;
14a16
import java.util.concurrent.LinkedBlockingQueue;
169a172,183
/**

  • @return A BlockingQueue that should be used by a WebSocket
  •  to hold data that is waiting to be sent to the client.
    
  •  The default implementation returns an unbounded
    
  •  LinkedBlockingQueue, but you may choose to override
    
  •  this to return a bounded queue to protect against
    
  •  running out of memory.
    
    */
    protected BlockingQueue newBufferQueue() {
    return new LinkedBlockingQueue();
    }

182,208c196,250
< selector.select();
< Set keys = selector.selectedKeys();
< Iterator i = keys.iterator();
<
< while(i.hasNext()) {
< SelectionKey key = i.next();
<
< // Remove the current key
< i.remove();
<
< // if isAccetable == true
< // then a client required a connection
< if (key.isAcceptable()) {
< SocketChannel client = server.accept();
< client.configureBlocking(false);
< WebSocket c = new WebSocket(client, this);
< client.register(selector, SelectionKey.OP_READ, c);
< }
<
< // if isReadable == true
< // then the server is ready to read
< if (key.isReadable()) {
< WebSocket conn = (WebSocket)key.attachment();
< conn.handleRead();
< }
< }

< }

          try {
      selector.select(100L);
      Set<SelectionKey> keys = selector.selectedKeys();
      Iterator<SelectionKey> i = keys.iterator();

      while(i.hasNext()) {
        SelectionKey key = i.next();

        // Remove the current key
        i.remove();

        // if isAcceptable == true
        // then a client required a connection
        if (key.isAcceptable()) {
          SocketChannel client = server.accept();
          client.configureBlocking(false);
          WebSocket c = new WebSocket(client, newBufferQueue(), this);
          client.register(selector, SelectionKey.OP_READ, c);
        }

        // if isReadable == true
        // then the server is ready to read
        if (key.isReadable()) {
          WebSocket conn = (WebSocket)key.attachment();
          conn.handleRead();
        }

                  // if isWritable == true
                  // then we need to send the rest of the data to the client
                  if (key.isWritable()) {
                      WebSocket conn = (WebSocket)key.attachment();
                      if (conn.handleWrite()) {
                          conn.socketChannel().register(selector,
                                  SelectionKey.OP_READ, conn);
                      }
                  }
      }

              for (WebSocket conn : this.connections) {
                  // We have to do this check here, and not in the thread that
                  // adds the buffered data to the WebSocket, because the
                  // Selector is not thread-safe, and can only be accessed
                  // by this thread.
                  if (conn.hasBufferedData()) {
                      conn.socketChannel().register(selector,
                              SelectionKey.OP_READ | SelectionKey.OP_WRITE, conn);
                  }
              }

    } catch (IOException e) {
              e.printStackTrace();
          } catch (RuntimeException e) {
              e.printStackTrace();
          }
      }

@TooTallNate
Copy link
Owner

Thanks for the fix. I suggest that you "fork" the repository, which will make a copy of this one under your profile, which you have write access to. From there you can push your fix to GitHub yourself, which I can review and pull.

@TooTallNate
Copy link
Owner

Ok, I pulled your patch. Looks good! Thanks for spotting!

orishoshan referenced this issue in Rookout/Java-WebSocket Mar 26, 2020
orishoshan referenced this issue in Rookout/Java-WebSocket Mar 27, 2020
marci4 pushed a commit that referenced this issue Mar 25, 2022
Issue-1160 Put whole Error into Exception during Error processing, not just stack trace
This issue was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants