Commit 50a7e79

Initial POC checkpoint

krobertson committed Mar 24, 2024 (initial commit, 0 parents)

Showing 14 changed files with 786 additions and 0 deletions.
45 changes: 45 additions & 0 deletions README.md
@@ -0,0 +1,45 @@
# chia-garden

chia-garden is a utility to handle the migration of plot files for the Chia
blockchain.

It can be used to transfer plot files from dedicated plotting nodes to available
harvester nodes, optimizing for even file distribution and minimal network
congestion. It works across a set of many plotters and many harvesters by
communicating over a dedicated message bus built on NATS.

### How does it handle distribution?

When a new plot file is found on a plotter, it will publish a message to NATS
notifying harvesters that the file is available and noting its size.

All the available harvesters will receive this message and respond with a URL
the plotter can use to send the file to that harvester over HTTP.

Whichever harvester responds first is the one the plotter sends the file to.
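
The request and response types referenced here live in pkg/types, which is part
of this commit but not shown in this excerpt. Judging only from how they are
used in cmd/harvester/server.go (req.Name, req.Size, resp.Url), a minimal
sketch might look like:

```go
package types

// PlotRequest announces a finished plot; fields are inferred from their usage
// in this diff, not copied from pkg/types.
type PlotRequest struct {
	Name string // file name of the plot
	Size uint64 // plot size in bytes, used for free-space checks
}

// PlotResponse tells the plotter where to send the file over HTTP.
type PlotResponse struct {
	Url string // e.g. http://<harvester-ip>:3434/<plot-path>/<name>
}
```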

Since the plotter only takes the first response, even distribution can be
achieved naturally by harvesters responding quickly or intentionally delaying
their responses. For instance:

* If the current harvester is already transferring a few plots, its network
link is likely saturated, so it is a less ideal destination. It can delay its
response by 10ms for each active transfer it already has.
* If the current harvester's disk is close to full, it may be a less ideal
destination than a harvester with a completely empty disk, so it can add a
5-10ms delay to its response.
* If the harvester has no active transfers and plenty of free storage: no
delay, respond immediately!
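
As a rough illustration of that heuristic (the constants come from the examples
above, not from the code; the actual implementation in this commit is
generateTaint in cmd/harvester/server.go):

```go
package harvester

import "time"

// responseDelay is a hypothetical sketch of the delay heuristic described
// above; the real logic lives in generateTaint in cmd/harvester/server.go.
func responseDelay(activeTransfers int, freeBytes, totalBytes uint64) time.Duration {
	// 10ms per active transfer: harvesters with busier networks respond slower
	d := time.Duration(activeTransfers) * 10 * time.Millisecond

	// fuller disks respond slower than emptier ones
	usedPercent := 100 * (totalBytes - freeBytes) / totalBytes
	switch {
	case usedPercent > 80:
		d += 10 * time.Millisecond
	case usedPercent > 50:
		d += 5 * time.Millisecond
	}

	// no transfers and plenty of space: no delay, respond immediately
	return d
}
```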

### Optimizations Applied

A number of optimizations are applied to how the transfers are performed.

1. The harvester nodes will prioritize disks with more free space. With this,
disks will typically fill at an even rate rather than one at a time.
1. Only one plot may be written to a disk at a time. This avoids fragmentation
on the disk caused by concurrent plots being written.
1. Harvesters with more active transfers will be deprioritized. This limits
network congestion when multiple plotters are transferring at the same time.
49 changes: 49 additions & 0 deletions cmd/harvester/main.go
@@ -0,0 +1,49 @@
package main

import (
"flag"
"log"

"github.com/krobertson/chia-garden/pkg/rpc"
"github.com/krobertson/chia-garden/pkg/utils"

"github.com/nats-io/nats.go"

_ "net/http/pprof"
)

var (
maxTransfers int64 = 5
)

func main() {
var plotPaths utils.ArrayFlags

natsUrl := flag.String("nats", nats.DefaultURL, "NATS connection string")
flag.Var(&plotPaths, "plot", "Plots directories")
flag.Int64Var(&maxTransfers, "max-transfers", 5, "max concurrent transfers")
flag.Parse()

log.Print("Starting harvester-client...")

conn, err := nats.Connect(*natsUrl, nats.MaxReconnects(-1))
if err != nil {
log.Fatal("Failed to connect to NATS: ", err)
}
defer conn.Close()

server, err := newHarvester(plotPaths)
if err != nil {
log.Fatal("Failed to initialize harvester: ", err)
}

// initialize the rpc
_, err = rpc.NewNatsHarvesterListener(conn, server)
if err != nil {
log.Fatal("Failed to initialize NATS listener: ", err)
}

// Block main goroutine forever.
log.Print("Ready")
<-make(chan struct{})
}
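
Assuming the resulting binary is named `harvester`, an example invocation using
the flags above might be `harvester -nats nats://127.0.0.1:4222 -plot
/mnt/plots1 -plot /mnt/plots2 -max-transfers 5`; the NATS URL shown is just the
client default and the plot paths are purely illustrative.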
27 changes: 27 additions & 0 deletions cmd/harvester/plotpath.go
@@ -0,0 +1,27 @@
package main

import (
"sync"
"sync/atomic"

"golang.org/x/sys/unix"
)

// plotPath tracks a single plot destination directory, its capacity, and
// whether a transfer to it is currently in flight.
type plotPath struct {
path string
busy atomic.Bool
freeSpace uint64
totalSpace uint64
mutex sync.Mutex
}

// updateFreeSpace will get the filesystem stats and update the free and total
// space on the plotPath. This should normally be done with the plotPath mutex
// locked.
func (p *plotPath) updateFreeSpace() {
var stat unix.Statfs_t
if err := unix.Statfs(p.path, &stat); err != nil {
// keep the previous values rather than zeroing them on a failed stat
return
}

p.freeSpace = stat.Bavail * uint64(stat.Bsize)
p.totalSpace = stat.Blocks * uint64(stat.Bsize)
}
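
One note on updateFreeSpace: Statfs's Bavail field counts blocks available to
unprivileged processes (as opposed to Bfree, the raw free block count), so
freeSpace here reflects what can actually be written to the path.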
245 changes: 245 additions & 0 deletions cmd/harvester/server.go
@@ -0,0 +1,245 @@
package main

import (
"cmp"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/krobertson/chia-garden/pkg/types"
"github.com/krobertson/chia-garden/pkg/utils"

"github.com/dustin/go-humanize"
)

const (
taintTransfers = 20 * time.Millisecond
taintFreeSpace = 20 * time.Millisecond
)

// harvester tracks this node's plot destinations, a sorted view of them used
// to pick the next destination, and the number of transfers in flight.
type harvester struct {
plots map[string]*plotPath
sortedPlots []*plotPath
sortMutex sync.Mutex
hostPort string
transfers atomic.Int64
}

// newHarvester will create the harvester server process and validate all of
// the provided plot paths. It will return an error if any of the paths do not
// exist or are not a directory.
func newHarvester(paths []string) (*harvester, error) {
h := &harvester{
plots: make(map[string]*plotPath),
sortedPlots: make([]*plotPath, len(paths)),
hostPort: fmt.Sprintf("%s:3434", utils.GetHostIP().String()),
}

// ensure we have at least one
if len(paths) == 0 {
return nil, fmt.Errorf("at least one plot path must be specified")
}

// validate the plots exist and add them in
for i, p := range paths {
p, err := filepath.Abs(p)
if err != nil {
return nil, fmt.Errorf("path %s failed expansion: %v", p, err)
}

fi, err := os.Stat(p)
if err != nil {
return nil, fmt.Errorf("path %s failed validation: %v", p, err)
}

if !fi.IsDir() {
return nil, fmt.Errorf("path %s is not a directory", p)
}

pp := &plotPath{path: p}
pp.updateFreeSpace()
h.plots[p] = pp
h.sortedPlots[i] = pp
}

// sort the paths
h.sortPaths()

// FIXME ideally handle graceful shutdown of existing transfers
http.HandleFunc("/", h.httpHandler)
go func() {
if err := http.ListenAndServe(":3434", nil); err != nil {
log.Fatal("http server failed: ", err)
}
}()

return h, nil
}

// PlotReady processes a request from a plotter to transfer a plot to the
// harvester. It will generate a response, but then momentarily sleep as a
// slight taint so that the most ideal harvester tends to respond first.
func (h *harvester) PlotReady(req *types.PlotRequest) (*types.PlotResponse, error) {
// pick a plot. This should return the one with the most free space that
// isn't busy.
plot := h.pickPlot()
if plot == nil {
return nil, fmt.Errorf("no paths available")
}

// check if we have enough free space
if plot.freeSpace <= req.Size {
return nil, nil
}

// generate response
resp := &types.PlotResponse{
Url: fmt.Sprintf("http://%s%s", h.hostPort, filepath.Join(plot.path, req.Name)),
}

// generate and handle the taint
d := h.generateTaint(plot)
time.Sleep(d)
return resp, nil
}

// httpHandler facilitates the transfer of plot files from the plotters to the
// harvesters. It encapsulates a single request and runs within its own
// goroutine. It will respond with a 201 on success and a relevant error code on
// failure. A failure should trigger the plotter to re-request storage.
func (h *harvester) httpHandler(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

// get the plot path and ensure it exists
base := filepath.Dir(req.URL.Path)
plotPath, exists := h.plots[base]
if !exists {
log.Printf("Request to store in %s, but does not exist", base)
w.WriteHeader(404)
return
}

// check if we're maxed on concurrent transfers
if h.transfers.Load() >= maxTransfers {
log.Printf("Request to store in %s, but at max transfers", base)
w.WriteHeader(503)
return
}

// make sure the disk isn't already being written to. this helps to avoid
// file fragmentation
if plotPath.busy.Load() {
log.Printf("Request to store %s, but already trasnferring", req.URL.Path)
w.WriteHeader(503)
return
}

// make sure we have a known content length (chunked uploads report -1)
if req.ContentLength <= 0 {
w.WriteHeader(411)
return
}

// lock the file path
plotPath.mutex.Lock()
defer plotPath.mutex.Unlock()
plotPath.busy.Store(true)
defer plotPath.busy.Store(false)
h.transfers.Add(1)
defer h.transfers.Add(-1)

// check if we have enough free space
if plotPath.freeSpace <= uint64(req.ContentLength) {
log.Printf("Request to store %s, but not enough space (%s / %s)",
req.URL.Path, humanize.Bytes(uint64(req.ContentLength)), humanize.Bytes(plotPath.freeSpace))
w.WriteHeader(413)
return
}

// validate the file doesn't already exist, as a safeguard
fi, _ := os.Stat(req.URL.Path)
if fi != nil {
log.Printf("File at %s already exists", req.URL.Path)
w.WriteHeader(409)
return
}

// open the file and transfer
f, err := os.Create(req.URL.Path)
if err != nil {
log.Printf("Failed to open file at %s: %v", req.URL.Path, err)
w.WriteHeader(500)
return
}
defer f.Close()

// perform the copy
start := time.Now()
bytes, err := io.Copy(f, req.Body)
if err != nil {
log.Printf("Failure while writing plot %s: %v", req.URL.Path, err)
f.Close()
os.Remove(req.URL.Path)
w.WriteHeader(500)
return
}

// update free space
plotPath.updateFreeSpace()
h.sortPaths()

// log success and some transfer metrics
seconds := time.Since(start).Seconds()
log.Printf("Successfully stored %s (%s, %f secs, %s/sec)",
req.URL.Path, humanize.IBytes(uint64(bytes)), seconds, humanize.Bytes(uint64(float64(bytes)/seconds)))
w.WriteHeader(201)
}

// sortPaths will update the order of the plotPaths inside the harvester's
// sortedPlots slice. This should be done after every file transfer when the
// free space is updated.
func (h *harvester) sortPaths() {
h.sortMutex.Lock()
defer h.sortMutex.Unlock()

slices.SortStableFunc(h.sortedPlots, func(a, b *plotPath) int {
// sort descending by free space so the emptiest disk is considered first
return cmp.Compare(b.freeSpace, a.freeSpace)
})
}

// pickPlot will return which plot path would be most ideal for the current
// request: the one with the most free space that doesn't already have an
// active transfer.
func (h *harvester) pickPlot() *plotPath {
h.sortMutex.Lock()
defer h.sortMutex.Unlock()

for _, v := range h.sortedPlots {
if v.busy.Load() {
continue
}
return v
}
return nil
}

// generateTaint will calculate how long to delay the response based on current
// system pressure. This can be used to organically load balance in a cluster,
// allowing more preferential hosts to respond faster.
func (h *harvester) generateTaint(plot *plotPath) time.Duration {
d := time.Duration(0)

// apply per current transfer going on. this helps prefer harvesters with
// less busy networks
d += time.Duration(h.transfers.Load()) * taintTransfers

// apply for the ratio of used disk space. this prefers harvesters with
// emptier disks, which incur less delay
usedPercent := 100 * (plot.totalSpace - plot.freeSpace) / plot.totalSpace
d += time.Duration(usedPercent) * taintFreeSpace / 1000

return d
}
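
As a rough worked example of the taint with the constants above: a harvester
with three active transfers and a disk that is 50% used would delay its reply
by roughly 3 × 20ms + 50 × 20ms / 1000 ≈ 61ms, while an idle harvester with a
nearly empty disk replies almost immediately.

The plotter side of the transfer lives in cmd/plotter, which is not part of
this excerpt. Going only by what httpHandler above requires (a known
Content-Length, the plot bytes in the request body, and a 201 on success), a
minimal sketch of the upload might look like the following; the function name
and use of POST are assumptions, not the actual client code:

```go
package plotter

import (
	"fmt"
	"net/http"
	"os"
)

// uploadPlot streams a plot file to the URL returned in a PlotResponse. This
// is a hypothetical sketch, not the client from cmd/plotter.
func uploadPlot(url, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, url, f)
	if err != nil {
		return err
	}
	// httpHandler rejects requests without a Content-Length (411), so set it
	// explicitly from the file size.
	req.ContentLength = fi.Size()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("transfer failed with status %d", resp.StatusCode)
	}
	return nil
}
```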