From 927744b5590dea75218c8ed4f3f350a5121669d3 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Tue, 17 Feb 2015 08:50:21 +0530 Subject: [PATCH] Buffered send in AsyncStreams --- base/base.jl | 9 +++ base/iobuffer.jl | 1 + base/multi.jl | 141 +++++++---------------------------------------- base/nullable.jl | 8 --- base/socket.jl | 6 +- base/stream.jl | 108 ++++++++++++++++++++++++++++++++---- 6 files changed, 130 insertions(+), 143 deletions(-) diff --git a/base/base.jl b/base/base.jl index 54aa8b0f66402..ef2b8be418e54 100644 --- a/base/base.jl +++ b/base/base.jl @@ -275,3 +275,12 @@ Array{T}(::Type{T}, d::Integer...) = Array{T}(convert((Int...), d)) Array{T}(::Type{T}, m::Integer) = Array{T}(m) Array{T}(::Type{T}, m::Integer,n::Integer) = Array{T}(m,n) Array{T}(::Type{T}, m::Integer,n::Integer,o::Integer) = Array{T}(m,n,o) + +immutable Nullable{T} + isnull::Bool + value::T + + Nullable() = new(true) + Nullable(value::T) = new(false, value) +end + diff --git a/base/iobuffer.jl b/base/iobuffer.jl index 6490251916f18..57014a60bee99 100644 --- a/base/iobuffer.jl +++ b/base/iobuffer.jl @@ -51,6 +51,7 @@ IOBuffer() = IOBuffer(UInt8[], true, true) IOBuffer(maxsize::Int) = (x=IOBuffer(Array(UInt8,maxsize),true,true,maxsize); x.size=0; x) is_maxsize_unlimited(io::IOBuffer) = (io.maxsize == typemax(Int)) +maxsize(io::IOBuffer) = io.maxsize read!(from::IOBuffer, a::Array) = read_sub(from, a, 1, length(a)) diff --git a/base/multi.jl b/base/multi.jl index 7ecf0694b92a1..eb60d54011d29 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -119,22 +119,12 @@ type WorkerConfig end end -# Optimizing writes to socket -type BufferedAsyncStream <: IO - w_stream::AsyncStream - sendbuf::IOBuffer - locked_by::Nullable{Task} # The task that has locked the write stream - acq_lock_cond::Condition # Object to wait on to acquire lock - - BufferedAsyncStream(s) = new(s, PipeBuffer(), Nullable{Task}(), Condition()) -end - type Worker id::Int r_stream::AsyncStream - w_io::BufferedAsyncStream + w_stream::AsyncStream manager::ClusterManager config::WorkerConfig @@ -143,7 +133,7 @@ type Worker gcflag::Bool Worker(id, r_stream, w_stream, manager, config) = - new(id, r_stream, BufferedAsyncStream(w_stream), manager, config, [], [], false) + new(id, r_stream, buffer_writes(w_stream), manager, config, [], [], false) end Worker(id, r_stream, w_stream, manager) = Worker(id, r_stream, w_stream, manager, WorkerConfig()) @@ -174,42 +164,21 @@ function flush_gc_msgs(w::Worker) end end -# Acquire lock for write, recording the current task as owning the lock. -# Returns true if lock was set, false if lock was already acquired -function acquire_write_lock(w::Worker) - t = current_task() - while true - if isnull(w.w_io.locked_by) - w.w_io.locked_by = t; return true - else - if get(w.w_io.locked_by) == t - return false - end - wait(w.w_io.acq_lock_cond) - end - end -end - -function release_write_lock(w::Worker) - w.w_io.locked_by = Nullable{Task}() - notify(w.w_io.acq_lock_cond) -end - function send_msg_(w::Worker, kind, args, now::Bool) #println("Sending msg $kind") - release_lock = acquire_write_lock(w) - - serialize(w.w_io, kind) - for arg in args - serialize(w.w_io, arg) - end + io = w.w_stream + lock(io) do io + serialize(io, kind) + for arg in args + serialize(io, arg) + end - if !now && w.gcflag - flush_gc_msgs(w) - else - flush(w.w_io) + if !now && w.gcflag + flush_gc_msgs(w) + else + flush(io) + end end - release_lock && release_write_lock(w) end function flush_gc_msgs() @@ -357,7 +326,7 @@ end function worker_id_from_socket(s) w = get(map_sock_wrkr, s, nothing) if isa(w,Worker) - if is(s, w.r_stream) || is(s, w.w_io.w_stream) || is(s, w.w_io.sendbuf) + if is(s, w.r_stream) || is(s, w.w_stream) return w.id end end @@ -374,8 +343,7 @@ function register_worker(pg, w) map_pid_wrkr[w.id] = w if isa(w, Worker) map_sock_wrkr[w.r_stream] = w - map_sock_wrkr[w.w_io.w_stream] = w - map_sock_wrkr[w.w_io.sendbuf] = w + map_sock_wrkr[w.w_stream] = w end end @@ -385,10 +353,9 @@ function deregister_worker(pg, pid) w = pop!(map_pid_wrkr, pid, nothing) if isa(w, Worker) pop!(map_sock_wrkr, w.r_stream) - if w.r_stream != w.w_io.w_stream - pop!(map_sock_wrkr, w.w_io.w_stream) + if w.r_stream != w.w_stream + pop!(map_sock_wrkr, w.w_stream) end - pop!(map_sock_wrkr, w.w_io.sendbuf) # Notify the cluster manager of this workers death if myid() == 1 @@ -915,7 +882,7 @@ function create_message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStrea (r_s, w_s) = connect(cluster_manager, rpid, wconfig) w = Worker(rpid, r_s, w_s, cluster_manager, wconfig) register_worker(w) - process_messages(w.r_stream, w.w_io.w_stream) + process_messages(w.r_stream, w.w_stream) send_msg_now(w, :identify_socket, self_pid) else # Processes with higher pids connect to us. Don't do anything just yet @@ -1241,7 +1208,7 @@ function setup_worker(pg::ProcessGroup, w) # Start a new task to handle inbound messages from connected worker in master. # Also calls `wait_connected` on TCP streams. - process_messages(w.r_stream, w.w_io.w_stream; ntfy_join_complete=rr_join) + process_messages(w.r_stream, w.w_stream; ntfy_join_complete=rr_join) # send address information of all workers to the new worker. # Cluster managers set the address of each worker in `WorkerConfig.connect_at`. @@ -1646,73 +1613,3 @@ function terminate_all_workers() end end end - - -# Optimized send -# - smaller writes are initially buffered with a final single socket write -# - large isbits arrays are unbuffered and written directly - -const SENDBUF_SZ=100000 -function buffer_or_write(s::BufferedAsyncStream, nb) - totb = nb_available(s.sendbuf) + nb - if totb < SENDBUF_SZ - return (true, false) - elseif nb > SENDBUF_SZ - flush(s) - return (false, false) - else - return (true, true) - end -end - -write(s::BufferedAsyncStream, b::UInt8) = write_single(s, b, 1) -write(s::BufferedAsyncStream, c::Char) = write_single(s, c, utf8sizeof(c)) - -function write_single(s::BufferedAsyncStream, b, sz) -# print("writing $b $sz\n") - (do_buffering, do_flushing) = buffer_or_write(s, sz) - if do_buffering - write(s.sendbuf, b) - do_flushing && flush(s) - else - write(s.w_stream, b) - end - return sz -end - -function write{T}(s::BufferedAsyncStream, a::Array{T}) -# print("writing array $(isbits(T)) $T\n") - if isbits(T) - n = uint(length(a)*sizeof(T)) - (do_buffering, do_flushing) = buffer_or_write(s, n) - if do_buffering - write(s.sendbuf, a) - do_flushing && flush(s) - else - write(s.w_stream, a) - end - return int(n) - else - bytes_written = write(s.sendbuf, a) - (nb_available(s.sendbuf) > SENDBUF_SZ) && flush(s) - return bytes_written - end -end -function write(s::BufferedAsyncStream, p::Ptr, nb::Integer) -# print("writing from pointer $nb bytes\n") - (do_buffering, do_flushing) = buffer_or_write(s, nb) - if do_buffering - write(s.sendbuf, p, nb) - do_flushing && flush(s) - else - write(s.w_stream, p, nb) - end - return nb -end - -function flush(s::BufferedAsyncStream) -# print("Flushing $(nb_available(s.sendbuf)) bytes\n") - if nb_available(s.sendbuf) > 0 - write(s.w_stream, takebuf_array(s.sendbuf)) - end -end diff --git a/base/nullable.jl b/base/nullable.jl index d95ac0674433f..dd8e6d0591709 100644 --- a/base/nullable.jl +++ b/base/nullable.jl @@ -1,11 +1,3 @@ -immutable Nullable{T} - isnull::Bool - value::T - - Nullable() = new(true) - Nullable(value::T) = new(false, value) -end - immutable NullException <: Exception end diff --git a/base/socket.jl b/base/socket.jl index 7f90a426a2f2a..ed76f649dc81d 100644 --- a/base/socket.jl +++ b/base/socket.jl @@ -259,6 +259,9 @@ type TCPSocket <: Socket connectnotify::Condition closecb::Callback closenotify::Condition + sendbuf::Nullable{IOBuffer} + lock_task::Nullable{Task} + lock_wait::Condition TCPSocket(handle) = new( handle, @@ -267,7 +270,8 @@ type TCPSocket <: Socket PipeBuffer(), false, Condition(), false, Condition(), - false, Condition() + false, Condition(), + nothing,nothing,Condition() ) end function TCPSocket() diff --git a/base/stream.jl b/base/stream.jl index 70cd08166ba79..4f5823b30c0ae 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -107,6 +107,9 @@ type Pipe <: AsyncStream connectnotify::Condition closecb::Callback closenotify::Condition + sendbuf::Nullable{IOBuffer} + lock_task::Nullable{Task} + lock_wait::Condition Pipe(handle) = new( handle, StatusUninit, @@ -114,7 +117,8 @@ type Pipe <: AsyncStream true, false,Condition(), false,Condition(), - false,Condition()) + false,Condition(), + nothing,nothing,Condition()) end function Pipe() handle = c_malloc(_sizeof_uv_named_pipe) @@ -181,6 +185,9 @@ type TTY <: AsyncStream readnotify::Condition closecb::Callback closenotify::Condition + sendbuf::Nullable{IOBuffer} + lock_task::Nullable{Task} + lock_wait::Condition @windows_only ispty::Bool function TTY(handle) tty = new( @@ -189,7 +196,8 @@ type TTY <: AsyncStream true, PipeBuffer(), false,Condition(), - false,Condition()) + false,Condition(), + nothing,nothing,Condition()) @windows_only tty.ispty = bool(ccall(:jl_ispty, Cint, (Ptr{Void},), handle)) tty end @@ -279,8 +287,6 @@ function reinit_stdio() global STDERR = init_stdio(ccall(:jl_stderr_stream,Ptr{Void},())) end -flush(::AsyncStream) = nothing - function isopen{T<:Union(AsyncStream,UVServer)}(x::T) if !(x.status != StatusUninit && x.status != StatusInit) throw(ArgumentError("$T object not initialized")) @@ -660,6 +666,7 @@ function read!{T}(s::AsyncStream, a::Array{T}) return a end +const SZ_UNBUFFERED_IO=65536 function read!(s::AsyncStream, a::Vector{UInt8}) nb = length(a) sbuf = s.buffer @@ -670,7 +677,7 @@ function read!(s::AsyncStream, a::Vector{UInt8}) return read!(sbuf, a) end - if nb <= 65536 # Arbitrary 64K limit under which we are OK with copying the array from the stream's buffer + if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer wait_readnb(s,nb) read!(sbuf, a) else @@ -748,21 +755,82 @@ function uv_write(s::AsyncStream, p, n::Integer) return Int(n) end +# Optimized send +# - smaller writes are buffered, final uv write on flush or when buffer full +# - large isbits arrays are unbuffered and written directly + +function buffer_or_write(s::AsyncStream, p::Ptr, n::Integer) + if isnull(s.sendbuf) + return uv_write(s, p, n) + else + buf = get(s.sendbuf) + end + + totb = nb_available(buf) + n + if totb < maxsize(buf) + nb = write(buf, p, n) + else + flush(s) + if n > maxsize(buf) + nb = uv_write(s, p, n) + else + nb = write(buf, p, n) + end + end + return nb +end + +function flush(s::AsyncStream) + if isnull(s.sendbuf) + return s + end + buf = get(s.sendbuf) + if nb_available(buf) > 0 + arr = takebuf_array(buf) # Array of UInt8s + uv_write(s, arr, length(arr)) + end + s +end + +buffer_writes(s::AsyncStream, bufsize=SZ_UNBUFFERED_IO) = (s.sendbuf=PipeBuffer(bufsize); s) + +# Locks an asyncstream. Recursive calls by the same task is OK. +function lock(f::Function, s::AsyncStream) + t = current_task() + release_lock = true + while true + if isnull(s.lock_task) + s.lock_task = t; break + else + if get(s.lock_task) == t + release_lock = false; break + end + wait(s.lock_wait) + end + end + + f(s) + if release_lock + s.lock_task = nothing + notify(s.lock_wait) + end +end + ## low-level calls ## write(s::AsyncStream, b::UInt8) = write(s, [b]) write(s::AsyncStream, c::Char) = write(s, string(c)) function write{T}(s::AsyncStream, a::Array{T}) if isbits(T) - n = uint(length(a))*sizeof(T) - return uv_write(s, a, n) + n = uint(length(a)*sizeof(T)) + return buffer_or_write(s, pointer(a), n); else check_open(s) invoke(write,(IO,Array),s,a) end end -write(s::AsyncStream, p::Ptr, n::Integer) = uv_write(s, p, n) +write(s::AsyncStream, p::Ptr, n::Integer) = buffer_or_write(s, p, n) function _uv_hook_writecb_task(s::AsyncStream,req::Ptr{Void},status::Int32) d = uv_req_data(req) @@ -925,8 +993,11 @@ type BufferStream <: AsyncStream r_c::Condition close_c::Condition is_open::Bool + buffer_writes::Bool + lock_task::Nullable{Task} + lock_wait::Condition - BufferStream() = new(PipeBuffer(), Condition(), Condition(), true) + BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false, nothing, Condition()) end isopen(s::BufferStream) = s.is_open @@ -958,7 +1029,20 @@ end wait_close(s::BufferStream) = if isopen(s) wait(s.close_c); end start_reading(s::BufferStream) = nothing -write(s::BufferStream, b::UInt8) = (rv=write(s.buffer, b); notify(s.r_c; all=true);rv) -write{T}(s::BufferStream, a::Array{T}) = (rv=write(s.buffer, a); notify(s.r_c; all=true);rv) -write(s::BufferStream, p::Ptr, nb::Integer) = (rv=write(s.buffer, p, nb); notify(s.r_c; all=true);rv) +write(s::BufferStream, b::UInt8) = write(s, [b]) +write(s::BufferStream, c::Char) = write(s, string(c)) + +function write{T}(s::BufferStream, a::Array{T}) + rv=write(s.buffer, a) + !(s.buffer_writes) && notify(s.r_c; all=true); + rv +end +function write(s::BufferStream, p::Ptr, nb::Integer) + rv=write(s.buffer, p, nb) + !(s.buffer_writes) && notify(s.r_c; all=true); + rv +end +# If buffer_writes is called, it will delay notifying waiters till a flush is called. +buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes=true; s) +flush(s::BufferStream) = (notify(s.r_c; all=true); s)