Skip to content

Commit

Permalink
Kernel: Consolidate timeout logic
Browse files Browse the repository at this point in the history
Allow passing in an optional timeout to Thread::block and move
the timeout check out of Thread::Blocker. This way all Blockers
implicitly support timeouts and don't need to implement it
themselves. Do however allow them to override timeouts (e.g.
for sockets).
  • Loading branch information
tomuta authored and awesomekling committed Aug 3, 2020
1 parent df52061 commit f4a5c9b
Show file tree
Hide file tree
Showing 15 changed files with 60 additions and 77 deletions.
6 changes: 3 additions & 3 deletions Kernel/FileSystem/Plan9FileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ KResult Plan9FS::post_message(Message& message)

while (size > 0) {
if (!description.can_write()) {
if (Thread::current()->block<Thread::WriteBlocker>(description).was_interrupted())
if (Thread::current()->block<Thread::WriteBlocker>(nullptr, description).was_interrupted())
return KResult(-EINTR);
}
ssize_t nwritten = description.write(data, size);
Expand All @@ -441,7 +441,7 @@ KResult Plan9FS::do_read(u8* data, size_t size)
auto& description = file_description();
while (size > 0) {
if (!description.can_read()) {
if (Thread::current()->block<Thread::ReadBlocker>(description).was_interrupted())
if (Thread::current()->block<Thread::ReadBlocker>(nullptr, description).was_interrupted())
return KResult(-EINTR);
}
ssize_t nread = description.read(data, size);
Expand Down Expand Up @@ -524,7 +524,7 @@ KResult Plan9FS::wait_for_specific_message(u16 tag, Message& out_message)
// Block until either:
// * Someone else reads the message we're waiting for, and hands it to us;
// * Or we become the one to read and dispatch messages.
if (Thread::current()->block<Plan9FS::Blocker>(completion).was_interrupted()) {
if (Thread::current()->block<Plan9FS::Blocker>(nullptr, completion).was_interrupted()) {
LOCKER(m_lock);
m_completions.remove(tag);
return KResult(-EINTR);
Expand Down
4 changes: 2 additions & 2 deletions Kernel/Net/IPv4Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ ssize_t IPv4Socket::receive_byte_buffered(FileDescription& description, void* bu
return -EAGAIN;

locker.unlock();
auto res = Thread::current()->block<Thread::ReadBlocker>(description);
auto res = Thread::current()->block<Thread::ReadBlocker>(nullptr, description);
locker.lock();

if (!m_can_read) {
Expand Down Expand Up @@ -296,7 +296,7 @@ ssize_t IPv4Socket::receive_packet_buffered(FileDescription& description, void*
}

locker.unlock();
auto res = Thread::current()->block<Thread::ReadBlocker>(description);
auto res = Thread::current()->block<Thread::ReadBlocker>(nullptr, description);
locker.lock();

if (!m_can_read) {
Expand Down
4 changes: 2 additions & 2 deletions Kernel/Net/LocalSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ KResult LocalSocket::connect(FileDescription& description, const sockaddr* addre
return KSuccess;
}

if (Thread::current()->block<Thread::ConnectBlocker>(description).was_interrupted()) {
if (Thread::current()->block<Thread::ConnectBlocker>(nullptr, description).was_interrupted()) {
m_connect_side_role = Role::None;
return KResult(-EINTR);
}
Expand Down Expand Up @@ -300,7 +300,7 @@ ssize_t LocalSocket::recvfrom(FileDescription& description, void* buffer, size_t
return -EAGAIN;
}
} else if (!can_read(description, 0)) {
if (Thread::current()->block<Thread::ReadBlocker>(description).was_interrupted())
if (Thread::current()->block<Thread::ReadBlocker>(nullptr, description).was_interrupted())
return -EINTR;
}
if (!has_attached_peer(description) && buffer_for_me.is_empty())
Expand Down
2 changes: 1 addition & 1 deletion Kernel/Net/TCPSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ KResult TCPSocket::protocol_connect(FileDescription& description, ShouldBlock sh
m_direction = Direction::Outgoing;

if (should_block == ShouldBlock::Yes) {
if (Thread::current()->block<Thread::ConnectBlocker>(description).was_interrupted())
if (Thread::current()->block<Thread::ConnectBlocker>(nullptr, description).was_interrupted())
return KResult(-EINTR);
ASSERT(setup_state() == SetupState::Completed);
if (has_error()) {
Expand Down
74 changes: 30 additions & 44 deletions Kernel/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <AK/QuickSort.h>
#include <AK/ScopeGuard.h>
#include <AK/TemporaryChange.h>
#include <AK/Time.h>
#include <Kernel/FileSystem/FileDescription.h>
#include <Kernel/Net/Socket.h>
#include <Kernel/Process.h>
Expand Down Expand Up @@ -131,26 +132,21 @@ bool Thread::ConnectBlocker::should_unblock(Thread&)
Thread::WriteBlocker::WriteBlocker(const FileDescription& description)
: FileDescriptionBlocker(description)
{
}

timespec* Thread::WriteBlocker::override_timeout(timespec* timeout)
{
auto& description = blocked_description();
if (description.is_socket()) {
auto& socket = *description.socket();
if (socket.has_send_timeout()) {
timeval deadline = Scheduler::time_since_boot();
deadline.tv_sec += socket.send_timeout().tv_sec;
deadline.tv_usec += socket.send_timeout().tv_usec;
deadline.tv_sec += (socket.send_timeout().tv_usec / 1000000) * 1;
deadline.tv_usec %= 1000000;
m_deadline = deadline;
timeval_to_timespec(Scheduler::time_since_boot(), m_deadline);
timespec_add_timeval(m_deadline, socket.send_timeout(), m_deadline);
if (!timeout || m_deadline < *timeout)
return &m_deadline;
}
}
}

bool Thread::WriteBlocker::should_unblock(Thread& thread, time_t now_sec, long now_usec)
{
if (m_deadline.has_value()) {
bool timed_out = now_sec > m_deadline.value().tv_sec || (now_sec == m_deadline.value().tv_sec && now_usec >= m_deadline.value().tv_usec);
return timed_out || blocked_description().can_write();
}
return should_unblock(thread);
return timeout;
}

bool Thread::WriteBlocker::should_unblock(Thread&)
Expand All @@ -161,26 +157,21 @@ bool Thread::WriteBlocker::should_unblock(Thread&)
Thread::ReadBlocker::ReadBlocker(const FileDescription& description)
: FileDescriptionBlocker(description)
{
}

timespec* Thread::ReadBlocker::override_timeout(timespec* timeout)
{
auto& description = blocked_description();
if (description.is_socket()) {
auto& socket = *description.socket();
if (socket.has_receive_timeout()) {
timeval deadline = Scheduler::time_since_boot();
deadline.tv_sec += socket.receive_timeout().tv_sec;
deadline.tv_usec += socket.receive_timeout().tv_usec;
deadline.tv_sec += (socket.receive_timeout().tv_usec / 1000000) * 1;
deadline.tv_usec %= 1000000;
m_deadline = deadline;
timeval_to_timespec(Scheduler::time_since_boot(), m_deadline);
timespec_add_timeval(m_deadline, socket.receive_timeout(), m_deadline);
if (!timeout || m_deadline < *timeout)
return &m_deadline;
}
}
}

bool Thread::ReadBlocker::should_unblock(Thread& thread, time_t now_sec, long now_usec)
{
if (m_deadline.has_value()) {
bool timed_out = now_sec > m_deadline.value().tv_sec || (now_sec == m_deadline.value().tv_sec && now_usec >= m_deadline.value().tv_usec);
return timed_out || blocked_description().can_read();
}
return should_unblock(thread);
return timeout;
}

bool Thread::ReadBlocker::should_unblock(Thread&)
Expand Down Expand Up @@ -210,24 +201,13 @@ bool Thread::SleepBlocker::should_unblock(Thread&)
return m_wakeup_time <= g_uptime;
}

Thread::SelectBlocker::SelectBlocker(const timespec& ts, bool select_has_timeout, const FDVector& read_fds, const FDVector& write_fds, const FDVector& except_fds)
: m_select_timeout(ts)
, m_select_has_timeout(select_has_timeout)
, m_select_read_fds(read_fds)
Thread::SelectBlocker::SelectBlocker(const FDVector& read_fds, const FDVector& write_fds, const FDVector& except_fds)
: m_select_read_fds(read_fds)
, m_select_write_fds(write_fds)
, m_select_exceptional_fds(except_fds)
{
}

bool Thread::SelectBlocker::should_unblock(Thread& thread, time_t now_sec, long now_usec)
{
if (m_select_has_timeout) {
if (now_sec > m_select_timeout.tv_sec || (now_sec == m_select_timeout.tv_sec && now_usec * 1000 >= m_select_timeout.tv_nsec))
return true;
}
return should_unblock(thread);
}

bool Thread::SelectBlocker::should_unblock(Thread& thread)
{
auto& process = thread.process();
Expand Down Expand Up @@ -317,10 +297,16 @@ void Thread::consider_unblock(time_t now_sec, long now_usec)
/* don't know, don't care */
return;
case Thread::Blocked:
{
ASSERT(m_blocker != nullptr);
if (m_blocker->should_unblock(*this, now_sec, now_usec))
timespec now;
now.tv_sec = now_sec,
now.tv_nsec = now_usec * 1000ull;
bool timed_out = m_blocker_timeout && now >= *m_blocker_timeout;
if (timed_out || m_blocker->should_unblock(*this))
unblock();
return;
}
case Thread::Skip1SchedulerPass:
set_state(Thread::Skip0SchedulerPasses);
return;
Expand Down
2 changes: 1 addition & 1 deletion Kernel/Syscall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ void syscall_handler(TrapFrame* trap)
current_thread->die_if_needed();

if (current_thread->has_unmasked_pending_signals())
(void)current_thread->block<Thread::SemiPermanentBlocker>(Thread::SemiPermanentBlocker::Reason::Signal);
(void)current_thread->block<Thread::SemiPermanentBlocker>(nullptr, Thread::SemiPermanentBlocker::Reason::Signal);
}

}
2 changes: 1 addition & 1 deletion Kernel/Syscalls/kill.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ KResult Process::do_killself(int signal)
auto current_thread = Thread::current();
if (!current_thread->should_ignore_signal(signal)) {
current_thread->send_signal(signal, this);
(void)current_thread->block<Thread::SemiPermanentBlocker>(Thread::SemiPermanentBlocker::Reason::Signal);
(void)current_thread->block<Thread::SemiPermanentBlocker>(nullptr, Thread::SemiPermanentBlocker::Reason::Signal);
}

return KSuccess;
Expand Down
2 changes: 1 addition & 1 deletion Kernel/Syscalls/read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ ssize_t Process::sys$read(int fd, Userspace<u8*> buffer, ssize_t size)
return -EISDIR;
if (description->is_blocking()) {
if (!description->can_read()) {
if (Thread::current()->block<Thread::ReadBlocker>(*description).was_interrupted())
if (Thread::current()->block<Thread::ReadBlocker>(nullptr, *description).was_interrupted())
return -EINTR;
if (!description->can_read())
return -EAGAIN;
Expand Down
4 changes: 2 additions & 2 deletions Kernel/Syscalls/select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ int Process::sys$select(const Syscall::SC_select_params* params)
#endif

if (!timeout || select_has_timeout) {
if (current_thread->block<Thread::SelectBlocker>(computed_timeout, select_has_timeout, rfds, wfds, efds).was_interrupted())
if (current_thread->block<Thread::SelectBlocker>(select_has_timeout ? &computed_timeout : nullptr, rfds, wfds, efds).was_interrupted())
return -EINTR;
// While we blocked, the process lock was dropped. This gave other threads
// the opportunity to mess with the memory. For example, it could free the
Expand Down Expand Up @@ -191,7 +191,7 @@ int Process::sys$poll(const Syscall::SC_poll_params* params)
#endif

if (!timeout || has_timeout) {
if (current_thread->block<Thread::SelectBlocker>(actual_timeout, has_timeout, rfds, wfds, Thread::SelectBlocker::FDVector()).was_interrupted())
if (current_thread->block<Thread::SelectBlocker>(has_timeout ? &actual_timeout : nullptr, rfds, wfds, Thread::SelectBlocker::FDVector()).was_interrupted())
return -EINTR;
}

Expand Down
2 changes: 1 addition & 1 deletion Kernel/Syscalls/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ int Process::sys$accept(int accepting_socket_fd, sockaddr* user_address, socklen

if (!socket.can_accept()) {
if (accepting_socket_description->is_blocking()) {
if (Thread::current()->block<Thread::AcceptBlocker>(*accepting_socket_description).was_interrupted())
if (Thread::current()->block<Thread::AcceptBlocker>(nullptr, *accepting_socket_description).was_interrupted())
return -EINTR;
} else {
return -EAGAIN;
Expand Down
2 changes: 1 addition & 1 deletion Kernel/Syscalls/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ int Process::sys$join_thread(int tid, void** exit_value)

// NOTE: pthread_join() cannot be interrupted by signals. Only by death.
for (;;) {
auto result = current_thread->block<Thread::JoinBlocker>(*thread, joinee_exit_value);
auto result = current_thread->block<Thread::JoinBlocker>(nullptr, *thread, joinee_exit_value);
if (result == Thread::BlockResult::InterruptedByDeath) {
// NOTE: This cleans things up so that Thread::finalize() won't
// get confused about a missing joiner when finalizing the joinee.
Expand Down
2 changes: 1 addition & 1 deletion Kernel/Syscalls/waitid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ KResultOr<siginfo_t> Process::do_waitid(idtype_t idtype, int id, int options)
return KResult(-EINVAL);
}

if (Thread::current()->block<Thread::WaitBlocker>(options, waitee_pid).was_interrupted())
if (Thread::current()->block<Thread::WaitBlocker>(nullptr, options, waitee_pid).was_interrupted())
return KResult(-EINTR);

ScopedSpinLock lock(g_processes_lock);
Expand Down
2 changes: 1 addition & 1 deletion Kernel/Syscalls/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ ssize_t Process::do_write(FileDescription& description, const u8* data, int data
#ifdef IO_DEBUG
dbg() << "block write on " << description.absolute_path();
#endif
if (Thread::current()->block<Thread::WriteBlocker>(description).was_interrupted()) {
if (Thread::current()->block<Thread::WriteBlocker>(nullptr, description).was_interrupted()) {
if (nwritten == 0)
return -EINTR;
}
Expand Down
4 changes: 2 additions & 2 deletions Kernel/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ u64 Thread::sleep(u64 ticks)
{
ASSERT(state() == Thread::Running);
u64 wakeup_time = g_uptime + ticks;
auto ret = Thread::current()->block<Thread::SleepBlocker>(wakeup_time);
auto ret = Thread::current()->block<Thread::SleepBlocker>(nullptr, wakeup_time);
if (wakeup_time > g_uptime) {
ASSERT(ret.was_interrupted());
}
Expand All @@ -219,7 +219,7 @@ u64 Thread::sleep(u64 ticks)
u64 Thread::sleep_until(u64 wakeup_time)
{
ASSERT(state() == Thread::Running);
auto ret = Thread::current()->block<Thread::SleepBlocker>(wakeup_time);
auto ret = Thread::current()->block<Thread::SleepBlocker>(nullptr, wakeup_time);
if (wakeup_time > g_uptime)
ASSERT(ret.was_interrupted());
return wakeup_time;
Expand Down
25 changes: 11 additions & 14 deletions Kernel/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,10 @@ class Thread {
class Blocker {
public:
virtual ~Blocker() { }
virtual bool should_unblock(Thread& thread, time_t, long)
{
return should_unblock(thread);
}
virtual bool should_unblock(Thread&) = 0;
virtual const char* state_string() const = 0;
virtual bool is_reason_signal() const { return false; }
virtual timespec* override_timeout(timespec* timeout) { return timeout; }
void set_interrupted_by_death() { m_was_interrupted_by_death = true; }
bool was_interrupted_by_death() const { return m_was_interrupted_by_death; }
void set_interrupted_by_signal() { m_was_interrupted_while_blocked = true; }
Expand Down Expand Up @@ -184,23 +181,23 @@ class Thread {
class WriteBlocker final : public FileDescriptionBlocker {
public:
explicit WriteBlocker(const FileDescription&);
virtual bool should_unblock(Thread&, time_t, long) override;
virtual bool should_unblock(Thread&) override;
virtual const char* state_string() const override { return "Writing"; }
virtual timespec* override_timeout(timespec*) override;

private:
Optional<timeval> m_deadline;
timespec m_deadline;
};

class ReadBlocker final : public FileDescriptionBlocker {
public:
explicit ReadBlocker(const FileDescription&);
virtual bool should_unblock(Thread&, time_t, long) override;
virtual bool should_unblock(Thread&) override;
virtual const char* state_string() const override { return "Reading"; }
virtual timespec* override_timeout(timespec*) override;

private:
Optional<timeval> m_deadline;
timespec m_deadline;
};

class ConditionBlocker final : public Blocker {
Expand All @@ -227,14 +224,11 @@ class Thread {
class SelectBlocker final : public Blocker {
public:
typedef Vector<int, FD_SETSIZE> FDVector;
SelectBlocker(const timespec& ts, bool select_has_timeout, const FDVector& read_fds, const FDVector& write_fds, const FDVector& except_fds);
virtual bool should_unblock(Thread&, time_t, long) override;
SelectBlocker(const FDVector& read_fds, const FDVector& write_fds, const FDVector& except_fds);
virtual bool should_unblock(Thread&) override;
virtual const char* state_string() const override { return "Selecting"; }

private:
timespec m_select_timeout;
bool m_select_has_timeout { false };
const FDVector& m_select_read_fds;
const FDVector& m_select_write_fds;
const FDVector& m_select_exceptional_fds;
Expand Down Expand Up @@ -345,7 +339,7 @@ class Thread {
};

template<typename T, class... Args>
[[nodiscard]] BlockResult block(Args&&... args)
[[nodiscard]] BlockResult block(timespec* timeout, Args&&... args)
{
T t(forward<Args>(args)...);

Expand All @@ -361,6 +355,7 @@ class Thread {
}

m_blocker = &t;
m_blocker_timeout = t.override_timeout(timeout);
set_state(Thread::Blocked);
}

Expand All @@ -373,6 +368,7 @@ class Thread {

// Remove ourselves...
m_blocker = nullptr;
m_blocker_timeout = nullptr;

if (t.was_interrupted_by_signal())
return BlockResult::InterruptedBySignal;
Expand All @@ -385,7 +381,7 @@ class Thread {

[[nodiscard]] BlockResult block_until(const char* state_string, Function<bool()>&& condition)
{
return block<ConditionBlocker>(state_string, move(condition));
return block<ConditionBlocker>(nullptr, state_string, move(condition));
}

BlockResult wait_on(WaitQueue& queue, const char* reason, timeval* timeout = nullptr, Atomic<bool>* lock = nullptr, Thread* beneficiary = nullptr);
Expand Down Expand Up @@ -552,6 +548,7 @@ class Thread {
size_t m_thread_specific_region_size { 0 };
SignalActionData m_signal_action_data[32];
Blocker* m_blocker { nullptr };
timespec* m_blocker_timeout { nullptr };
const char* m_wait_reason { nullptr };

bool m_is_active { false };
Expand Down

0 comments on commit f4a5c9b

Please sign in to comment.