Flexible mechanism to make execution flow interruptible.
The breaker carries a cancellation signal to interrupt an action execution.
var NewYear = time.Time{}.AddDate(time.Now().Year(), 0, 0)
interrupter := breaker.Multiplex(
breaker.BreakByContext(context.WithTimeout(req.Context(), time.Minute)),
breaker.BreakByDeadline(NewYear),
breaker.BreakBySignal(os.Interrupt),
)
defer interrupter.Close()
<-interrupter.Done() // wait context cancellation, timeout or interrupt signal
A full description of the idea is available here.
I have to make modules github.com/kamilsk/retry/v5:
if err := retry.Retry(breaker.BreakByTimeout(time.Minute), action); err != nil {
log.Fatal(err)
}
and github.com/kamilsk/semaphore/v5:
if err := semaphore.Acquire(breaker.BreakByTimeout(time.Minute), 5); err != nil {
log.Fatal(err)
}
more consistent and reliable.
Additionally, I want to implement a Graceful Shutdown on the same mechanism.
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
ctx := breaker.ToContext(interrupter)
ctx = context.WithValue(ctx, header, "...")
req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
if err != nil {
panic(err)
}
var resp *http.Response
action := func(ctx context.Context) (err error) {
req = req.Clone(ctx)
source := ctx.Value(header).(string)
req.Header.Set(header, source)
resp, err = http.DefaultClient.Do(req)
return err
}
if err := retry.Do(ctx, action); err != nil {
panic(err)
}
Full example
package main
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"syscall"
"time"
"github.com/kamilsk/breaker"
"github.com/kamilsk/retry/v5"
)
func main() {
const (
header = "X-Message"
timeout = time.Minute
)
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
time.Sleep(timeout / 10)
_, _ = rw.Write([]byte(req.Header.Get(header)))
}))
defer server.Close()
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
ctx := breaker.ToContext(interrupter)
ctx = context.WithValue(ctx, header, "flexible mechanism to make execution flow interruptible")
req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
if err != nil {
panic(err)
}
var resp *http.Response
action := func(ctx context.Context) (err error) {
req = req.Clone(ctx)
source := ctx.Value(header).(string)
req.Header.Set(header, source)
resp, err = http.DefaultClient.Do(req)
return err
}
if err := retry.Do(ctx, action); err != nil {
fmt.Println("error:", err)
return
}
_, _ = io.Copy(os.Stdout, resp.Body)
}
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
server := http.Server{
BaseContext: func(net.Listener) context.Context {
return breaker.ToContext(interrupter)
},
}
go func() {
if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()
<-interrupter.Done()
if errors.Is(interrupter.Err(), breaker.Interrupted) {
if err := server.Shutdown(context.TODO()); err != nil {
panic(err)
}
}
Full example
package main
import (
"context"
"errors"
"fmt"
"log"
"net"
"net/http"
"os"
"syscall"
"time"
"github.com/kamilsk/breaker"
)
func main() {
const timeout = time.Minute
interrupter := breaker.Multiplex(
breaker.BreakBySignal(os.Interrupt, syscall.SIGINT, syscall.SIGTERM),
breaker.BreakByTimeout(timeout),
)
defer interrupter.Close()
server := http.Server{
Addr: ":8080",
Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}),
BaseContext: func(net.Listener) context.Context {
return breaker.ToContext(interrupter)
},
}
go func() {
if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatal(err)
}
}()
<-interrupter.Done()
if err := interrupter.Err(); errors.Is(err, breaker.Interrupted) {
if err := server.Shutdown(context.TODO()); err != nil {
panic(err)
}
}
fmt.Println("graceful shutdown")
}
The library uses SemVer for versioning, and it is not BC-safe through major releases. You can use go modules to manage its version.
$ go get github.com/kamilsk/breaker@latest
The example shows how to execute console commands for ten minutes.
$ date
# Thu Jan 7 21:02:21
$ breakit after 10m -- server run --port=8080
$ breakit ps
# +--------------------------+----------------------------+----------+----------+
# | Process | Status | Since | Until |
# +--------------------------+----------------------------+----------+----------+
# | server run --port=8080 | exit 1; panic: database... | 21:02:36 | - |
# +--------------------------+----------------------------+----------+----------+
# | | | Total | 1 |
# +--------------------------+----------------------------+----------+----------+
$ breakit after 10m -- database run --port=5432
$ breakit after 10m delay 5s -- server run --port=8080
$ breakit ps
# +--------------------------+----------------------------+----------+----------+
# | Process | Status | Since | Until |
# +--------------------------+----------------------------+----------+----------+
# | database run --port=5432 | running | 21:04:09 | 21:14:09 |
# | server run --port=8080 | delayed | 21:04:30 | 21:14:25 |
# +--------------------------+----------------------------+----------+----------+
# | | | Total | 2 |
# +--------------------------+----------------------------+----------+----------+
See more details here.
made with β€οΈ for everyone