Skip to content

Commit

Permalink
Merge pull request JuliaLang#10073 from amitmurthy/amitm/optsend2
Browse files Browse the repository at this point in the history
optimized send - direct writes for large bitstype arrays
  • Loading branch information
amitmurthy committed Feb 9, 2015
2 parents 93c3a58 + 748c5df commit 6558327
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 20 deletions.
138 changes: 118 additions & 20 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,37 @@ type WorkerConfig
end
end

# Optimizing writes to socket
type BufferedAsyncStream <: IO
w_stream::AsyncStream
sendbuf::IOBuffer
locked_by::Nullable{Task} # The task that has locked the write stream
acq_lock_cond::Condition # Object to wait on to acquire lock

BufferedAsyncStream(s) = new(s, PipeBuffer(), Nullable{Task}(), Condition())
end



type Worker
id::Int
r_stream::AsyncStream
w_stream::AsyncStream
w_io::BufferedAsyncStream
manager::ClusterManager
config::WorkerConfig

sendbuf::IOBuffer
del_msgs::Array{Any,1}
add_msgs::Array{Any,1}
gcflag::Bool

Worker(id, r_stream, w_stream, manager, config) =
new(id, r_stream, w_stream, manager, config, IOBuffer(), [], [], false)
new(id, r_stream, BufferedAsyncStream(w_stream), manager, config, [], [], false)
end

Worker(id, r_stream, w_stream, manager) = Worker(id, r_stream, w_stream, manager, WorkerConfig())



function send_msg_now(w::Worker, kind, args...)
send_msg_(w, kind, args, true)
end
Expand All @@ -161,26 +174,42 @@ function flush_gc_msgs(w::Worker)
end
end

#TODO: Move to different Thread
function enq_send_req(sock::AsyncStream, buf, now::Bool)
arr=takebuf_array(buf)
write(sock,arr)
#TODO implement "now"
# Acquire lock for write, recording the current task as owning the lock.
# Returns true if lock was set, false if lock was already acquired
function acquire_write_lock(w::Worker)
t = current_task()
while true
if isnull(w.w_io.locked_by)
w.w_io.locked_by = t; return true
else
if get(w.w_io.locked_by) == t
return false
end
wait(w.w_io.acq_lock_cond)
end
end
end

function release_write_lock(w::Worker)
w.w_io.locked_by = Nullable{Task}()
notify(w.w_io.acq_lock_cond)
end

function send_msg_(w::Worker, kind, args, now::Bool)
#println("Sending msg $kind")
buf = w.sendbuf
serialize(buf, kind)
release_lock = acquire_write_lock(w)

serialize(w.w_io, kind)
for arg in args
serialize(buf, arg)
serialize(w.w_io, arg)
end

if !now && w.gcflag
flush_gc_msgs(w)
else
enq_send_req(w.w_stream, buf, now)
flush(w.w_io)
end
release_lock && release_write_lock(w)
end

function flush_gc_msgs()
Expand Down Expand Up @@ -328,7 +357,7 @@ end
function worker_id_from_socket(s)
w = get(map_sock_wrkr, s, nothing)
if isa(w,Worker)
if is(s, w.r_stream) || is(s, w.w_stream) || is(s, w.sendbuf)
if is(s, w.r_stream) || is(s, w.w_io.w_stream) || is(s, w.w_io.sendbuf)
return w.id
end
end
Expand All @@ -345,8 +374,8 @@ function register_worker(pg, w)
map_pid_wrkr[w.id] = w
if isa(w, Worker)
map_sock_wrkr[w.r_stream] = w
map_sock_wrkr[w.w_stream] = w
map_sock_wrkr[w.sendbuf] = w
map_sock_wrkr[w.w_io.w_stream] = w
map_sock_wrkr[w.w_io.sendbuf] = w
end
end

Expand All @@ -356,10 +385,10 @@ function deregister_worker(pg, pid)
w = pop!(map_pid_wrkr, pid, nothing)
if isa(w, Worker)
pop!(map_sock_wrkr, w.r_stream)
if w.r_stream != w.w_stream
pop!(map_sock_wrkr, w.w_stream)
if w.r_stream != w.w_io.w_stream
pop!(map_sock_wrkr, w.w_io.w_stream)
end
pop!(map_sock_wrkr, w.sendbuf)
pop!(map_sock_wrkr, w.w_io.sendbuf)

# Notify the cluster manager of this workers death
if myid() == 1
Expand Down Expand Up @@ -886,7 +915,7 @@ function create_message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStrea
(r_s, w_s) = connect(cluster_manager, rpid, wconfig)
w = Worker(rpid, r_s, w_s, cluster_manager, wconfig)
register_worker(w)
process_messages(w.r_stream, w.w_stream)
process_messages(w.r_stream, w.w_io.w_stream)
send_msg_now(w, :identify_socket, self_pid)
else
# Processes with higher pids connect to us. Don't do anything just yet
Expand Down Expand Up @@ -1212,7 +1241,7 @@ function setup_worker(pg::ProcessGroup, w)

# Start a new task to handle inbound messages from connected worker in master.
# Also calls `wait_connected` on TCP streams.
process_messages(w.r_stream, w.w_stream; ntfy_join_complete=rr_join)
process_messages(w.r_stream, w.w_io.w_stream; ntfy_join_complete=rr_join)

# send address information of all workers to the new worker.
# Cluster managers set the address of each worker in `WorkerConfig.connect_at`.
Expand Down Expand Up @@ -1618,3 +1647,72 @@ function terminate_all_workers()
end
end


# Optimized send
# - smaller writes are initially buffered with a final single socket write
# - large isbits arrays are unbuffered and written directly

const SENDBUF_SZ=100000
function buffer_or_write(s::BufferedAsyncStream, nb)
totb = nb_available(s.sendbuf) + nb
if totb < SENDBUF_SZ
return (true, false)
elseif nb > SENDBUF_SZ
flush(s)
return (false, false)
else
return (true, true)
end
end

write(s::BufferedAsyncStream, b::UInt8) = write_single(s, b, 1)
write(s::BufferedAsyncStream, c::Char) = write_single(s, c, utf8sizeof(c))

function write_single(s::BufferedAsyncStream, b, sz)
# print("writing $b $sz\n")
(do_buffering, do_flushing) = buffer_or_write(s, sz)
if do_buffering
write(s.sendbuf, b)
do_flushing && flush(s)
else
write(s.w_stream, b)
end
return sz
end

function write{T}(s::BufferedAsyncStream, a::Array{T})
# print("writing array $(isbits(T)) $T\n")
if isbits(T)
n = uint(length(a)*sizeof(T))
(do_buffering, do_flushing) = buffer_or_write(s, n)
if do_buffering
write(s.sendbuf, a)
do_flushing && flush(s)
else
write(s.w_stream, a)
end
return int(n)
else
bytes_written = write(s.sendbuf, a)
(nb_available(s.sendbuf) > SENDBUF_SZ) && flush(s)
return bytes_written
end
end
function write(s::BufferedAsyncStream, p::Ptr, nb::Integer)
# print("writing from pointer $nb bytes\n")
(do_buffering, do_flushing) = buffer_or_write(s, nb)
if do_buffering
write(s.sendbuf, p, nb)
do_flushing && flush(s)
else
write(s.w_stream, p, nb)
end
return nb
end

function flush(s::BufferedAsyncStream)
# print("Flushing $(nb_available(s.sendbuf)) bytes\n")
if nb_available(s.sendbuf) > 0
write(s.w_stream, takebuf_array(s.sendbuf))
end
end
32 changes: 32 additions & 0 deletions test/parallel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,38 @@ end
# specify pids for pmap
@test sort(workers()[1:2]) == sort(unique(pmap(x->(sleep(0.1);myid()), 1:10, pids = workers()[1:2])))

# Testing buffered and unbuffered reads
# This large array should write directly to the socket
a = ones(10^6)
@test a == remotecall_fetch(id_other, (x)->x, a)

# Not a bitstype, should be buffered
s = [randstring() for x in 1:10^5]
@test s == remotecall_fetch(id_other, (x)->x, s)

#large number of small requests
[remotecall_fetch(id_other, myid) for i in 1:100000]

# test parallel sends of large arrays from multiple tasks to the same remote worker
ntasks = 10
rr_list = [RemoteRef() for x in 1:ntasks]
for rr in rr_list
@async let rr=rr
a=ones(10^6);
try
for i in 1:10
@test a == remotecall_fetch(id_other, (x)->x, a)
yield()
end
put!(rr, :OK)
catch
put!(rr, :ERROR)
end
end
end

@test [fetch(rr) for rr in rr_list] == [:OK for x in 1:ntasks]

# TODO: The below block should be always enabled but the error is printed by the event loop

# Hence in the event of any relevant changes to the parallel codebase,
Expand Down

0 comments on commit 6558327

Please sign in to comment.