Skip to content

Commit

Permalink
runtime: adjust netpollWaiters after goroutines are ready
Browse files Browse the repository at this point in the history
The runtime was adjusting netpollWaiters before the waiting
goroutines were marked as ready. This could cause the scheduler
to report a deadlock because there were no goroutines ready to run.
Keeping netpollWaiters non-zero ensures that at least one goroutine
will call netpoll(-1) from findRunnable.

This does mean that if a program has network activity for a while
and then never has it again, and also has no timers, then we can leave
an M stranded in a call to netpoll from which it will never return.
At least this won't be a common case. And it's not new; this has been
a potential problem for some time.

Fixes #61454

Change-Id: I17c7f891c2bb1262fda12c6929664e64686463c8
Reviewed-on: https://go-review.googlesource.com/c/go/+/511455
TryBot-Result: Gopher Robot <[email protected]>
Run-TryBot: Ian Lance Taylor <[email protected]>
Reviewed-by: Michael Knyszek <[email protected]>
Auto-Submit: Ian Lance Taylor <[email protected]>
Reviewed-by: Heschi Kreinick <[email protected]>
  • Loading branch information
ianlancetaylor authored and gopherbot committed Jul 20, 2023
1 parent 890b96f commit f51c55b
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 59 deletions.
58 changes: 44 additions & 14 deletions src/runtime/netpoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
// func netpollclose(fd uintptr) int32
// Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
// func netpoll(delta int64) (gList, int32)
// Poll the network. If delta < 0, block indefinitely. If delta == 0,
// poll without blocking. If delta > 0, block for up to delta nanoseconds.
// Return a list of goroutines built by calling netpollready.
// Return a list of goroutines built by calling netpollready,
// and a delta to add to netpollWaiters when all goroutines are ready.
// This will never return an empty list with a non-zero delta.
//
// func netpollBreak()
// Wake up the network poller, assumed to be blocked in netpoll.
Expand Down Expand Up @@ -426,12 +428,13 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
}
// If we set the new deadline in the past, unblock currently pending IO if any.
// Note that pd.publishInfo has already been called, above, immediately after modifying rd and wd.
delta := int32(0)
var rg, wg *g
if pd.rd < 0 {
rg = netpollunblock(pd, 'r', false)
rg = netpollunblock(pd, 'r', false, &delta)
}
if pd.wd < 0 {
wg = netpollunblock(pd, 'w', false)
wg = netpollunblock(pd, 'w', false, &delta)
}
unlock(&pd.lock)
if rg != nil {
Expand All @@ -440,6 +443,7 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
if wg != nil {
netpollgoready(wg, 3)
}
netpollAdjustWaiters(delta)
}

//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
Expand All @@ -453,8 +457,9 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
pd.wseq++
var rg, wg *g
pd.publishInfo()
rg = netpollunblock(pd, 'r', false)
wg = netpollunblock(pd, 'w', false)
delta := int32(0)
rg = netpollunblock(pd, 'r', false, &delta)
wg = netpollunblock(pd, 'w', false, &delta)
if pd.rt.f != nil {
deltimer(&pd.rt)
pd.rt.f = nil
Expand All @@ -470,6 +475,7 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
if wg != nil {
netpollgoready(wg, 3)
}
netpollAdjustWaiters(delta)
}

// netpollready is called by the platform-specific netpoll function.
Expand All @@ -478,23 +484,27 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This returns a delta to apply to netpollWaiters.
//
// This may run while the world is stopped, so write barriers are not allowed.
//
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
delta := int32(0)
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
rg = netpollunblock(pd, 'r', true, &delta)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
wg = netpollunblock(pd, 'w', true, &delta)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
return delta
}

func netpollcheckerr(pd *pollDesc, mode int32) int {
Expand All @@ -520,7 +530,7 @@ func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
netpollWaiters.Add(1)
netpollAdjustWaiters(1)
}
return r
}
Expand Down Expand Up @@ -570,7 +580,13 @@ func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
return old == pdReady
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
// netpollunblock moves either pd.rg (if mode == 'r') or
// pd.wg (if mode == 'w') into the pdReady state.
// This returns any goroutine blocked on pd.{rg,wg}.
// It adds any adjustment to netpollWaiters to *delta;
// this adjustment should be applied after the goroutine has
// been marked ready.
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
Expand All @@ -594,7 +610,7 @@ func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
if old == pdWait {
old = pdNil
} else if old != pdNil {
netpollWaiters.Add(-1)
*delta -= 1
}
return (*g)(unsafe.Pointer(old))
}
Expand All @@ -614,14 +630,15 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
unlock(&pd.lock)
return
}
delta := int32(0)
var rg *g
if read {
if pd.rd <= 0 || pd.rt.f == nil {
throw("runtime: inconsistent read deadline")
}
pd.rd = -1
pd.publishInfo()
rg = netpollunblock(pd, 'r', false)
rg = netpollunblock(pd, 'r', false, &delta)
}
var wg *g
if write {
Expand All @@ -630,7 +647,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
}
pd.wd = -1
pd.publishInfo()
wg = netpollunblock(pd, 'w', false)
wg = netpollunblock(pd, 'w', false, &delta)
}
unlock(&pd.lock)
if rg != nil {
Expand All @@ -639,6 +656,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
if wg != nil {
netpollgoready(wg, 0)
}
netpollAdjustWaiters(delta)
}

func netpollDeadline(arg any, seq uintptr) {
Expand All @@ -653,6 +671,18 @@ func netpollWriteDeadline(arg any, seq uintptr) {
netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

// netpollAnyWaiters reports whether any goroutines are waiting for I/O.
func netpollAnyWaiters() bool {
return netpollWaiters.Load() > 0
}

// netpollAdjustWaiters adds delta to netpollWaiters.
func netpollAdjustWaiters(delta int32) {
if delta != 0 {
netpollWaiters.Add(delta)
}
}

func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
if c.first == nil {
Expand Down
11 changes: 6 additions & 5 deletions src/runtime/netpoll_aix.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ func netpollBreak() {
// delay > 0: block for up to that many nanoseconds
//
//go:nowritebarrierrec
func netpoll(delay int64) gList {
func netpoll(delay int64) (gList, int32) {
var timeout uintptr
if delay < 0 {
timeout = ^uintptr(0)
} else if delay == 0 {
// TODO: call poll with timeout == 0
return gList{}
return gList{}, 0
} else if delay < 1e6 {
timeout = 1
} else if delay < 1e15 {
Expand All @@ -186,7 +186,7 @@ retry:
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if timeout > 0 {
return gList{}
return gList{}, 0
}
goto retry
}
Expand All @@ -206,6 +206,7 @@ retry:
n--
}
var toRun gList
delta := int32(0)
for i := 1; i < len(pfds) && n > 0; i++ {
pfd := &pfds[i]

Expand All @@ -220,10 +221,10 @@ retry:
}
if mode != 0 {
pds[i].setEventErr(pfd.revents == _POLLERR, 0)
netpollready(&toRun, pds[i], mode)
delta += netpollready(&toRun, pds[i], mode)
n--
}
}
unlock(&mtxset)
return toRun
return toRun, delta
}
11 changes: 6 additions & 5 deletions src/runtime/netpoll_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ func netpollBreak() {
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
func netpoll(delay int64) (gList, int32) {
if epfd == -1 {
return gList{}
return gList{}, 0
}
var waitms int32
if delay < 0 {
Expand All @@ -124,11 +124,12 @@ retry:
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
return gList{}, 0
}
goto retry
}
var toRun gList
delta := int32(0)
for i := int32(0); i < n; i++ {
ev := events[i]
if ev.Events == 0 {
Expand Down Expand Up @@ -164,9 +165,9 @@ retry:
tag := tp.tag()
if pd.fdseq.Load() == tag {
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
netpollready(&toRun, pd, mode)
delta += netpollready(&toRun, pd, mode)
}
}
}
return toRun
return toRun, delta
}
4 changes: 2 additions & 2 deletions src/runtime/netpoll_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ func netpollarm(pd *pollDesc, mode int) {
func netpollBreak() {
}

func netpoll(delay int64) gList {
return gList{}
func netpoll(delay int64) (gList, int32) {
return gList{}, 0
}
11 changes: 6 additions & 5 deletions src/runtime/netpoll_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ func netpollBreak() {
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
func netpoll(delay int64) (gList, int32) {
if kq == -1 {
return gList{}
return gList{}, 0
}
var tp *timespec
var ts timespec
Expand All @@ -147,11 +147,12 @@ retry:
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if delay > 0 {
return gList{}
return gList{}, 0
}
goto retry
}
var toRun gList
delta := int32(0)
for i := 0; i < int(n); i++ {
ev := &events[i]

Expand Down Expand Up @@ -208,8 +209,8 @@ retry:
}
}
pd.setEventErr(ev.flags == _EV_ERROR, tag)
netpollready(&toRun, pd, mode)
delta += netpollready(&toRun, pd, mode)
}
}
return toRun
return toRun, delta
}
11 changes: 6 additions & 5 deletions src/runtime/netpoll_solaris.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ func netpollBreak() {
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
func netpoll(delay int64) (gList, int32) {
if portfd == -1 {
return gList{}
return gList{}, 0
}

var wait *timespec
Expand Down Expand Up @@ -259,12 +259,13 @@ retry:
// If a timed sleep was interrupted and there are no events,
// just return to recalculate how long we should sleep now.
if delay > 0 {
return gList{}
return gList{}, 0
}
goto retry
}

var toRun gList
delta := int32(0)
for i := 0; i < int(n); i++ {
ev := &events[i]

Expand Down Expand Up @@ -324,9 +325,9 @@ retry:
// about the event port on SmartOS.
//
// See golang.org/x/issue/30840.
netpollready(&toRun, pd, mode)
delta += netpollready(&toRun, pd, mode)
}
}

return toRun
return toRun, delta
}
12 changes: 9 additions & 3 deletions src/runtime/netpoll_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package runtime
import "runtime/internal/atomic"

var netpollInited atomic.Uint32
var netpollWaiters atomic.Uint32

var netpollStubLock mutex
var netpollNote note
Expand All @@ -34,7 +33,7 @@ func netpollBreak() {

// Polls for ready network connections.
// Returns list of goroutines that become runnable.
func netpoll(delay int64) gList {
func netpoll(delay int64) (gList, int32) {
// Implementation for platforms that do not support
// integrated network poller.
if delay != 0 {
Expand All @@ -53,9 +52,16 @@ func netpoll(delay int64) gList {
// (eg when running TestNetpollBreak).
osyield()
}
return gList{}
return gList{}, 0
}

func netpollinited() bool {
return netpollInited.Load() != 0
}

func netpollAnyWaiters() bool {
return false
}

func netpollAdjustWaiters(delta int32) {
}
Loading

0 comments on commit f51c55b

Please sign in to comment.