Skip to content

Commit

Permalink
Merge pull request #5697 from JuliaLang/kf/udp
Browse files Browse the repository at this point in the history
Implement UDP
  • Loading branch information
StefanKarpinski committed Feb 12, 2014
2 parents a89c793 + 3ed4cb1 commit 6585e3d
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 24 deletions.
4 changes: 4 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,7 @@ export

# I/O and events
accept,
bind,
close,
connect,
countlines,
Expand Down Expand Up @@ -1085,9 +1086,11 @@ export
redirect_stderr,
redirect_stdin,
redirect_stdout,
recv,
seek,
seekend,
seekstart,
send,
serialize,
skip,
skipchars,
Expand All @@ -1103,6 +1106,7 @@ export
write,
writecsv,
writedlm,
UdpSocket,

# multiprocessing
addprocs,
Expand Down
151 changes: 128 additions & 23 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -300,19 +300,6 @@ function TcpServer()
this
end

#type UdpSocket <: Socket
# handle::Ptr{Void}
# open::Bool
# line_buffered::Bool
# buffer::IOBuffer
# readcb::Callback
# readnotify::Condition
# ccb::Callback
# connectnotify::Condition
# closecb::Callback
# closenotify::Condition
#end

isreadable(io::TcpSocket) = true
iswritable(io::TcpSocket) = true

Expand All @@ -321,9 +308,6 @@ show(io::IO,sock::TcpSocket) = print(io,"TcpSocket(",uv_status_string(sock),", "

show(io::IO,sock::TcpServer) = print(io,"TcpServer(",uv_status_string(sock),")")

#show(io::IO,sock::UdpSocket) = print(io,"UdpSocket(",uv_status_string(sock),", ",
# nb_available(sock.buffer)," bytes waiting)")

## VARIOUS METHODS TO BE MOVED TO BETTER LOCATION

_jl_connect_raw(sock::TcpSocket,sockaddr::Ptr{Void}) =
Expand All @@ -339,12 +323,56 @@ accept(server::PipeServer) = accept(server, Pipe())
##

bind(sock::TcpServer, addr::InetAddr) = bind(sock,addr.host,addr.port)
bind(sock::TcpServer, host::IpAddr, port) = bind(sock, InetAddr(host,port))

function bind(sock::TcpServer, host::IPv4, port::Uint16)
_bind(sock::TcpServer, host::IPv4, port::Uint16) = ccall(:jl_tcp_bind, Int32, (Ptr{Void}, Uint16, Uint32),
sock.handle, hton(port), hton(host.host))

_bind(sock::TcpServer, host::IPv6, port::Uint16) = ccall(:jl_tcp_bind6, Int32, (Ptr{Void}, Uint16, Ptr{Uint128}),
sock.handle, hton(port), &hton(host.host))

# UDP

type UdpSocket <: Socket
handle::Ptr{Void}
status::Int
recvnotify::Condition
sendnotify::Condition
closenotify::Condition
UdpSocket(handle::Ptr) = new(handle, StatusUninit, Condition(), Condition(), Condition())
end

function UdpSocket()
this = UdpSocket(c_malloc(_sizeof_uv_udp))
associate_julia_struct(this.handle, this)
err = ccall(:uv_udp_init,Cint,(Ptr{Void},Ptr{Void}),
eventloop(),this.handle)
if err != 0
c_free(this.handle)
this.handle = C_NULL
error(UVError("failed to create udp socket",err))
end
this.status = StatusInit
this
end

function _uv_hook_close(sock::UdpSocket)
sock.handle = 0
sock.status = StatusClosed
notify(sock.closenotify)
notify(sock.sendnotify)
notify_error(sock.recvnotify,EOFError())
end

# Disables dual stack mode. Only available when using ipv6 binf
const UV_UDP_IPV6ONLY = 1

# Indicates message was truncated because read buffer was too small. The
# remainder was discarded by the OS.
const UV_UDP_PARTIAL = 2

function bind(sock::Union(TcpServer,UdpSocket), host::IPv4, port::Integer)
@assert sock.status == StatusInit
err = ccall(:jl_tcp_bind, Int32, (Ptr{Void}, Uint16, Uint32),
sock.handle, hton(port), hton(host.host))
err = _bind(sock,host,uint16(port))
if err < 0
if err != UV_EADDRINUSE && err != UV_EACCES
error(UVError("bind",err))
Expand All @@ -356,10 +384,15 @@ function bind(sock::TcpServer, host::IPv4, port::Uint16)
true
end

function bind(sock::TcpServer, host::IPv6, port::Uint16)
_bind(sock::UdpSocket, host::IPv4, port::Uint16) = ccall(:jl_udp_bind, Int32, (Ptr{Void}, Uint16, Uint32, Uint32),
sock.handle, hton(port), hton(host.host), 0)

_bind(sock::UdpSocket, host::IPv6, port::Uint16, flags::Uint32 = uint32(0)) = ccall(:jl_udp_bind6, Int32, (Ptr{Void}, Uint16, Ptr{Uint128}, Uint32),
sock.handle, hton(port), &hton(host.host), flags)

function bind(sock::UdpSocket, host::IPv6, port::Uint16; ipv6only = false)
@assert sock.status == StatusInit
err = ccall(:jl_tcp_bind6, Int32, (Ptr{Void}, Uint16, Ptr{Uint128}),
sock.handle, hton(port), &hton(host.host))
err = _bind(sock,host,ipv6only ? UV_UDP_IPV6ONLY : 0)
if err < 0
if err != UV_EADDRINUSE && err != UV_EACCES
error(UVError("bind",err))
Expand All @@ -371,6 +404,78 @@ function bind(sock::TcpServer, host::IPv6, port::Uint16)
true
end


function setopt(sock::UdpSocket; multicast_loop = nothing, multicast_ttl=nothing, enable_broadcast=nothing, ttl=nothing)
if sock.status == StatusUninit
error("Cannot set options on unitialized socket")
end
if multicast_loop !== nothing
uv_error("multicast_loop",ccall(:uv_udp_set_multicast_loop,Cint,(Ptr{Void},Cint),sock.handle,multicast_loop) < 0)
end
if multicast_ttl !== nothing
uv_error("multicast_ttl",ccall(:uv_udp_set_multicast_ttl,Cint,(Ptr{Void},Cint),sock.handle,multicast_ttl))
end
if enable_broadcast !== nothing
uv_error("enable_broadcast",ccall(:uv_udp_set_broadcast,Cint,(Ptr{Void},Cint),sock.handle,enable_broadcast))
end
if ttl !== nothing
uv_error("ttl",ccall(:uv_udp_set_ttl,Cint,(Ptr{Void},Cint),sock.handle,ttl))
end
end

_uv_hook_alloc_buf(sock::UdpSocket,size::Int32) = (c_malloc(size),size)

_recv_start(sock::UdpSocket) = uv_error("recv_start",ccall(:uv_udp_recv_start,Cint,(Ptr{Void},Ptr{Void},Ptr{Void}),
sock.handle,cglobal(:jl_uv_alloc_buf),cglobal(:jl_uv_recvcb)))
_recv_stop(sock::UdpSocket) = uv_error("recv_stop",ccall(:uv_udp_recv_stop,Cint,(Ptr{Void},),sock.handle))

function recv(sock::UdpSocket)
# If the socket has not been bound, it will be bound implicitly to ::0 and a random port
if sock.status != StatusInit && sock.status != StatusOpen
error("Invalid socket state")
end
_recv_start(sock)
wait(sock.recvnotify)::Vector{Uint8}
end

function _uv_hook_recv(sock::UdpSocket, nread::Ptr{Void}, buf_addr::Ptr{Void}, buf_size::Int32, addr::Ptr{Void}, flags::Int32)
nread = convert(Cssize_t,nread)
if flags & UV_UDP_PARTIAL > 0
# TODO: Decide what to do in this case. For now throw an error
c_free(buf_addr)
notify_error(sock.recvnotify,"Partial message received")
end
buf = pointer_to_array(convert(Ptr{Uint8},buf_addr),int(buf_size),true)
notify(sock.recvnotify,buf[1:nread])
end

function _send(sock::UdpSocket,ipaddr::IPv4,port::Uint16,buf)
ccall(:jl_udp_send,Cint,(Ptr{Void},Uint16,Uint32,Ptr{Uint8},Csize_t),sock.handle,hton(port),hton(ipaddr.host),buf,sizeof(buf))
end

function _send(sock::UdpSocket,ipaddr::IPv6,port::Uint16,buf)
ccall(:jl_udp_send6,Cint,(Ptr{Void},Uint16,Ptr{Uint128},Ptr{Uint8},Csize_t),sock.handle,hton(port),&hton(ipaddr.host),buf,sizeof(buf))
end

function send(sock::UdpSocket,ipaddr,port,msg)
# If the socket has not been bound, it will be bound implicitly to ::0 and a random port
if sock.status != StatusInit && sock.status != StatusOpen
error("Invalid socket state")
end
uv_error("send",_send(sock,ipaddr,uint16(port),msg))
wait(sock.sendnotify)
nothing
end

function _uv_hook_send(sock::UdpSocket,status::Cint)
if status < 0
notify_error(sock.sendnotify,UVError("UDP send failed",status))
end
notify(sock.sendnotify)
end

##

callback_dict = ObjectIdDict()

function _uv_hook_getaddrinfo(cb::Function, addrinfo::Ptr{Void}, status::Int32)
Expand Down
66 changes: 65 additions & 1 deletion src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ enum CALLBACK_TYPE { CB_PTR, CB_INT32, CB_INT64 };
XX(isopen) \
XX(fseventscb) \
XX(writecb) \
XX(writecb_task)
XX(writecb_task) \
XX(recv) \
XX(send)
//TODO add UDP and other missing callbacks

#define JULIA_HOOK_(m,hook) ((jl_function_t*)jl_get_global(m, jl_symbol("_uv_hook_" #hook)))
Expand Down Expand Up @@ -231,6 +233,19 @@ DLLEXPORT void jl_uv_fseventscb(uv_fs_event_t* handle, const char* filename, int
(void)ret;
}

DLLEXPORT void jl_uv_recvcb(uv_udp_t* handle, ssize_t nread, uv_buf_t buf, struct sockaddr* addr, unsigned flags)
{
JULIA_CB(recv,handle->data,5,CB_PTR,nread,CB_PTR,(buf.base),CB_INT32,buf.len,CB_PTR,addr,CB_INT32,flags)
(void)ret;
}

DLLEXPORT void jl_uv_sendcb(uv_udp_send_t* handle, int status)
{
JULIA_CB(send,handle->data,1,CB_INT32,status)
free(handle);
(void)ret;
}

/** This file contains wrappers for most of libuv's stream functionailty. Once we can allocate structs in Julia, this file will be removed */

DLLEXPORT int jl_run_once(uv_loop_t *loop)
Expand Down Expand Up @@ -698,6 +713,55 @@ DLLEXPORT int jl_tcp_bind6(uv_tcp_t* handle, uint16_t port, void *host)
return uv_tcp_bind6(handle, addr);
}

DLLEXPORT int jl_udp_bind(uv_udp_t* handle, uint16_t port, uint32_t host, uint32_t flags)
{
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_port = port;
addr.sin_addr.s_addr = host;
addr.sin_family = AF_INET;
return uv_udp_bind(handle, addr, flags);
}
DLLEXPORT int jl_udp_bind6(uv_udp_t* handle, uint16_t port, void *host, uint32_t flags)
{
struct sockaddr_in6 addr;
memset(&addr, 0, sizeof(struct sockaddr_in6));
addr.sin6_port = port;
memcpy(&addr.sin6_addr, host, 16);
addr.sin6_family = AF_INET6;
return uv_udp_bind6(handle, addr, flags);
}

DLLEXPORT int jl_udp_send(uv_udp_t* handle, uint16_t port, uint32_t host, void *data, uint32_t size)
{
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_port = port;
addr.sin_addr.s_addr = host;
addr.sin_family = AF_INET;
uv_buf_t buf[1];
buf[0].base = data;
buf[0].len = size;
uv_udp_send_t *req = malloc(sizeof(uv_udp_send_t));
req->data = handle->data;
return uv_udp_send(req, handle, buf, 1, addr, &jl_uv_sendcb);
}

DLLEXPORT int jl_udp_send6(uv_udp_t* handle, uint16_t port, void *host, void *data, uint32_t size)
{
struct sockaddr_in6 addr;
memset(&addr, 0, sizeof(struct sockaddr_in6));
addr.sin6_port = port;
memcpy(&addr.sin6_addr, host, 16);
addr.sin6_family = AF_INET6;
uv_buf_t buf[1];
buf[0].base = data;
buf[0].len = size;
uv_udp_send_t *req = malloc(sizeof(uv_udp_send_t));
req->data = handle->data;
return uv_udp_send6(req, handle, buf, 1, addr, &jl_uv_sendcb);
}

DLLEXPORT int jl_uv_sizeof_interface_address()
{
return sizeof(uv_interface_address_t);
Expand Down
18 changes: 18 additions & 0 deletions test/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,21 @@ try
catch e
@test typeof(e) == Base.UVError
end

a = UdpSocket()
b = UdpSocket()
bind(a,ip"127.0.0.1",2134)
bind(b,ip"127.0.0.1",2135)

c = Condition()
@async begin
@test bytestring(recv(a)) == "Hello World"
notify(c)
end
send(b,ip"127.0.0.1",2134,"Hello World")
wait(c)

@test_throws bind(UdpSocket(),2134)

close(a)
close(b)

0 comments on commit 6585e3d

Please sign in to comment.