From 261e2b9c31f19b8de87da968dae8016f5e05e91f Mon Sep 17 00:00:00 2001 From: Jeff Bezanson Date: Wed, 17 Jul 2019 12:33:21 -0400 Subject: [PATCH] make `IOStream` thread-safe (#32421) --- NEWS.md | 4 ++ base/Base.jl | 2 +- base/io.jl | 74 ++++++++++++++++++++++ base/iostream.jl | 156 +++++++++++++++++------------------------------ base/lock.jl | 22 +++++++ src/sys.c | 13 ++-- 6 files changed, 165 insertions(+), 106 deletions(-) diff --git a/NEWS.md b/NEWS.md index 4d757a20e247d..232cc6d6f69fb 100644 --- a/NEWS.md +++ b/NEWS.md @@ -14,6 +14,10 @@ Language changes Multi-threading changes ----------------------- +* All system-level I/O operations (e.g. files and sockets) are now thread-safe. + This does not include subtypes of `IO` that are entirely in-memory, such as `IOBuffer`, + although it specifically does include `BufferStream`. + ([#32309], [#32174], [#31981], [#32421]). Build system changes -------------------- diff --git a/base/Base.jl b/base/Base.jl index efd9e6887dd03..7c5274405668b 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -198,7 +198,6 @@ include("c.jl") # Core I/O include("io.jl") -include("iostream.jl") include("iobuffer.jl") # strings & printing @@ -264,6 +263,7 @@ function randn end # I/O include("libuv.jl") include("asyncevent.jl") +include("iostream.jl") include("stream.jl") include("filesystem.jl") using .Filesystem diff --git a/base/io.jl b/base/io.jl index 047862ec227b3..d339b773da42b 100644 --- a/base/io.jl +++ b/base/io.jl @@ -226,6 +226,80 @@ function unsafe_read(s::IO, p::Ptr{UInt8}, n::UInt) nothing end +function peek(s::IO) + mark(s) + try read(s, UInt8) + finally + reset(s) + end +end + +# Generic `open` methods + +""" + open_flags(; keywords...) -> NamedTuple + +Compute the `read`, `write`, `create`, `truncate`, `append` flag value for +a given set of keyword arguments to [`open`](@ref) a [`NamedTuple`](@ref). +""" +function open_flags(; + read :: Union{Bool,Nothing} = nothing, + write :: Union{Bool,Nothing} = nothing, + create :: Union{Bool,Nothing} = nothing, + truncate :: Union{Bool,Nothing} = nothing, + append :: Union{Bool,Nothing} = nothing, +) + if write === true && read !== true && append !== true + create === nothing && (create = true) + truncate === nothing && (truncate = true) + end + + if truncate === true || append === true + write === nothing && (write = true) + create === nothing && (create = true) + end + + write === nothing && (write = false) + read === nothing && (read = !write) + create === nothing && (create = false) + truncate === nothing && (truncate = false) + append === nothing && (append = false) + + return ( + read = read, + write = write, + create = create, + truncate = truncate, + append = append, + ) +end + +""" + open(f::Function, args...; kwargs....) + +Apply the function `f` to the result of `open(args...; kwargs...)` and close the resulting file +descriptor upon completion. + +# Examples +```jldoctest +julia> open("myfile.txt", "w") do io + write(io, "Hello world!") + end; + +julia> open(f->read(f, String), "myfile.txt") +"Hello world!" + +julia> rm("myfile.txt") +``` +""" +function open(f::Function, args...; kwargs...) + io = open(args...; kwargs...) + try + f(io) + finally + close(io) + end +end # Generic wrappers around other IO objects abstract type AbstractPipe <: IO end diff --git a/base/iostream.jl b/base/iostream.jl index 981bd83813bfe..70b08d84f7aa9 100644 --- a/base/iostream.jl +++ b/base/iostream.jl @@ -15,11 +15,11 @@ mutable struct IOStream <: IO ios::Array{UInt8,1} name::AbstractString mark::Int64 + lock::ReentrantLock - IOStream(name::AbstractString, buf::Array{UInt8,1}) = new(pointer(buf), buf, name, -1) + IOStream(name::AbstractString, buf::Array{UInt8,1}) = new(pointer(buf), buf, name, -1, ReentrantLock()) end -# TODO: delay adding finalizer, e.g. for memio with a small buffer, or -# in the case where we take! it. + function IOStream(name::AbstractString, finalize::Bool) buf = zeros(UInt8,sizeof_ios_t) x = IOStream(name, buf) @@ -42,11 +42,11 @@ to synchronous `File`'s and `IOStream`'s not to any of the asynchronous streams. fd(s::IOStream) = Int(ccall(:jl_ios_fd, Clong, (Ptr{Cvoid},), s.ios)) stat(s::IOStream) = stat(fd(s)) -close(s::IOStream) = ccall(:ios_close, Cvoid, (Ptr{Cvoid},), s.ios) -isopen(s::IOStream) = ccall(:ios_isopen, Cint, (Ptr{Cvoid},), s.ios)!=0 +close(s::IOStream) = @lock_nofail s.lock ccall(:ios_close, Cvoid, (Ptr{Cvoid},), s.ios) +isopen(s::IOStream) = ccall(:ios_isopen, Cint, (Ptr{Cvoid},), s.ios) != 0 function flush(s::IOStream) sigatomic_begin() - bad = ccall(:ios_flush, Cint, (Ptr{Cvoid},), s.ios) != 0 + bad = @lock_nofail s.lock ccall(:ios_flush, Cint, (Ptr{Cvoid},), s.ios) != 0 sigatomic_end() systemerror("flush", bad) end @@ -83,7 +83,8 @@ julia> String(take!(io)) ``` """ function truncate(s::IOStream, n::Integer) - systemerror("truncate", ccall(:ios_trunc, Cint, (Ptr{Cvoid}, Csize_t), s.ios, n) != 0) + err = @lock_nofail s.lock ccall(:ios_trunc, Cint, (Ptr{Cvoid}, Csize_t), s.ios, n) != 0 + systemerror("truncate", err) return s end @@ -103,7 +104,7 @@ julia> read(io, Char) ``` """ function seek(s::IOStream, n::Integer) - ret = ccall(:ios_seek, Int64, (Ptr{Cvoid}, Int64), s.ios, n) + ret = @lock_nofail s.lock ccall(:ios_seek, Int64, (Ptr{Cvoid}, Int64), s.ios, n) systemerror("seek", ret == -1) ret < -1 && error("seek failed") return s @@ -137,7 +138,8 @@ seekstart(s::IO) = seek(s,0) Seek a stream to its end. """ function seekend(s::IOStream) - systemerror("seekend", ccall(:ios_seek_end, Int64, (Ptr{Cvoid},), s.ios) != 0) + err = @lock_nofail s.lock ccall(:ios_seek_end, Int64, (Ptr{Cvoid},), s.ios) != 0 + systemerror("seekend", err) return s end @@ -159,7 +161,7 @@ julia> read(io, Char) ``` """ function skip(s::IOStream, delta::Integer) - ret = ccall(:ios_skip, Int64, (Ptr{Cvoid}, Int64), s.ios, delta) + ret = @lock_nofail s.lock ccall(:ios_skip, Int64, (Ptr{Cvoid}, Int64), s.ios, delta) systemerror("skip", ret == -1) ret < -1 && error("skip failed") return s @@ -191,12 +193,13 @@ julia> position(io) ``` """ function position(s::IOStream) - pos = ccall(:ios_pos, Int64, (Ptr{Cvoid},), s.ios) + pos = @lock_nofail s.lock ccall(:ios_pos, Int64, (Ptr{Cvoid},), s.ios) systemerror("position", pos == -1) return pos end -eof(s::IOStream) = ccall(:ios_eof_blocking, Cint, (Ptr{Cvoid},), s.ios)!=0 +_eof_nolock(s::IOStream) = ccall(:ios_eof_blocking, Cint, (Ptr{Cvoid},), s.ios) != 0 +eof(s::IOStream) = @lock_nofail s.lock _eof_nolock(s) ## constructing and opening streams ## @@ -217,44 +220,6 @@ function fdio(name::AbstractString, fd::Integer, own::Bool=false) end fdio(fd::Integer, own::Bool=false) = fdio(string(""), fd, own) -""" - open_flags(; keywords...) -> NamedTuple - -Compute the `read`, `write`, `create`, `truncate`, `append` flag value for -a given set of keyword arguments to [`open`](@ref) a [`NamedTuple`](@ref). -""" -function open_flags(; - read :: Union{Bool,Nothing} = nothing, - write :: Union{Bool,Nothing} = nothing, - create :: Union{Bool,Nothing} = nothing, - truncate :: Union{Bool,Nothing} = nothing, - append :: Union{Bool,Nothing} = nothing, -) - if write === true && read !== true && append !== true - create === nothing && (create = true) - truncate === nothing && (truncate = true) - end - - if truncate === true || append === true - write === nothing && (write = true) - create === nothing && (create = true) - end - - write === nothing && (write = false) - read === nothing && (read = !write) - create === nothing && (create = false) - truncate === nothing && (truncate = false) - append === nothing && (append = false) - - return ( - read = read, - write = write, - create = create, - truncate = truncate, - append = append, - ) -end - """ open(filename::AbstractString; keywords...) -> IOStream @@ -351,52 +316,36 @@ function open(fname::AbstractString, mode::AbstractString) throw(ArgumentError("invalid open mode: $mode")) end -""" - open(f::Function, args...; kwargs....) - -Apply the function `f` to the result of `open(args...; kwargs...)` and close the resulting file -descriptor upon completion. - -# Examples -```jldoctest -julia> open("myfile.txt", "w") do io - write(io, "Hello world!") - end; - -julia> open(f->read(f, String), "myfile.txt") -"Hello world!" - -julia> rm("myfile.txt") -``` -""" -function open(f::Function, args...; kwargs...) - io = open(args...; kwargs...) - try - f(io) - finally - close(io) - end -end - ## low-level calls ## function write(s::IOStream, b::UInt8) iswritable(s) || throw(ArgumentError("write failed, IOStream is not writeable")) - Int(ccall(:ios_putc, Cint, (Cint, Ptr{Cvoid}), b, s.ios)) + Int(@lock_nofail s.lock ccall(:ios_putc, Cint, (Cint, Ptr{Cvoid}), b, s.ios)) end function unsafe_write(s::IOStream, p::Ptr{UInt8}, nb::UInt) iswritable(s) || throw(ArgumentError("write failed, IOStream is not writeable")) - return Int(ccall(:ios_write, Csize_t, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), s.ios, p, nb)) + return Int(@lock_nofail s.lock ccall(:ios_write, Csize_t, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), s.ios, p, nb)) end # num bytes available without blocking -bytesavailable(s::IOStream) = ccall(:jl_nb_available, Int32, (Ptr{Cvoid},), s.ios) - -readavailable(s::IOStream) = read!(s, Vector{UInt8}(undef, bytesavailable(s))) +bytesavailable(s::IOStream) = @lock_nofail s.lock ccall(:jl_nb_available, Int32, (Ptr{Cvoid},), s.ios) + +function readavailable(s::IOStream) + lock(s.lock) + nb = ccall(:jl_nb_available, Int32, (Ptr{Cvoid},), s.ios) + a = Vector{UInt8}(undef, nb) + nr = ccall(:ios_readall, Csize_t, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), s, a, nb) + if nr != nb + unlock(s.lock) + throw(EOFError()) + end + unlock(s.lock) + return a +end function read(s::IOStream, ::Type{UInt8}) - b = ccall(:ios_getc, Cint, (Ptr{Cvoid},), s.ios) + b = @lock_nofail s.lock ccall(:ios_getc, Cint, (Ptr{Cvoid},), s.ios) if b == -1 throw(EOFError()) end @@ -405,7 +354,15 @@ end if ENDIAN_BOM == 0x04030201 function read(s::IOStream, T::Union{Type{Int16},Type{UInt16},Type{Int32},Type{UInt32},Type{Int64},Type{UInt64}}) - return ccall(:jl_ios_get_nbyte_int, UInt64, (Ptr{Cvoid}, Csize_t), s.ios, sizeof(T)) % T + n = sizeof(T) + lock(s.lock) + if ccall(:jl_ios_buffer_n, Cint, (Ptr{Cvoid}, Csize_t), s.ios, n) != 0 + unlock(s.lock) + throw(EOFError()) + end + x = ccall(:jl_ios_get_nbyte_int, UInt64, (Ptr{Cvoid}, Csize_t), s.ios, n) % T + unlock(s.lock) + return x end read(s::IOStream, ::Type{Float16}) = reinterpret(Float16, read(s, Int16)) @@ -414,8 +371,8 @@ read(s::IOStream, ::Type{Float64}) = reinterpret(Float64, read(s, Int64)) end function unsafe_read(s::IOStream, p::Ptr{UInt8}, nb::UInt) - if ccall(:ios_readall, Csize_t, - (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), s, p, nb) != nb + nr = @lock_nofail s.lock ccall(:ios_readall, Csize_t, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), s, p, nb) + if nr != nb throw(EOFError()) end nothing @@ -424,24 +381,25 @@ end ## text I/O ## take!(s::IOStream) = - ccall(:jl_take_buffer, Vector{UInt8}, (Ptr{Cvoid},), s.ios) + @lock_nofail s.lock ccall(:jl_take_buffer, Vector{UInt8}, (Ptr{Cvoid},), s.ios) function readuntil(s::IOStream, delim::UInt8; keep::Bool=false) - ccall(:jl_readuntil, Array{UInt8,1}, (Ptr{Cvoid}, UInt8, UInt8, UInt8), s.ios, delim, 0, !keep) + @lock_nofail s.lock ccall(:jl_readuntil, Array{UInt8,1}, (Ptr{Cvoid}, UInt8, UInt8, UInt8), s.ios, delim, 0, !keep) end # like readuntil, above, but returns a String without requiring a copy function readuntil_string(s::IOStream, delim::UInt8, keep::Bool) - ccall(:jl_readuntil, Ref{String}, (Ptr{Cvoid}, UInt8, UInt8, UInt8), s.ios, delim, 1, !keep) + @lock_nofail s.lock ccall(:jl_readuntil, Ref{String}, (Ptr{Cvoid}, UInt8, UInt8, UInt8), s.ios, delim, 1, !keep) end function readline(s::IOStream; keep::Bool=false) - ccall(:jl_readuntil, Ref{String}, (Ptr{Cvoid}, UInt8, UInt8, UInt8), s.ios, '\n', 1, keep ? 0 : 2) + @lock_nofail s.lock ccall(:jl_readuntil, Ref{String}, (Ptr{Cvoid}, UInt8, UInt8, UInt8), s.ios, '\n', 1, keep ? 0 : 2) end function readbytes_all!(s::IOStream, b::Array{UInt8}, nb) olb = lb = length(b) nr = 0 + @lock_nofail s.lock begin GC.@preserve b while nr < nb if lb < nr+1 lb = max(65536, (nr+1) * 2) @@ -449,7 +407,8 @@ function readbytes_all!(s::IOStream, b::Array{UInt8}, nb) end nr += Int(ccall(:ios_readall, Csize_t, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), s.ios, pointer(b, nr+1), min(lb-nr, nb-nr))) - eof(s) && break + _eof_nolock(s) && break + end end if lb > olb && lb > nr resize!(b, max(olb, nr)) # shrink to just contain input data if was resized @@ -462,8 +421,11 @@ function readbytes_some!(s::IOStream, b::Array{UInt8}, nb) if nb > olb resize!(b, nb) end + local nr + @lock_nofail s.lock begin nr = GC.@preserve b Int(ccall(:ios_read, Csize_t, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), s.ios, pointer(b), nb)) + end lb = length(b) if lb > olb && lb > nr resize!(b, max(olb, nr)) # shrink to just contain input data if was resized @@ -495,7 +457,7 @@ function read(s::IOStream) Int64(0) end if sz > 0 - pos = ccall(:ios_pos, Int64, (Ptr{Cvoid},), s.ios) + pos = position(s) if pos > 0 sz -= pos end @@ -528,13 +490,5 @@ end ## peek ## function peek(s::IOStream) - ccall(:ios_peekc, Cint, (Ptr{Cvoid},), s) -end - -function peek(s::IO) - mark(s) - try read(s, UInt8) - finally - reset(s) - end + @lock_nofail s.lock ccall(:ios_peekc, Cint, (Ptr{Cvoid},), s) end diff --git a/base/lock.jl b/base/lock.jl index 8df4e43aeccbf..60066220b744e 100644 --- a/base/lock.jl +++ b/base/lock.jl @@ -164,6 +164,28 @@ function trylock(f, l::AbstractLock) return false end +macro lock(l, expr) + quote + temp = $(esc(l)) + lock(temp) + try + $(esc(expr)) + finally + unlock(temp) + end + end +end + +macro lock_nofail(l, expr) + quote + temp = $(esc(l)) + lock(temp) + val = $(esc(expr)) + unlock(temp) + val + end +end + @eval Threads begin """ Threads.Condition([lock]) diff --git a/src/sys.c b/src/sys.c index befda0a222a93..7c6311feea8cd 100644 --- a/src/sys.c +++ b/src/sys.c @@ -315,16 +315,21 @@ JL_DLLEXPORT jl_value_t *jl_readuntil(ios_t *s, uint8_t delim, uint8_t str, uint return (jl_value_t*)a; } -JL_DLLEXPORT uint64_t jl_ios_get_nbyte_int(ios_t *s, const size_t n) +JL_DLLEXPORT int jl_ios_buffer_n(ios_t *s, const size_t n) { - assert(n <= 8); size_t space, ret; do { space = (size_t)(s->size - s->bpos); ret = ios_readprep(s, n); if (space == ret && ret < n) - jl_eof_error(); - } while(ret < n); + return 1; + } while (ret < n); + return 0; +} + +JL_DLLEXPORT uint64_t jl_ios_get_nbyte_int(ios_t *s, const size_t n) +{ + assert(n <= 8); uint64_t x = 0; uint8_t *buf = (uint8_t*)&s->buf[s->bpos]; if (n == 8) {