Skip to content

Commit

Permalink
FileWatching: ensure PollingFileWatcher reports StatStruct correctly
Browse files Browse the repository at this point in the history
Previously, we might miss an prev=>current update event while waiting in the workqueue.
Now a careful application can ensure it always has the most up-to-date stat,
without needing to do an extra call to Stat after the notify.
Obviously, you still must be careful to initialize this before attempting the initial
operation (to ensure the previous Stat is the intended one).
  • Loading branch information
vtjnash committed Mar 23, 2018
1 parent 19e4afd commit 5f0052b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 31 deletions.
66 changes: 36 additions & 30 deletions stdlib/FileWatching/src/FileWatching.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export
PollingFileWatcher,
FDWatcher

import Base: @handle_as, wait, close, uvfinalize, eventloop, notify_error, stream_wait,
import Base: @handle_as, wait, close, eventloop, notify_error, stream_wait,
_sizeof_uv_poll, _sizeof_uv_fs_poll, _sizeof_uv_fs_event, _uv_hook_close, uv_error, UVError,
associate_julia_struct, disassociate_julia_struct, isreadable, iswritable, |
import Base.Filesystem.StatStruct
Expand Down Expand Up @@ -86,10 +86,12 @@ mutable struct PollingFileWatcher
interval::UInt32
notify::Condition
active::Bool
busy_polling::Int32
function PollingFileWatcher(file::AbstractString, interval::Float64=5.007) # same default as nodejs
curr_error::Int32
curr_stat::StatStruct
PollingFileWatcher(file::AbstractString, interval::Float64=5.007) = PollingFileWatcher(String(file), interval)
function PollingFileWatcher(file::String, interval::Float64=5.007) # same default as nodejs
handle = Libc.malloc(_sizeof_uv_fs_poll)
this = new(handle, file, round(UInt32, interval * 1000), Condition(), false, 0)
this = new(handle, file, round(UInt32, interval * 1000), Condition(), false, 0, StatStruct())
associate_julia_struct(handle, this)
err = ccall(:uv_fs_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle)
if err != 0
Expand All @@ -110,7 +112,7 @@ mutable struct _FDWatcher
events::Int32

let FDWatchers = Vector{Any}()
global _FDWatcher
global _FDWatcher, uvfinalize
@static if Sys.isunix()
function _FDWatcher(fd::RawFD, readable::Bool, writable::Bool)
if !readable && !writable
Expand Down Expand Up @@ -150,7 +152,7 @@ mutable struct _FDWatcher
end
end

function Base.uvfinalize(t::_FDWatcher)
function uvfinalize(t::_FDWatcher)
if t.handle != C_NULL
disassociate_julia_struct(t)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
Expand Down Expand Up @@ -255,7 +257,7 @@ end
function _uv_hook_close(uv::PollingFileWatcher)
uv.handle = C_NULL
uv.active = false
notify(uv.notify, (StatStruct(), EOFError()))
notify(uv.notify, StatStruct())
nothing
end

Expand Down Expand Up @@ -304,22 +306,24 @@ end

function uv_fspollcb(handle::Ptr{Cvoid}, status::Int32, prev::Ptr, curr::Ptr)
t = @handle_as handle PollingFileWatcher
if status == 0 || status != t.busy_polling
t.busy_polling = status
old_status = t.curr_error
t.curr_error = status
if status == 0
t.curr_stat = StatStruct(convert(Ptr{UInt8}, curr))
end
if status == 0 || status != old_status
prev_stat = StatStruct(convert(Ptr{UInt8}, prev))
curr_stat = (status == 0) ? StatStruct(convert(Ptr{UInt8}, curr)) : UVError("PollingFileWatcher", status)
notify(t.notify, (prev_stat, curr_stat))
notify(t.notify, prev_stat)
end
nothing
end


function start_watching(t::_FDWatcher)
readable = t.refcount[1] > 0
writable = t.refcount[2] > 0
if t.active[1] != readable || t.active[2] != writable
# make sure the READABLE / WRITEABLE state is updated
uv_error("start_watching (File Handle)",
uv_error("FDWatcher (start)",
ccall(:uv_poll_start, Int32, (Ptr{Cvoid}, Int32, Ptr{Cvoid}),
t.handle,
(readable ? UV_READABLE : 0) | (writable ? UV_WRITABLE : 0),
Expand All @@ -331,7 +335,7 @@ end

function start_watching(t::PollingFileWatcher)
if !t.active
uv_error("start_watching (File Path)",
uv_error("PollingFileWatcher (start)",
ccall(:uv_fs_poll_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, UInt32),
t.handle, uv_jl_fspollcb::Ptr{Cvoid}, t.file, t.interval))
t.active = true
Expand All @@ -342,15 +346,15 @@ end
function stop_watching(t::PollingFileWatcher)
if t.active && isempty(t.notify.waitq)
t.active = false
uv_error("stop_watching (File Path)",
uv_error("PollingFileWatcher (stop)",
ccall(:uv_fs_poll_stop, Int32, (Ptr{Cvoid},), t.handle))
end
nothing
end

function start_watching(t::FileMonitor)
if !t.active
uv_error("start_watching (File Monitor)",
uv_error("FileMonitor (start)",
ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32),
t.handle, uv_jl_fseventscb::Ptr{Cvoid}, t.file, 0))
t.active = true
Expand All @@ -361,20 +365,21 @@ end
function stop_watching(t::FileMonitor)
if t.active && isempty(t.notify.waitq)
t.active = false
uv_error("stop_watching (File Monitor)",
uv_error("FileMonitor (stop)",
ccall(:uv_fs_event_stop, Int32, (Ptr{Cvoid},), t.handle))
end
nothing
end

function wait(fdw::FDWatcher)
return wait(fdw.watcher, readable = fdw.readable, writable = fdw.writable)
GC.@preserve fdw begin
return wait(fdw.watcher, readable = fdw.readable, writable = fdw.writable)
end
end
function wait(fdw::_FDWatcher; readable=true, writable=true)
events = FDEvent(Int32(0))
while true
if isa(events, FDEvent)
events = events::FDEvent
events |= FDEvent(fdw.events)
haveevent = false
if readable && isreadable(events)
Expand All @@ -395,7 +400,7 @@ function wait(fdw::_FDWatcher; readable=true, writable=true)
events = EOFError()
else
start_watching(fdw) # make sure the poll is active
events = wait(fdw.notify)
events = stream_wait(fdw, fdw.notify)::FDEvent
end
end
end
Expand All @@ -422,16 +427,22 @@ end

function wait(pfw::PollingFileWatcher)
start_watching(pfw)
prevstat, currstat = stream_wait(pfw, pfw.notify)
prevstat = stream_wait(pfw, pfw.notify)::StatStruct
stop_watching(pfw)
return prevstat, currstat
if pfw.handle == C_NULL
return prevstat, EOFError()
elseif pfw.curr_error != 0
return prevstat, UVError("PollingFileWatcher", pfw.curr_error)
else
return prevstat, pfw.curr_stat
end
end

function wait(m::FileMonitor)
start_watching(m)
filename, events = stream_wait(m, m.notify)
filename, events = stream_wait(m, m.notify)::Tuple{String, FileEvent}
stop_watching(m)
return filename, events
return (filename, events)
end

"""
Expand Down Expand Up @@ -462,7 +473,6 @@ function poll_fd(s::Union{RawFD, Sys.iswindows() ? WindowsRawSocket : Union{}},
end
notify(wt)
end

wait(wt)
return result
else
Expand Down Expand Up @@ -523,7 +533,7 @@ function poll_file(s::AbstractString, interval_seconds::Real=5.007, timeout_s::R
@schedule (sleep(timeout_s); close(pfw))
end
statdiff = wait(pfw)
if statdiff[1] == StatStruct() && isa(statdiff[2], UVError)
if isa(statdiff[2], UVError)
# file didn't initially exist, continue watching for it to be created (or the error to change)
statdiff = wait(pfw)
end
Expand All @@ -533,8 +543,4 @@ function poll_file(s::AbstractString, interval_seconds::Real=5.007, timeout_s::R
end
end

# deprecations

stop_watching(stream::_FDWatcher) = Base.depwarn("stop_watching(::_FDWatcher) should not be used", :stop_watching)

end
3 changes: 2 additions & 1 deletion stdlib/FileWatching/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ test_monitor_wait_poll()
test_watch_file_timeout(0.1)
test_watch_file_change(6)

@test_throws Base.UVError watch_file("____nonexistent_file", 10)
@test_throws(Base.UVError("FileMonitor (start)", Base.UV_ENOENT),
watch_file("____nonexistent_file", 10))
@test(@elapsed(
@test(poll_file("____nonexistent_file", 1, 3.1) ===
(Base.Filesystem.StatStruct(), EOFError()))) > 3)
Expand Down

0 comments on commit 5f0052b

Please sign in to comment.