Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighthorizon: Add an on-disk cache for frequently accessed ledgers. #4457

Merged
merged 7 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 66 additions & 16 deletions exp/lighthorizon/archive/ingest_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,81 @@ package archive

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/xdr"
)

type ArchiveConfig struct {
SourceUrl string
NetworkPassphrase string
CacheDir string
CacheSize int
}

func NewIngestArchive(config ArchiveConfig) (Archive, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

parsed, err := url.Parse(config.SourceUrl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this S3 parse/config logic seems a bit more of a concern for the archive to know about, could it be encapsulated in historyarchive.ConnectBackend() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I somewhat agree, but this also follows the way index.ConnectWithConfig() works. I'm hesitant to modify historyarchive package since it's heavily used elsewhere so there may be some bad side-effects to any changes there, even additive ones.

if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

region := ""
needsCache := true
switch parsed.Scheme {
case "file":
// We should only avoid a cache if the ledgers are already local.
needsCache = false

case "s3":
// We need to extract the region if it's specified.
region = parsed.Query().Get("region")
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
historyarchive.ConnectOptions{
Context: context.Background(),
NetworkPassphrase: config.NetworkPassphrase,
S3Region: region,
},
)
if err != nil {
return nil, err
}

if needsCache {
cache, err := historyarchive.MakeFsCacheBackend(source,
config.CacheDir, uint(config.CacheSize))

if err != nil { // warn but continue w/o cache
log.WithField("path", config.CacheDir).
WithError(err).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
return ingestArchive{ledgerBackend}, nil
}

// This is an implementation of LightHorizon Archive that uses the existing horizon ingestion backend.
type ingestArchive struct {
*ledgerbackend.HistoryArchiveBackend
Expand Down Expand Up @@ -82,20 +148,4 @@ func (adaptation *ingestTransactionReaderAdaption) Read() (LedgerTransaction, er
return tx, nil
}

func NewIngestArchive(sourceUrl string, networkPassphrase string) (Archive, error) {
// Simple file os access
source, err := historyarchive.ConnectBackend(
sourceUrl,
historyarchive.ConnectOptions{
Context: context.Background(),
NetworkPassphrase: networkPassphrase,
},
)
if err != nil {
return nil, err
}
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
return ingestArchive{ledgerBackend}, nil
}

var _ Archive = (*ingestArchive)(nil) // ensure conformity to the interface
1 change: 1 addition & 0 deletions exp/lighthorizon/archive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package archive

import (
"context"

"github.com/stellar/go/xdr"
)

Expand Down
21 changes: 17 additions & 4 deletions exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,40 @@ import (
"github.com/stellar/go/support/log"
)

const (
defaultCacheSize = (60 * 60 * 24) / 6 // 1 day of ledgers @ 6s each
)

func main() {
sourceUrl := flag.String("source", "gcs:https://horizon-archive-poc", "history archive url to read txmeta files")
indexesUrl := flag.String("indexes", "file:https://indexes", "url of the indexes")
networkPassphrase := flag.String("network-passphrase", network.TestNetworkPassphrase, "network passphrase")
networkPassphrase := flag.String("network-passphrase", network.PublicNetworkPassphrase, "network passphrase")
cacheDir := flag.String("ledger-cache", "", `path to cache frequently-used ledgers;
if left empty, uses a temporary directory`)
cacheSize := flag.Int("ledger-cache-size", defaultCacheSize,
"number of ledgers to store in the cache")
flag.Parse()

L := log.WithField("service", "horizon-lite")
// L.SetLevel(log.DebugLevel)
L.SetLevel(log.InfoLevel)
L.Info("Starting lighthorizon!")

registry := prometheus.NewRegistry()
indexStore, err := index.ConnectWithConfig(index.StoreConfig{
Url: *indexesUrl,
Metrics: registry,
Log: L.WithField("subservice", "index"),
Metrics: registry,
})
if err != nil {
panic(err)
}

ingestArchive, err := archive.NewIngestArchive(*sourceUrl, *networkPassphrase)
ingestArchive, err := archive.NewIngestArchive(archive.ArchiveConfig{
SourceUrl: *sourceUrl,
NetworkPassphrase: *networkPassphrase,
CacheDir: *cacheDir,
CacheSize: *cacheSize,
})
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/hashicorp/golang-lru v0.5.1
github.com/hpcloud/tail v1.0.0 // indirect
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand Down
Loading