Skip to content

Commit

Permalink
Merge pull request JuliaLang#10232 from amitmurthy/amitm/asyncbuffer
Browse files Browse the repository at this point in the history
Buffered send in AsyncStreams
  • Loading branch information
amitmurthy committed Feb 25, 2015
2 parents 897395b + 927744b commit f8e7783
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 143 deletions.
9 changes: 9 additions & 0 deletions base/base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,12 @@ Array{T}(::Type{T}, d::Integer...) = Array{T}(convert((Int...), d))
Array{T}(::Type{T}, m::Integer) = Array{T}(m)
Array{T}(::Type{T}, m::Integer,n::Integer) = Array{T}(m,n)
Array{T}(::Type{T}, m::Integer,n::Integer,o::Integer) = Array{T}(m,n,o)

immutable Nullable{T}
isnull::Bool
value::T

Nullable() = new(true)
Nullable(value::T) = new(false, value)
end

1 change: 1 addition & 0 deletions base/iobuffer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ IOBuffer() = IOBuffer(UInt8[], true, true)
IOBuffer(maxsize::Int) = (x=IOBuffer(Array(UInt8,maxsize),true,true,maxsize); x.size=0; x)

is_maxsize_unlimited(io::IOBuffer) = (io.maxsize == typemax(Int))
maxsize(io::IOBuffer) = io.maxsize

read!(from::IOBuffer, a::Array) = read_sub(from, a, 1, length(a))

Expand Down
141 changes: 19 additions & 122 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,12 @@ type WorkerConfig
end
end

# Optimizing writes to socket
type BufferedAsyncStream <: IO
w_stream::AsyncStream
sendbuf::IOBuffer
locked_by::Nullable{Task} # The task that has locked the write stream
acq_lock_cond::Condition # Object to wait on to acquire lock

BufferedAsyncStream(s) = new(s, PipeBuffer(), Nullable{Task}(), Condition())
end



type Worker
id::Int
r_stream::AsyncStream
w_io::BufferedAsyncStream
w_stream::AsyncStream
manager::ClusterManager
config::WorkerConfig

Expand All @@ -143,7 +133,7 @@ type Worker
gcflag::Bool

Worker(id, r_stream, w_stream, manager, config) =
new(id, r_stream, BufferedAsyncStream(w_stream), manager, config, [], [], false)
new(id, r_stream, buffer_writes(w_stream), manager, config, [], [], false)
end

Worker(id, r_stream, w_stream, manager) = Worker(id, r_stream, w_stream, manager, WorkerConfig())
Expand Down Expand Up @@ -174,42 +164,21 @@ function flush_gc_msgs(w::Worker)
end
end

# Acquire lock for write, recording the current task as owning the lock.
# Returns true if lock was set, false if lock was already acquired
function acquire_write_lock(w::Worker)
t = current_task()
while true
if isnull(w.w_io.locked_by)
w.w_io.locked_by = t; return true
else
if get(w.w_io.locked_by) == t
return false
end
wait(w.w_io.acq_lock_cond)
end
end
end

function release_write_lock(w::Worker)
w.w_io.locked_by = Nullable{Task}()
notify(w.w_io.acq_lock_cond)
end

function send_msg_(w::Worker, kind, args, now::Bool)
#println("Sending msg $kind")
release_lock = acquire_write_lock(w)

serialize(w.w_io, kind)
for arg in args
serialize(w.w_io, arg)
end
io = w.w_stream
lock(io) do io
serialize(io, kind)
for arg in args
serialize(io, arg)
end

if !now && w.gcflag
flush_gc_msgs(w)
else
flush(w.w_io)
if !now && w.gcflag
flush_gc_msgs(w)
else
flush(io)
end
end
release_lock && release_write_lock(w)
end

function flush_gc_msgs()
Expand Down Expand Up @@ -357,7 +326,7 @@ end
function worker_id_from_socket(s)
w = get(map_sock_wrkr, s, nothing)
if isa(w,Worker)
if is(s, w.r_stream) || is(s, w.w_io.w_stream) || is(s, w.w_io.sendbuf)
if is(s, w.r_stream) || is(s, w.w_stream)
return w.id
end
end
Expand All @@ -374,8 +343,7 @@ function register_worker(pg, w)
map_pid_wrkr[w.id] = w
if isa(w, Worker)
map_sock_wrkr[w.r_stream] = w
map_sock_wrkr[w.w_io.w_stream] = w
map_sock_wrkr[w.w_io.sendbuf] = w
map_sock_wrkr[w.w_stream] = w
end
end

Expand All @@ -385,10 +353,9 @@ function deregister_worker(pg, pid)
w = pop!(map_pid_wrkr, pid, nothing)
if isa(w, Worker)
pop!(map_sock_wrkr, w.r_stream)
if w.r_stream != w.w_io.w_stream
pop!(map_sock_wrkr, w.w_io.w_stream)
if w.r_stream != w.w_stream
pop!(map_sock_wrkr, w.w_stream)
end
pop!(map_sock_wrkr, w.w_io.sendbuf)

# Notify the cluster manager of this workers death
if myid() == 1
Expand Down Expand Up @@ -915,7 +882,7 @@ function create_message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStrea
(r_s, w_s) = connect(cluster_manager, rpid, wconfig)
w = Worker(rpid, r_s, w_s, cluster_manager, wconfig)
register_worker(w)
process_messages(w.r_stream, w.w_io.w_stream)
process_messages(w.r_stream, w.w_stream)
send_msg_now(w, :identify_socket, self_pid)
else
# Processes with higher pids connect to us. Don't do anything just yet
Expand Down Expand Up @@ -1241,7 +1208,7 @@ function setup_worker(pg::ProcessGroup, w)

# Start a new task to handle inbound messages from connected worker in master.
# Also calls `wait_connected` on TCP streams.
process_messages(w.r_stream, w.w_io.w_stream; ntfy_join_complete=rr_join)
process_messages(w.r_stream, w.w_stream; ntfy_join_complete=rr_join)

# send address information of all workers to the new worker.
# Cluster managers set the address of each worker in `WorkerConfig.connect_at`.
Expand Down Expand Up @@ -1646,73 +1613,3 @@ function terminate_all_workers()
end
end
end


# Optimized send
# - smaller writes are initially buffered with a final single socket write
# - large isbits arrays are unbuffered and written directly

const SENDBUF_SZ=100000
function buffer_or_write(s::BufferedAsyncStream, nb)
totb = nb_available(s.sendbuf) + nb
if totb < SENDBUF_SZ
return (true, false)
elseif nb > SENDBUF_SZ
flush(s)
return (false, false)
else
return (true, true)
end
end

write(s::BufferedAsyncStream, b::UInt8) = write_single(s, b, 1)
write(s::BufferedAsyncStream, c::Char) = write_single(s, c, utf8sizeof(c))

function write_single(s::BufferedAsyncStream, b, sz)
# print("writing $b $sz\n")
(do_buffering, do_flushing) = buffer_or_write(s, sz)
if do_buffering
write(s.sendbuf, b)
do_flushing && flush(s)
else
write(s.w_stream, b)
end
return sz
end

function write{T}(s::BufferedAsyncStream, a::Array{T})
# print("writing array $(isbits(T)) $T\n")
if isbits(T)
n = uint(length(a)*sizeof(T))
(do_buffering, do_flushing) = buffer_or_write(s, n)
if do_buffering
write(s.sendbuf, a)
do_flushing && flush(s)
else
write(s.w_stream, a)
end
return int(n)
else
bytes_written = write(s.sendbuf, a)
(nb_available(s.sendbuf) > SENDBUF_SZ) && flush(s)
return bytes_written
end
end
function write(s::BufferedAsyncStream, p::Ptr, nb::Integer)
# print("writing from pointer $nb bytes\n")
(do_buffering, do_flushing) = buffer_or_write(s, nb)
if do_buffering
write(s.sendbuf, p, nb)
do_flushing && flush(s)
else
write(s.w_stream, p, nb)
end
return nb
end

function flush(s::BufferedAsyncStream)
# print("Flushing $(nb_available(s.sendbuf)) bytes\n")
if nb_available(s.sendbuf) > 0
write(s.w_stream, takebuf_array(s.sendbuf))
end
end
8 changes: 0 additions & 8 deletions base/nullable.jl
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
immutable Nullable{T}
isnull::Bool
value::T

Nullable() = new(true)
Nullable(value::T) = new(false, value)
end

immutable NullException <: Exception
end

Expand Down
6 changes: 5 additions & 1 deletion base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ type TCPSocket <: Socket
connectnotify::Condition
closecb::Callback
closenotify::Condition
sendbuf::Nullable{IOBuffer}
lock_task::Nullable{Task}
lock_wait::Condition

TCPSocket(handle) = new(
handle,
Expand All @@ -267,7 +270,8 @@ type TCPSocket <: Socket
PipeBuffer(),
false, Condition(),
false, Condition(),
false, Condition()
false, Condition(),
nothing,nothing,Condition()
)
end
function TCPSocket()
Expand Down
Loading

0 comments on commit f8e7783

Please sign in to comment.