Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/lighthorizon' into 4483_ledger…
Browse files Browse the repository at this point in the history
…exporter_as_service
  • Loading branch information
sreuland committed Aug 2, 2022
2 parents f0321b6 + a12b11f commit 6c95fe8
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 66 deletions.
13 changes: 9 additions & 4 deletions exp/lighthorizon/http.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package main

import (
"net/http"
"strconv"
"time"

"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/stellar/go/exp/lighthorizon/actions"
"github.com/stellar/go/exp/lighthorizon/services"
"net/http"
"strconv"
"time"
supportHttp "github.com/stellar/go/support/http"
)

func newWrapResponseWriter(w http.ResponseWriter, r *http.Request) middleware.WrapResponseWriter {
Expand All @@ -24,6 +27,7 @@ func newWrapResponseWriter(w http.ResponseWriter, r *http.Request) middleware.Wr
func prometheusMiddleware(requestDurationMetric *prometheus.SummaryVec) func(next http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := supportHttp.GetChiRoutePattern(r)
mw := newWrapResponseWriter(w, r)

then := time.Now()
Expand All @@ -33,6 +37,7 @@ func prometheusMiddleware(requestDurationMetric *prometheus.SummaryVec) func(nex
requestDurationMetric.With(prometheus.Labels{
"status": strconv.FormatInt(int64(mw.Status()), 10),
"method": r.Method,
"route": route,
}).Observe(float64(duration.Seconds()))
})
}
Expand All @@ -44,7 +49,7 @@ func lightHorizonHTTPHandler(registry *prometheus.Registry, lightHorizon service
Namespace: "horizon_lite", Subsystem: "http", Name: "requests_duration_seconds",
Help: "HTTP requests durations, sliding window = 10m",
},
[]string{"status", "method"},
[]string{"status", "method", "route"},
)
registry.MustRegister(requestDurationMetric)

Expand Down
2 changes: 2 additions & 0 deletions exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"

"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/exp/lighthorizon/archive"
"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/exp/lighthorizon/services"
Expand Down Expand Up @@ -56,6 +57,7 @@ if left empty, uses a temporary directory`)
Archive: ingestArchive,
Passphrase: *networkPassphrase,
IndexStore: indexStore,
Metrics: services.NewMetrics(registry),
}

lightHorizon := services.LightHorizon{
Expand Down
101 changes: 95 additions & 6 deletions exp/lighthorizon/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package services
import (
"context"
"io"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/exp/lighthorizon/archive"
"github.com/stellar/go/exp/lighthorizon/common"
Expand All @@ -23,15 +27,49 @@ var (
checkpointManager = historyarchive.NewCheckpointManager(0)
)

// NewMetrics returns a Metrics instance containing all the prometheus
// metrics necessary for running light horizon services.
func NewMetrics(registry *prometheus.Registry) Metrics {
const minute = 60
const day = 24 * 60 * minute
responseAgeHistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "horizon_lite",
Subsystem: "services",
Name: "response_age",
Buckets: []float64{
5 * minute,
60 * minute,
day,
7 * day,
30 * day,
90 * day,
180 * day,
365 * day,
},
Help: "Age of the response for each service, sliding window = 10m",
},
[]string{"request", "successful"},
)
registry.MustRegister(responseAgeHistogram)
return Metrics{
ResponseAgeHistogram: responseAgeHistogram,
}
}

type LightHorizon struct {
Operations OperationsService
Transactions TransactionsService
}

type Metrics struct {
ResponseAgeHistogram *prometheus.HistogramVec
}

type Config struct {
Archive archive.Archive
IndexStore index.Store
Passphrase string
Metrics Metrics
}

type TransactionsService struct {
Expand All @@ -57,6 +95,27 @@ type TransactionRepository interface {
// processing (e.g. when a limit is reached) and any error that occurred.
type searchCallback func(archive.LedgerTransaction, *xdr.LedgerHeader) (finished bool, err error)

func operationsResponseAgeSeconds(ops []common.Operation) float64 {
if len(ops) == 0 {
return -1
}

oldest := ops[0].LedgerHeader.ScpValue.CloseTime
for i := 1; i < len(ops); i++ {
if closeTime := ops[i].LedgerHeader.ScpValue.CloseTime; closeTime < oldest {
oldest = closeTime
}
}

lastCloseTime := time.Unix(int64(oldest), 0).UTC()
now := time.Now().UTC()
if now.Before(lastCloseTime) {
log.Errorf("current time %v is before oldest operation close time %v", now, lastCloseTime)
return -1
}
return now.Sub(lastCloseTime).Seconds()
}

func (os *OperationsService) GetOperationsByAccount(ctx context.Context,
cursor int64, limit uint64,
accountId string,
Expand Down Expand Up @@ -88,11 +147,36 @@ func (os *OperationsService) GetOperationsByAccount(ctx context.Context,
return false, nil
}

if err := searchTxByAccount(ctx, cursor, accountId, os.Config, opsCallback); err != nil {
return nil, err
err := searchTxByAccount(ctx, cursor, accountId, os.Config, opsCallback)
if age := operationsResponseAgeSeconds(ops); age >= 0 {
os.Config.Metrics.ResponseAgeHistogram.With(prometheus.Labels{
"request": "GetOperationsByAccount",
"successful": strconv.FormatBool(err == nil),
}).Observe(age)
}

return ops, nil
return ops, err
}

func transactionsResponseAgeSeconds(txs []common.Transaction) float64 {
if len(txs) == 0 {
return -1
}

oldest := txs[0].LedgerHeader.ScpValue.CloseTime
for i := 1; i < len(txs); i++ {
if closeTime := txs[i].LedgerHeader.ScpValue.CloseTime; closeTime < oldest {
oldest = closeTime
}
}

lastCloseTime := time.Unix(int64(oldest), 0).UTC()
now := time.Now().UTC()
if now.Before(lastCloseTime) {
log.Errorf("current time %v is before oldest transaction close time %v", now, lastCloseTime)
return -1
}
return now.Sub(lastCloseTime).Seconds()
}

func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context,
Expand All @@ -113,10 +197,15 @@ func (ts *TransactionsService) GetTransactionsByAccount(ctx context.Context,
return uint64(len(txs)) == limit, nil
}

if err := searchTxByAccount(ctx, cursor, accountId, ts.Config, txsCallback); err != nil {
return nil, err
err := searchTxByAccount(ctx, cursor, accountId, ts.Config, txsCallback)
if age := transactionsResponseAgeSeconds(txs); age >= 0 {
ts.Config.Metrics.ResponseAgeHistogram.With(prometheus.Labels{
"request": "GetTransactionsByAccount",
"successful": strconv.FormatBool(err == nil),
}).Observe(age)
}
return txs, nil

return txs, err
}

func searchTxByAccount(ctx context.Context, cursor int64, accountId string, config Config, callback searchCallback) error {
Expand Down
4 changes: 4 additions & 0 deletions exp/lighthorizon/services/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"testing"

"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/exp/lighthorizon/archive"
"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/toid"
Expand Down Expand Up @@ -230,6 +232,7 @@ func newTransactionService(ctx context.Context) TransactionsService {
Archive: archive,
IndexStore: store,
Passphrase: passphrase,
Metrics: NewMetrics(prometheus.NewRegistry()),
},
}
}
Expand All @@ -242,6 +245,7 @@ func newOperationService(ctx context.Context) OperationsService {
Archive: archive,
IndexStore: store,
Passphrase: passphrase,
Metrics: NewMetrics(prometheus.NewRegistry()),
},
}
}
58 changes: 6 additions & 52 deletions services/horizon/internal/httpx/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"
"database/sql"
"net/http"
"regexp"
"strconv"
"strings"
"time"

"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/prometheus/client_golang/prometheus"

Expand All @@ -24,6 +22,7 @@ import (
hProblem "github.com/stellar/go/services/horizon/internal/render/problem"
"github.com/stellar/go/support/db"
supportErrors "github.com/stellar/go/support/errors"
supportHttp "github.com/stellar/go/support/http"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/render/problem"
)
Expand Down Expand Up @@ -130,51 +129,8 @@ func getClientData(r *http.Request, headerName string) string {
return value
}

var routeRegexp = regexp.MustCompile("{([^:}]*):[^}]*}")

// https://prometheus.io/docs/instrumenting/exposition_formats/
// label_value can be any sequence of UTF-8 characters, but the backslash (\),
// double-quote ("), and line feed (\n) characters have to be escaped as \\,
// \", and \n, respectively.
func sanitizeMetricRoute(routePattern string) string {
route := routeRegexp.ReplaceAllString(routePattern, "{$1}")
route = strings.ReplaceAll(route, "\\", "\\\\")
route = strings.ReplaceAll(route, "\"", "\\\"")
route = strings.ReplaceAll(route, "\n", "\\n")
if route == "" {
// Can be empty when request did not reach the final route (ex. blocked by
// a middleware). More info: https://github.com/go-chi/chi/issues/270
return "undefined"
}
return route
}

// Author: https://github.com/rliebz
// From: https://github.com/go-chi/chi/issues/270#issuecomment-479184559
// https://github.com/go-chi/chi/blob/master/LICENSE
func getRoutePattern(r *http.Request) string {
rctx := chi.RouteContext(r.Context())
if pattern := rctx.RoutePattern(); pattern != "" {
// Pattern is already available
return pattern
}

routePath := r.URL.Path
if r.URL.RawPath != "" {
routePath = r.URL.RawPath
}

tctx := chi.NewRouteContext()
if !rctx.Routes.Match(tctx, r.Method, routePath) {
return ""
}

// tctx has the updated pattern, since Match mutates it
return tctx.RoutePattern()
}

func logEndOfRequest(ctx context.Context, r *http.Request, requestDurationSummary *prometheus.SummaryVec, duration time.Duration, mw middleware.WrapResponseWriter, streaming bool) {
route := sanitizeMetricRoute(getRoutePattern(r))
route := supportHttp.GetChiRoutePattern(r)

referer := r.Referer()
if referer == "" {
Expand Down Expand Up @@ -237,9 +193,8 @@ func NewHistoryMiddleware(ledgerState *ledger.State, staleThreshold int32, sessi

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
chiRoute := chi.RouteContext(ctx)
if chiRoute != nil {
ctx = context.WithValue(ctx, &db.RouteContextKey, sanitizeMetricRoute(chiRoute.RoutePattern()))
if routePattern := supportHttp.GetChiRoutePattern(r); routePattern != "" {
ctx = context.WithValue(ctx, &db.RouteContextKey, routePattern)
}
if staleThreshold > 0 {
ls := ledgerState.CurrentStatus()
Expand Down Expand Up @@ -309,9 +264,8 @@ func ingestionStatus(ctx context.Context, q *history.Q) (uint32, bool, error) {
func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
chiRoute := chi.RouteContext(ctx)
if chiRoute != nil {
ctx = context.WithValue(ctx, &db.RouteContextKey, sanitizeMetricRoute(chiRoute.RoutePattern()))
if routePattern := supportHttp.GetChiRoutePattern(r); routePattern != "" {
ctx = context.WithValue(ctx, &db.RouteContextKey, routePattern)
}
session := m.HorizonSession.Clone()
q := &history.Q{session}
Expand Down
45 changes: 45 additions & 0 deletions support/http/logging_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
stdhttp "net/http"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -57,6 +58,50 @@ func LoggingMiddlewareWithOptions(options Options) func(stdhttp.Handler) stdhttp
}
}

var routeRegexp = regexp.MustCompile("{([^:}]*):[^}]*}")

// https://prometheus.io/docs/instrumenting/exposition_formats/
// label_value can be any sequence of UTF-8 characters, but the backslash (\),
// double-quote ("), and line feed (\n) characters have to be escaped as \\,
// \", and \n, respectively.
func sanitizeMetricRoute(routePattern string) string {
route := routeRegexp.ReplaceAllString(routePattern, "{$1}")
route = strings.ReplaceAll(route, "\\", "\\\\")
route = strings.ReplaceAll(route, "\"", "\\\"")
route = strings.ReplaceAll(route, "\n", "\\n")
if route == "" {
// Can be empty when request did not reach the final route (ex. blocked by
// a middleware). More info: https://github.com/go-chi/chi/issues/270
return "undefined"
}
return route
}

// GetChiRoutePattern returns the chi route pattern from the given request context.
// Author: https://github.com/rliebz
// From: https://github.com/go-chi/chi/issues/270#issuecomment-479184559
// https://github.com/go-chi/chi/blob/master/LICENSE
func GetChiRoutePattern(r *stdhttp.Request) string {
rctx := chi.RouteContext(r.Context())
if pattern := rctx.RoutePattern(); pattern != "" {
// Pattern is already available
return pattern
}

routePath := r.URL.Path
if r.URL.RawPath != "" {
routePath = r.URL.RawPath
}

tctx := chi.NewRouteContext()
if !rctx.Routes.Match(tctx, r.Method, routePath) {
return ""
}

// tctx has the updated pattern, since Match mutates it
return sanitizeMetricRoute(tctx.RoutePattern())
}

// logStartOfRequest emits the logline that reports that an http request is
// beginning processing.
func logStartOfRequest(
Expand Down
Loading

0 comments on commit 6c95fe8

Please sign in to comment.