From 7310f557aa9016693da6deac9915eaedea660e71 Mon Sep 17 00:00:00 2001 From: Keno Fischer Date: Tue, 18 Jun 2013 18:06:59 -0400 Subject: [PATCH] Redesign FD/File watchers Fixes #3015 Fixes #3016 Fixes #3020 --- base/client.jl | 1 + base/exports.jl | 4 +- base/fs.jl | 2 +- base/poll.jl | 294 ++++++++++++++++++++++++++++++++++++++++++++++++ base/stat.jl | 2 +- base/stream.jl | 206 ++------------------------------- base/sysimg.jl | 3 + src/jl_uv.c | 15 +++ test/file.jl | 12 +- test/pollfd.jl | 11 +- 10 files changed, 335 insertions(+), 215 deletions(-) create mode 100644 base/poll.jl diff --git a/base/client.jl b/base/client.jl index 153a8710e21f9..073e03164e2a6 100644 --- a/base/client.jl +++ b/base/client.jl @@ -293,6 +293,7 @@ end function _start() # set up standard streams reinit_stdio() + fdwatcher_reinit() # Initialize RNG Random.librandom_init() # Check that OpenBLAS is correctly built diff --git a/base/exports.jl b/base/exports.jl index 0ceb6ac0af66b..0650cae44ae6f 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1048,8 +1048,8 @@ export nb_available, open, open_any_tcp_port, - OS_FD, - OS_SOCKET, + RawFD, + WindowsRawSocket, PipeBuffer, position, read, diff --git a/base/fs.jl b/base/fs.jl index e0bc80ae05f28..7d80df1fc4774 100644 --- a/base/fs.jl +++ b/base/fs.jl @@ -121,7 +121,7 @@ function truncate(f::File, n::Integer) f end -fd(f::File) = OS_FD(f.handle) +fd(f::File) = RawFD(f.handle) stat(f::File) = stat(fd(f)) end diff --git a/base/poll.jl b/base/poll.jl new file mode 100644 index 0000000000000..2d622f3cec54a --- /dev/null +++ b/base/poll.jl @@ -0,0 +1,294 @@ +type FileMonitor + handle::Ptr{Void} + cb::Callback + open::Bool + notify::Condition + function FileMonitor(cb, file) + handle = c_malloc(_sizeof_uv_fs_event) + err = ccall(:jl_fs_event_init,Int32, (Ptr{Void}, Ptr{Void}, Ptr{Uint8}, Int32), eventloop(),handle,file,0) + if err == -1 + c_free(handle) + throw(UVError("FileMonitor")) + end + this = new(handle,cb,false,Condition()) + associate_julia_struct(handle,this) + finalizer(this,close) + this + end + FileMonitor(file) = FileMonitor(false,file) +end + +close(t::FileMonitor) = ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle) + +const UV_READABLE = 1 +const UV_WRITEABLE = 2 + +convert(::Type{Int32},fd::RawFD) = fd.fd + +#Wrapper for an OS file descriptor (for Windows) +@windows_only immutable WindowsRawSocket + handle::Ptr{Void} # On Windows file descriptors are HANDLE's and 64-bit on 64-bit Windows... +end + +abstract UVPollingWatcher + +type PollingFileWatcher <: UVPollingWatcher + handle::Ptr{Void} + file::String + open::Bool + notify::Condition + cb::Callback + function PollingFileWatcher(cb, file) + handle = c_malloc(_sizeof_uv_fs_poll) + err = ccall(:uv_fs_poll_init,Int32,(Ptr{Void},Ptr{Void}),eventloop(),handle) + if err == -1 + c_free(handle) + throw(UVError("PollingFileWatcher")) + end + this = new(handle, file, false, Condition(), cb) + associate_julia_struct(handle,this) + finalizer(this,close) + this + end + PollingFileWatcher(file) = PollingFileWatcher(false,file) +end + +@unix_only typealias FDW_FD RawFD +@windows_only typealias FDW_FD WindowsRawSocket + +_get_osfhandle(fd::RawFD) = WindowsRawSocket(ccall(:_get_osfhandle,Ptr{Void},(Int32,),fd.fd)) + +type FDWatcher <: UVPollingWatcher + handle::Ptr{Void} + fd::FDW_FD + open::Bool + notify::Condition + cb::Callback + events::Int32 + FDWatcher(handle::Ptr,fd::FDW_FD,open::Bool,notify::Condition,cb::Callback,events::Integer) = + new(handle,fd,open,notify,cb,int32(events)) +end +function FDWatcher(fd::RawFD) + handle = c_malloc(_sizeof_uv_poll) + err = ccall(:uv_poll_init,Int32,(Ptr{Void},Ptr{Void},Int32),eventloop(),handle,fd.fd) + if err == -1 + c_free(handle) + throw(UVError("FDWatcher")) + end + @unix_only this = FDWatcher(handle,fd,false,Condition(),false,0) + @windows_only this = FDWatcher(handle,_get_osfhandle(fd),false,Condition(),false,0) + associate_julia_struct(handle,this) + finalizer(this,close) + this +end +@windows_only function FDWatcher(fd::WindowsRawSocket) + handle = c_malloc(_sizeof_uv_poll) + err = ccall(:uv_poll_init_socket,Int32,(Ptr{Void}, Ptr{Void}, Ptr{Void}), + eventloop(), handle, fd.handle) + if err == -1 + c_free(handle) + throw(UVError("FDWatcher")) + end + this = FDWatcher(handle,fd,false,Condition(),false,0) + associate_julia_struct(handle,this) + finalizer(this,close) + this +end + +function fdw_wait_cb(fdw::FDWatcher,status,events) + if status == -1 + notify(fdw.notify,(UV_error_t(_uv_lasterror(),_uv_lastsystemerror()),events)) + else + notify(fdw.notify,(UV_error_t(int32(0),int32(0)),events)) + end +end + +function _wait(fdw::FDWatcher,readable,writeable) + events = (readable ? UV_READABLE : 0) | + (writeable ? UV_WRITEABLE : 0) + if events == 0 + error("Must be watching for at least one event") + end + events |= fdw.events + if !fdw.open || (events != fdw.events) + # (re)initialize fdw + start_watching(fdw_wait_cb,fdw,events) + end + while true + err, events = wait(fdw.notify) + if err.uv_code != 0 + throw(UVError("wait (FD)",err)) + end + if (readable && (events & UV_READABLE) != 0) || + (writeable && (events & UV_WRITEABLE) != 0) + break + end + end + if isempty(fdw.notify.waitq) + stop_watching(fdw) + end + events +end + +# On Unix we can only have one watcher per FD, so we need to keep an explicit +# list of them. On Windows, I think it is techincally possible to have more than one +# watcher per FD, but in order to keep compatibility, we do the same on windows as we do +# on unix + +let + global fdwatcher_reinit + const empty_watcher = FDWatcher(C_NULL,RawFD(-1),false,Condition(),false,0) + @unix_only begin + fdwatcher_array = Array(FDWatcher,0) + function fdwatcher_reinit() + fdwatcher_array = Array(FDWatcher,0) + end + + function wait(fd::RawFD; readable=false, writeable=false) + old_length = length(fdwatcher_array) + if fd.fd > old_length + resize!(fdwatcher_array,fd.fd+1) + fdwatcher_array[old_length+1:fd.fd+1] = empty_watcher + end + if is(fdwatcher_array[fd.fd+1],empty_watcher) + fdwatcher_array[fd.fd+1] = FDWatcher(fd) + end + _wait(fdwatcher_array[fd.fd+1],readable,writeable) + end + end + @windows_only begin + fdwatcher_dict = Dict{WindowsRawSocket,FDWatcher}() + function fdwatcher_reinit() + fdwatcher_array = Dict{WindowsRawSocket,FDWatcher}() + end + + function wait(fd::RawFD; readable=false, writeable=false) + wait(_get_osfhandle(fd); readable=readable, writeable=writeable) + end + + function wait(socket::WindowsRawSocket; readable=false, writeable=false) + if !has(fdwatcher_array,socket.handle) + fdwatcher_array[fd.handle] = FDWatcher(socket) + end + _wait(fdwatcher_array[fd.handle],readable,writeable) + end + end +end + +function pfw_wait_cb(pfw::PollingFileWatcher, status, prev, cur) + if status == -1 + notify(pfw.notify,(UV_error_t(_uv_lasterror(),_uv_lastsystemerror()),prev,cur)) + else + notify(pfw.notify,(UV_error_t(int32(0),int32(0)),prev,cur)) + end +end + +function wait(pfw::PollingFileWatcher; interval=3.0) + if !pfw.open + start_watching(pfw_wait_cb,pfw,interval) + end + err,prev,curr = wait(pfw.notify) + if err.uv_code != 0 + throw(UVError("wait (PollingFileWatcher)",err)) + end + if isempty(pfw.notify.waitq) + stop_watching(pfw) + end + (prev,curr) +end + +function wait(m::FileMonitor) + err, filename, events = wait(m.notify) + if err.uv_code != 0 + throw(UVError("wait (FileMonitor)",err)) + end + filename, events +end + + +close(t::UVPollingWatcher) = ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle) + +function start_watching(t::FDWatcher, events) + associate_julia_struct(t.handle, t) + @unix_only if ccall(:jl_uv_unix_fd_is_watched,Int32,(Int32,Ptr{Void},Ptr{Void}),t.fd,t.handle,eventloop()) == 1 + error("Cannot watch an FD more than once on Unix") + end + uv_error("start_watching (FD)", + ccall(:jl_poll_start,Int32,(Ptr{Void},Int32),t.handle,events)==-1) +end +start_watching(f::Function, t::FDWatcher, events) = (t.cb = f; start_watching(t,events)) + +function start_watching(t::PollingFileWatcher, interval) + associate_julia_struct(t.handle, t) + uv_error("start_watching (File)", + ccall(:jl_fs_poll_start,Int32,(Ptr{Void},Ptr{Uint8},Uint32),t.handle,t.file,interval)==-1) +end +start_watching(f::Function, t::PollingFileWatcher, interval) = (t.cb = f;start_watching(t,interval)) + +function stop_watching(t::FDWatcher) + disassociate_julia_struct(t.handle) + uv_error("stop_watching (FD)", + ccall(:uv_poll_stop,Int32,(Ptr{Void},),t.handle)==-1) +end + +function stop_watching(t::PollingFileWatcher) + disassociate_julia_struct(t.handle) + uv_error("stop_watching (File)", + ccall(:uv_fs_poll_stop,Int32,(Ptr{Void},),t.handle)==-1) +end + +function _uv_hook_fseventscb(t::FileMonitor,filename::Ptr,events::Int32,status::Int32) + if(isa(t.cb,Function)) + # bytestring(convert(Ptr{Uint8},filename)) - seems broken at the moment - got NULL + t.cb(status, events, status) + if status == -1 + notify(t.notify,(UV_error_t(_uv_lasterror(),_uv_lastsystemerror()),bytestring(convert(Ptr{Uint8},filename)),events)) + else + notify(t.notify,(UV_error_t(int32(0),int32(0)),bytestring(convert(Ptr{Uint8},filename)),events)) + end + end +end + +function _uv_hook_pollcb(t::FDWatcher,status::Int32,events::Int32) + if(isa(t.cb,Function)) + t.cb(t,status, events) + end +end +function _uv_hook_fspollcb(t::PollingFileWatcher,status::Int32,prev::Ptr,cur::Ptr) + if(isa(t.cb,Function)) + t.cb(t, status, Stat(convert(Ptr{Uint8},prev)), Stat(convert(Ptr{Uint8},cur))) + end +end + +_uv_hook_close(uv::FileMonitor) = (uv.handle = 0; nothing) +_uv_hook_close(uv::UVPollingWatcher) = (uv.handle = 0; nothing) + +function poll_fd(s, seconds::Real; readable=false, writeable=false) + wt = Condition() + + @schedule (args = wait(s; readable=readable, writeable=writeable); notify(wt,(:poll,args))) + @schedule (sleep(seconds); notify(wt,(:timeout,0))) + + _, ret = wait(wt) + + return ret +end + +function poll_file(s, interval_seconds::Real, seconds::Real) + wt = Condition() + + @schedule (wait(PollingFileWatcher(s);interval=interval_seconds); notify(wt,(:poll))) + @schedule (sleep(seconds); notify(wt,(:timeout))) + + wait(wt) == :poll +end + +function watch_file(cb, s; poll=false) + if poll + pfw = PollingFileWatcher(cb,s) + start_watching(pfw) + return pfw + else + return FileMonitor(cb,s) + end +end \ No newline at end of file diff --git a/base/stat.jl b/base/stat.jl index 12bfb1d948e95..a5f772842f910 100644 --- a/base/stat.jl +++ b/base/stat.jl @@ -48,7 +48,7 @@ macro stat_call(sym,arg1type,arg) end end -stat(fd::OS_FD) = @stat_call jl_fstat Int32 fd.fd +stat(fd::RawFD) = @stat_call jl_fstat Int32 fd.fd stat(fd::Integer) = @stat_call jl_fstat Int32 fd stat(path::String) = @stat_call jl_stat Ptr{Uint8} path lstat(path::String) = @stat_call jl_lstat Ptr{Uint8} path diff --git a/base/stream.jl b/base/stream.jl index 54690df5349ba..905fa59a9432e 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -11,6 +11,12 @@ abstract UVServer typealias UVHandle Ptr{Void} typealias UVStream AsyncStream +#Wrapper for an OS file descriptor (on both Unix and Windows) +immutable RawFD + fd::Int32 + RawFD(fd::Integer) = new(int32(fd)) +end + function uv_sizeof_handle(handle) if !(UV_UNKNOWN_HANDLE < handle < UV_HANDLE_TYPE_MAX) throw(DomainError()) @@ -88,142 +94,6 @@ end show(io::IO,stream::TTY) = print(io,"TTY(",stream.open?"connected,":"disconnected,",nb_available(stream.buffer)," bytes waiting)") - -type FileMonitor - handle::Ptr{Void} - cb::Callback - function FileMonitor(cb, file) - handle = c_malloc(_sizeof_uv_fs_events) - err = ccall(:jl_fs_event_init,Int32, (Ptr{Void}, Ptr{Void}, Ptr{Uint8}, Int32), eventloop(),handle,file,0) - if err == -1 - c_free(handle) - throw(UVError("FileMonitor")) - end - this = new(handle,cb) - associate_julia_struct(handle,this) - finalizer(this,close) - this - end - FileMonitor(file) = FileMonitor(false,file) -end - -close(t::FileMonitor) = ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle) - -const UV_READABLE = 1 -const UV_WRITEABLE = 2 - -#Wrapper for an OS file descriptor (on both Unix and Windows) -immutable OS_FD - fd::Int32 -end - -convert(::Type{Int32},fd::OS_FD) = fd.fd - -#Wrapper for an OS file descriptor (for Windows) -@windows_only immutable OS_SOCKET - handle::Ptr{Void} # On Windows file descriptors are HANDLE's and 64-bit on 64-bit Windows... -end - -abstract UVPollingWatcher - -type PollingFileWatcher <: UVPollingWatcher - handle::Ptr{Void} - file::ASCIIString - cb::Callback - function PollingFileWatcher(cb, file) - handle = c_malloc(_sizeof_uv_fs_poll) - err = ccall(:uv_fs_poll_init,Int32,(Ptr{Void},Ptr{Void}),eventloop(),handle) - if err == -1 - c_free(handle) - throw(UVError("PollingFileWatcher")) - end - this = new(handle, file, cb) - associate_julia_struct(handle,this) - finalizer(this,close) - this - end - PollingFileWatcher(file) = PollingFileWatcher(false,file) -end - -type FDWatcher <: UVPollingWatcher - handle::Ptr{Void} - cb::Callback - function FDWatcher(fd::OS_FD) - handle = c_malloc(_sizeof_uv_poll) - err = ccall(:uv_poll_init,Int32,(Ptr{Void},Ptr{Void},Int32),eventloop(),handle,fd.fd) - if err == -1 - c_free(handle) - throw(UVError("FDWatcher")) - end - this = new(handle,false) - associate_julia_struct(handle,this) - finalizer(this,close) - this - end - @windows_only function FDWatcher(fd::OS_SOCKET) - handle = c_malloc(_sizeof_uv_poll) - err = ccall(:uv_poll_init_socket,Int32,(Ptr{Void}, Ptr{Void}, Ptr{Void}), - eventloop(), handle, fd.handle) - if err == -1 - c_free(handle) - throw(UVError("FDWatcher")) - end - this = new(handle,false) - associate_julia_struct(handle,this) - finalizer(this,close) - this - end -end - -close(t::UVPollingWatcher) = ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle) - -function start_watching(t::FDWatcher, events) - associate_julia_struct(t.handle, t) - uv_error("start_watching (FD)", - ccall(:jl_poll_start,Int32,(Ptr{Void},Int32),t.handle,events)==-1) -end -start_watching(f::Function, t::FDWatcher, events) = (t.cb = f; start_watching(t,events)) - -function start_watching(t::PollingFileWatcher, interval) - associate_julia_struct(t.handle, t) - uv_error("start_watching (File)", - ccall(:jl_fs_poll_start,Int32,(Ptr{Void},Ptr{Uint8},Uint32),t.handle,t.file,interval)==-1) -end -start_watching(f::Function, t::PollingFileWatcher, interval) = (t.cb = f;start_watching(t,interval)) - -function stop_watching(t::FDWatcher) - disassociate_julia_struct(t.handle) - uv_error("stop_watching (FD)", - ccall(:uv_poll_stop,Int32,(Ptr{Void},),t.handle)==-1) -end - -function stop_watching(t::PollingFileWatcher) - disassociate_julia_struct(t.handle) - uv_error("stop_watching (File)", - ccall(:uv_fs_poll_stop,Int32,(Ptr{Void},),t.handle)==-1) -end - -function _uv_hook_fseventscb(t::FileMonitor,filename::Ptr,events::Int32,status::Int32) - if(isa(t.cb,Function)) - # bytestring(convert(Ptr{Uint8},filename)) - seems broken at the moment - got NULL - t.cb(status, events, status) - end -end - -function _uv_hook_pollcb(t::FDWatcher,status::Int32,events::Int32) - if(isa(t.cb,Function)) - t.cb(status, events) - end -end -function _uv_hook_fspollcb(t::PollingFileWatcher,status::Int32,prev::Ptr,cur::Ptr) - if(isa(t.cb,Function)) - t.cb(status, Stat(convert(Ptr{Uint8},prev)), Stat(convert(Ptr{Uint8},cur))) - end -end - -_uv_hook_close(uv::FileMonitor) = (uv.handle = 0; nothing) -_uv_hook_close(uv::UVPollingWatcher) = (uv.handle = 0; nothing) - uvtype(::AsyncStream) = UV_STREAM uvhandle(stream::AsyncStream) = stream.handle @@ -408,69 +278,7 @@ TimeoutAsyncWork(cb::Function) = TimeoutAsyncWork(eventloop(),cb) close(t::TimeoutAsyncWork) = ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle) -function poll_fd(s, events::Integer, timeout_ms::Integer) - wt = Condition() - - fdw = FDWatcher(s) - start_watching((status, events) -> notify(wt, (:poll, status, events)), fdw, events) - - if (timeout_ms > 0) - timer = TimeoutAsyncWork(status -> notify(wt, (:timeout, status))) - start_timer(timer, int64(timeout_ms), int64(0)) - end - local args - try - args = wait(wt) - finally - if (timeout_ms > 0) stop_timer(timer) end - stop_watching(fdw) - end - - if (args[2] == 0) - if (args[1] == :poll) return args[3] end - if (args[1] == :timeout) return 0 end - end - - error("Error while polling") -end - -function poll_file(s, interval::Integer, timeout_ms::Integer) - wt = Condition() - - pfw = PollingFileWatcher(s) - start_watching((status,prev,cur) -> notify(wt, (:poll, status)), pfw, interval) - - if (timeout_ms > 0) - timer = TimeoutAsyncWork(status -> notify(wt, (:timeout, status))) - start_timer(timer, int64(timeout_ms), int64(0)) - end - - local args - try - args = wait(wt) - finally - if (timeout_ms > 0) stop_timer(timer) end - stop_watching(pfw) - end - - if (args[2] == 0) - if (args[1] == :poll) return 1 end - if (args[1] == :timeout) return 0 end - end - - error("error while polling") -end - -function watch_file(cb, s; poll=false) - if poll - pfw = PollingFileWatcher(cb,s) - start_watching(pfw) - return pfw - else - return FileMonitor(cb,s) - end -end function _uv_hook_close(uv::Union(AsyncStream,UVServer)) uv.handle = 0 @@ -487,7 +295,7 @@ _uv_hook_asynccb(async::AsyncWork, status::Int32) = async.cb(status) function start_timer(timer::TimeoutAsyncWork,timeout::Int64,repeat::Int64) associate_julia_struct(timer.handle,timer) ccall(:uv_update_time,Void,(Ptr{Void},),eventloop()) - ccall(:jl_timer_start,Int32,(Ptr{Void},Int64,Int64),timer.handle,timeout,repeat) + ccall(:jl_timer_start,Int32,(Ptr{Void},Int64,Int64),timer.handle,timeout+1,repeat) end function stop_timer(timer::TimeoutAsyncWork) diff --git a/base/sysimg.jl b/base/sysimg.jl index 759d89efc4c41..24e8cd088b164 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -109,6 +109,9 @@ importall .Printf include("serialize.jl") include("multi.jl") +# Polling (requires multi.jl) +include("poll.jl") + # system & environment include("libc.jl") include("env.jl") diff --git a/src/jl_uv.c b/src/jl_uv.c index 1f9cd5467f862..5de63df4b9b09 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -767,6 +767,21 @@ DLLEXPORT uv_lib_t *jl_wrap_raw_dl_handle(void *handle) return lib; } +#ifndef _OS_WINDOWS_ + +DLLEXPORT int jl_uv_unix_fd_is_watched(int fd, uv_poll_t *handle, uv_loop_t *loop) +{ + if(fd > loop->nwatchers) + return 0; + if(loop->watchers[fd] == NULL) + return 0; + if(loop->watchers[fd] == &handle->io_watcher) + return 0; + return 1; +} + +#endif + #ifdef __cplusplus } #endif diff --git a/test/file.jl b/test/file.jl index b65c3d07adabc..e8596c8b2dc46 100644 --- a/test/file.jl +++ b/test/file.jl @@ -94,12 +94,12 @@ function test_monitor(slval) end # Commented out the tests below due to issues 3015, 3016 and 3020 -#test_timeout(100) -#test_timeout(1000) -#test_touch(100) -#test_touch(1000) -#test_monitor(1000) -#test_monitor(100) +test_timeout(0.1) +test_timeout(1) +test_touch(0.1) +test_touch(1) +test_monitor(1) +test_monitor(0.1) diff --git a/test/pollfd.jl b/test/pollfd.jl index 71a854a9e277a..29ef0c1b9e1a1 100644 --- a/test/pollfd.jl +++ b/test/pollfd.jl @@ -5,7 +5,7 @@ pipe_fds = Array(Cint,2) @test 0 == ccall(:pipe, Cint, (Ptr{Cint},), pipe_fds) function test_poll(timeout_ms) - rc = poll_fd(OS_FD(pipe_fds[1]), UV_READABLE, timeout_ms) + rc = poll_fd(RawFD(pipe_fds[1]), timeout_ms; readable=true) produce(rc) end @@ -43,11 +43,10 @@ function test_read(slval) @test slval <= tdiff end -# Commented out the tests below due to issues 3015, 3016 and 3020 -#test_timeout(100) -#test_timeout(1000) -#test_read(100) -#test_read(1000) +test_timeout(.1) +test_timeout(1) +test_read(.1) +test_read(1) ccall(:close, Cint, (Cint,), pipe_fds[1]) ccall(:close, Cint, (Cint,), pipe_fds[2])