Skip to content

Commit

Permalink
Simplify code using bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
en committed Aug 10, 2017
1 parent 108bfce commit 4322de1
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 120 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ version = "0.1.0"
authors = ["Yuanchao Sun <[email protected]>"]

[dependencies]
byteorder = "1"
bytes = "0.4"
env_logger = { version = "0.4", default-features = false }
futures = "0.1"
Expand Down
177 changes: 61 additions & 116 deletions src/kcp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::cmp;
use std::collections::VecDeque;
use std::io::{self, Error, ErrorKind, Write};
use std::io::{self, Cursor, Error, ErrorKind, Write};

use bytes::{Buf, BufMut, BytesMut, LittleEndian};

const KCP_RTO_NDL: u32 = 30; // no delay min rto
const KCP_RTO_MIN: u32 = 100; // normal min rto
Expand Down Expand Up @@ -96,7 +98,7 @@ pub struct KCP<W: Write> {
acklist: Vec<(u32, u32)>,

// user: String,
buffer: Vec<u8>,
buffer: BytesMut,

fastresend: u32,

Expand All @@ -110,7 +112,7 @@ impl<W: Write> KCP<W> {
/// create a new kcp control object, `conv` must equal in two endpoint
/// from the same connection. `user` will be passed to the output callback
pub fn new(conv: u32, output: W) -> KCP<W> {
let mut kcp = KCP {
KCP {
// state: 0,
snd_una: 0,
snd_nxt: 0,
Expand Down Expand Up @@ -139,7 +141,7 @@ impl<W: Write> KCP<W> {
mtu: KCP_MTU_DEF,
mss: KCP_MTU_DEF - KCP_OVERHEAD,
// user: user,
buffer: Vec::new(),
buffer: BytesMut::with_capacity(((KCP_MTU_DEF + KCP_OVERHEAD) * 3) as usize),
snd_queue: VecDeque::new(),
rcv_queue: VecDeque::new(),
snd_buf: VecDeque::new(),
Expand All @@ -151,12 +153,7 @@ impl<W: Write> KCP<W> {
ts_flush: KCP_INTERVAL,
ssthresh: KCP_THRESH_INIT, // dead_link: KCP_DEADLINK,
output: output,
};
kcp.buffer.resize(
((KCP_MTU_DEF + KCP_OVERHEAD) * 3) as usize,
0,
);
kcp
}
}

/// user/upper level recv: returns size, returns Err for EAGAIN
Expand All @@ -176,12 +173,10 @@ impl<W: Write> KCP<W> {
let recover = self.rcv_queue.len() >= self.rcv_wnd as usize;

// merge fragment
let mut p: usize = 0;
let mut buf = Cursor::new(buf);
let mut index: usize = 0;
for seg in &self.rcv_queue {
let l = seg.data.len();
buf[p..p + l].copy_from_slice(&seg.data[..]);
p += l;
buf.write_all(&seg.data)?;
index += 1;
if seg.frg == 0 {
break;
Expand All @@ -191,7 +186,7 @@ impl<W: Write> KCP<W> {
let new_rcv_queue = self.rcv_queue.split_off(index);
self.rcv_queue = new_rcv_queue;
}
assert!(p == peeksize);
assert!(buf.position() as usize == peeksize);

// move available data from rcv_buf -> rcv_queue
index = 0;
Expand All @@ -218,7 +213,7 @@ impl<W: Write> KCP<W> {
// tell remote my window size
self.probe |= KCP_ASK_TELL;
}
Ok(p)
Ok(buf.position() as usize)
}

/// check the size of next message in the recv queue
Expand Down Expand Up @@ -414,36 +409,35 @@ impl<W: Write> KCP<W> {

/// when you received a low level packet (eg. UDP packet), call it
pub fn input(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut n = buf.len();
if n < KCP_OVERHEAD as usize {
let n = buf.len();
let mut buf = Cursor::new(buf);

if buf.remaining() < KCP_OVERHEAD as usize {
return Err(Error::new(ErrorKind::InvalidData, "invalid data"));
}
let old_una = self.snd_una;
let mut p: usize = 0;
let mut flag = false;
let mut maxack: u32 = 0;
loop {
if n < KCP_OVERHEAD as usize {
if buf.remaining() < KCP_OVERHEAD as usize {
break;
}

let conv = decode32u(buf, &mut p);
let conv = buf.get_u32::<LittleEndian>();
if conv != self.conv {
return Err(Error::new(ErrorKind::InvalidData, "invalid data"));
}

let cmd = decode8u(buf, &mut p);
let frg = decode8u(buf, &mut p);
let wnd = decode16u(buf, &mut p);
let ts = decode32u(buf, &mut p);
let sn = decode32u(buf, &mut p);
let una = decode32u(buf, &mut p);
let len = decode32u(buf, &mut p);

n -= KCP_OVERHEAD as usize;
let cmd = buf.get_u8();
let frg = buf.get_u8();
let wnd = buf.get_u16::<LittleEndian>();
let ts = buf.get_u32::<LittleEndian>();
let sn = buf.get_u32::<LittleEndian>();
let una = buf.get_u32::<LittleEndian>();
let len = buf.get_u32::<LittleEndian>();

let len = len as usize;
if n < len {
if buf.remaining() < len {
return Err(Error::new(ErrorKind::UnexpectedEof, "unexpected EOF"));
}

Expand Down Expand Up @@ -484,7 +478,8 @@ impl<W: Write> KCP<W> {
seg.ts = ts;
seg.sn = sn;
seg.una = una;
seg.data.extend_from_slice(&buf[p..p + len]);
seg.data.resize(len, 0);
buf.copy_to_slice(&mut seg.data);
self.parse_data(seg);
}
}
Expand All @@ -497,9 +492,6 @@ impl<W: Write> KCP<W> {
} else {
return Err(Error::new(ErrorKind::InvalidData, "invalid data"));
}

p += len;
n -= len;
}
if flag {
self.parse_fastack(maxack);
Expand All @@ -526,7 +518,7 @@ impl<W: Write> KCP<W> {
}
}
}
Ok(p)
Ok(n - buf.remaining())
}

fn wnd_unused(&self) -> u32 {
Expand All @@ -546,7 +538,6 @@ impl<W: Write> KCP<W> {
let current = self.current;
let mut lost = false;
let mut change = false;
let mut p: usize = 0;
let mut seg = Segment::new(0);

seg.conv = self.conv;
Expand All @@ -556,13 +547,13 @@ impl<W: Write> KCP<W> {

// flush acknowledges
for ack in &self.acklist {
if p as u32 + KCP_OVERHEAD > self.mtu {
self.output.write_all(&self.buffer[..p]);
p = 0;
if self.buffer.remaining_mut() as u32 + KCP_OVERHEAD > self.mtu {
self.output.write_all(&self.buffer);
self.buffer.clear();
}
seg.sn = ack.0;
seg.ts = ack.1;
encode_seg(&mut self.buffer, &mut p, &seg);
encode_seg(&mut self.buffer, &seg);
}
self.acklist.clear();

Expand Down Expand Up @@ -592,21 +583,21 @@ impl<W: Write> KCP<W> {
// flush window probing commands
if (self.probe & KCP_ASK_SEND) != 0 {
seg.cmd = KCP_CMD_WASK;
if p as u32 + KCP_OVERHEAD > self.mtu {
self.output.write_all(&self.buffer[..p]);
p = 0;
if self.buffer.remaining_mut() as u32 + KCP_OVERHEAD > self.mtu {
self.output.write_all(&self.buffer);
self.buffer.clear();
}
encode_seg(&mut self.buffer, &mut p, &seg);
encode_seg(&mut self.buffer, &seg);
}

// flush window probing commands
if (self.probe & KCP_ASK_TELL) != 0 {
seg.cmd = KCP_CMD_WINS;
if p as u32 + KCP_OVERHEAD > self.mtu {
self.output.write_all(&self.buffer[..p]);
p = 0;
if self.buffer.remaining_mut() as u32 + KCP_OVERHEAD > self.mtu {
self.output.write_all(&self.buffer);
self.buffer.clear();
}
encode_seg(&mut self.buffer, &mut p, &seg);
encode_seg(&mut self.buffer, &seg);
}
self.probe = 0;

Expand Down Expand Up @@ -682,16 +673,11 @@ impl<W: Write> KCP<W> {
let len = segment.data.len();
let need = KCP_OVERHEAD as usize + len;

if p + need > self.mtu as usize {
self.output.write_all(&self.buffer[..p]);
p = 0;
}
encode_seg(&mut self.buffer, &mut p, &segment);

if len > 0 {
&self.buffer[p..p + len].copy_from_slice(&segment.data[..]);
p += len;
if self.buffer.remaining_mut() + need > self.mtu as usize {
self.output.write_all(&self.buffer);
self.buffer.clear();
}
encode_seg(&mut self.buffer, &segment);

// never used
// if segment.xmit >= self.dead_link {
Expand All @@ -701,8 +687,9 @@ impl<W: Write> KCP<W> {
}

// flash remain segments
if p > 0 {
self.output.write_all(&self.buffer[..p]);
if self.buffer.remaining_mut() > 0 {
self.output.write_all(&self.buffer);
self.buffer.clear();
}

// update ssthresh
Expand Down Expand Up @@ -802,7 +789,10 @@ impl<W: Write> KCP<W> {
}
self.mtu = mtu;
self.mss = self.mtu - KCP_OVERHEAD;
self.buffer.resize(((mtu + KCP_OVERHEAD) * 3) as usize, 0);
let additional = ((mtu + KCP_OVERHEAD) * 3) as usize - self.buffer.capacity();
if additional > 0 {
self.buffer.reserve(additional);
}
true
}

Expand Down Expand Up @@ -862,59 +852,14 @@ fn bound(lower: u32, v: u32, upper: u32) -> u32 {
cmp::min(cmp::max(lower, v), upper)
}

#[inline]
fn encode8u(buf: &mut [u8], p: &mut usize, n: u8) {
buf[*p] = n;
*p += 1;
}

#[inline]
fn decode8u(buf: &[u8], p: &mut usize) -> u8 {
let n = buf[*p];
*p += 1;
n
}

#[inline]
fn encode16u(buf: &mut [u8], p: &mut usize, n: u16) {
let n = n.to_le();
buf[*p] = n as u8;
buf[*p + 1] = (n >> 8) as u8;
*p += 2;
}

#[inline]
fn decode16u(buf: &[u8], p: &mut usize) -> u16 {
let n = (buf[*p] as u16) | (buf[*p + 1] as u16) << 8;
*p += 2;
u16::from_le(n)
}

#[inline]
fn encode32u(buf: &mut [u8], p: &mut usize, n: u32) {
let n = n.to_le();
buf[*p] = n as u8;
buf[*p + 1] = (n >> 8) as u8;
buf[*p + 2] = (n >> 16) as u8;
buf[*p + 3] = (n >> 24) as u8;
*p += 4;
}

#[inline]
fn decode32u(buf: &[u8], p: &mut usize) -> u32 {
let n = (buf[*p] as u32) | (buf[*p + 1] as u32) << 8 | (buf[*p + 2] as u32) << 16 |
(buf[*p + 3] as u32) << 24;
*p += 4;
u32::from_le(n)
}

fn encode_seg(buf: &mut [u8], p: &mut usize, seg: &Segment) {
encode32u(buf, p, seg.conv);
encode8u(buf, p, seg.cmd as u8);
encode8u(buf, p, seg.frg as u8);
encode16u(buf, p, seg.wnd as u16);
encode32u(buf, p, seg.ts);
encode32u(buf, p, seg.sn);
encode32u(buf, p, seg.una);
encode32u(buf, p, seg.data.len() as u32);
fn encode_seg(buf: &mut BytesMut, seg: &Segment) {
buf.put_u32::<LittleEndian>(seg.conv);
buf.put::<u8>(seg.cmd as u8);
buf.put::<u8>(seg.frg as u8);
buf.put_u16::<LittleEndian>(seg.wnd as u16);
buf.put_u32::<LittleEndian>(seg.ts);
buf.put_u32::<LittleEndian>(seg.sn);
buf.put_u32::<LittleEndian>(seg.una);
buf.put_u32::<LittleEndian>(seg.data.len() as u32);
buf.put_slice(&seg.data);
}
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ extern crate rand;
extern crate tokio_core;
extern crate tokio_io;
extern crate time;
extern crate byteorder;
extern crate bytes;
extern crate time as ctime;
extern crate mio;
Expand All @@ -22,13 +21,12 @@ use std::collections::HashMap;
use std::time::{Duration, Instant};
use std::rc::Rc;

use bytes::{Buf, BufMut};
use bytes::{Buf, BufMut, ByteOrder, LittleEndian};
use futures::{Poll, Async, Future};
use futures::stream::Stream;
use tokio_core::net::UdpSocket;
use tokio_core::reactor::{Handle, PollEvented, Timeout};
use tokio_io::{AsyncRead, AsyncWrite};
use byteorder::{ByteOrder, LittleEndian};
use mio::{Ready, Registration, PollOpt, Token, SetReadiness};
use mio::event::Evented;
use iovec::IoVec;
Expand Down

0 comments on commit 4322de1

Please sign in to comment.