Skip to content

Commit

Permalink
harvester: Pause plot paths on transfer failures
Browse files Browse the repository at this point in the history
The harvester will now pause a specific plot path for 5 minutes if the
write or open operation fails. These are the two cases where the
failure is most likely related to the disk/filesystem and could warrant
skipping the path so as to not get stuck.

Future changes should involve periodically updating stats and disabling
on a failure.
  • Loading branch information
krobertson committed Mar 25, 2024
1 parent 723e810 commit 969f025
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 30 deletions.
45 changes: 45 additions & 0 deletions cli/harvester/plotpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
package harvester

import (
"cmp"
"slices"
"sync"
"sync/atomic"
"time"

"golang.org/x/sys/unix"
)

type plotPath struct {
path string
busy atomic.Bool
paused atomic.Bool
freeSpace uint64
totalSpace uint64
mutex sync.Mutex
Expand All @@ -27,3 +31,44 @@ func (p *plotPath) updateFreeSpace() {
p.freeSpace = stat.Bavail * uint64(stat.Bsize)
p.totalSpace = stat.Blocks * uint64(stat.Bsize)
}

// pause temporarily stops this path from being selected as a destination for
// storing plots. It is primarily meant to be called when storing a plot fails:
// the problem may be intermittent, so the path is made eligible again once the
// pause window has elapsed, allowing it to be retried later.
func (p *plotPath) pause() {
	// How long the path stays paused before it is automatically resumed.
	const pauseWindow = 5 * time.Minute

	resume := func() { p.paused.Store(false) }

	p.paused.Store(true)
	time.AfterFunc(pauseWindow, resume)
}

// sortPaths reorders the harvester's sortedPlots slice so that the paths with
// the most free space come first. The sort is stable, so paths reporting equal
// free space keep their relative order. This should be done after every file
// transfer, when the free space is updated.
func (h *harvester) sortPaths() {
	// The arguments are swapped in the comparison to get descending order.
	mostFreeFirst := func(x, y *plotPath) int {
		return cmp.Compare(y.freeSpace, x.freeSpace)
	}

	h.sortMutex.Lock()
	defer h.sortMutex.Unlock()
	slices.SortStableFunc(h.sortedPlots, mostFreeFirst)
}

// pickPlot returns the plot path that is most suitable for the current
// request: the first entry of sortedPlots that neither has an active transfer
// nor is paused. Because sortPaths keeps that slice ordered by descending free
// space, this is the available path with the most free space. It returns nil
// when no path is currently available.
func (h *harvester) pickPlot() *plotPath {
	h.sortMutex.Lock()
	defer h.sortMutex.Unlock()

	for _, candidate := range h.sortedPlots {
		if !candidate.busy.Load() && !candidate.paused.Load() {
			return candidate
		}
	}
	return nil
}
32 changes: 2 additions & 30 deletions cli/harvester/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
package harvester

import (
"cmp"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -223,6 +221,7 @@ func (h *harvester) httpHandler(w http.ResponseWriter, req *http.Request) {
if err != nil {
log.Printf("Failed to open file at %s: %v", req.URL.Path, err)
w.WriteHeader(500)
plotPath.pause()
return
}
defer f.Close()
Expand All @@ -236,6 +235,7 @@ func (h *harvester) httpHandler(w http.ResponseWriter, req *http.Request) {
f.Close()
os.Remove(req.URL.Path)
w.WriteHeader(500)
plotPath.pause()
return
}

Expand All @@ -250,34 +250,6 @@ func (h *harvester) httpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(201)
}

// sortPaths re-sorts the harvester's sortedPlots slice in descending order of
// free space, preserving the relative order of paths whose free space is
// equal. It should run after every file transfer, when the free space has
// been updated.
func (h *harvester) sortPaths() {
	h.sortMutex.Lock()
	defer h.sortMutex.Unlock()

	descending := func(left, right *plotPath) int {
		// Comparing right against left puts larger values first.
		return cmp.Compare(right.freeSpace, left.freeSpace)
	}
	slices.SortStableFunc(h.sortedPlots, descending)
}

// pickPlot returns the plot path that is most suitable for the current
// request: the first entry of sortedPlots that does not already have an
// active transfer. With sortedPlots ordered by descending free space, that is
// the idle path with the most free space. It returns nil when every path is
// busy.
func (h *harvester) pickPlot() *plotPath {
	h.sortMutex.Lock()
	defer h.sortMutex.Unlock()

	for i := range h.sortedPlots {
		if p := h.sortedPlots[i]; !p.busy.Load() {
			return p
		}
	}
	return nil
}

// generateTaint will calculate how long to delay the response based on current
// system pressure. This can be used to organically load balance in a cluster,
// allowing more preferential hosts to respond faster.
Expand Down

0 comments on commit 969f025

Please sign in to comment.