Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/services/ledgerexporter: Extend tool to support lower ledger bound. #4405

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 63 additions & 43 deletions exp/services/ledgerexporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"flag"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
Expand All @@ -26,6 +25,7 @@ func main() {
networkPassphrase := flag.String("network-passphrase", network.TestNetworkPassphrase, "network passphrase")
historyArchiveUrls := flag.String("history-archive-urls", "https://history.stellar.org/prd/core-testnet/core_testnet_001", "comma-separated list of history archive urls to read from")
captiveCoreTomlPath := flag.String("captive-core-toml-path", os.Getenv("CAPTIVE_CORE_TOML_PATH"), "path to load captive core toml file from")
startingLedger := flag.Int("start-ledger", 0, "ledger to start export from")
flag.Parse()

logger.SetLevel(supportlog.InfoLevel)
Expand All @@ -39,9 +39,7 @@ func main() {
}

captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromFile(*captiveCoreTomlPath, params)
if err != nil {
logger.WithError(err).Fatal("Invalid captive core toml")
}
logFatalIf(err, "Invalid captive core toml")

captiveConfig := ledgerbackend.CaptiveCoreConfig{
BinaryPath: *stellarCoreBinaryPath,
Expand All @@ -52,9 +50,7 @@ func main() {
Toml: captiveCoreToml,
}
core, err := ledgerbackend.NewCaptive(captiveConfig)
if err != nil {
logger.WithError(err).Fatal("Could not create captive core instance")
}
logFatalIf(err, "Could not create captive core instance")

target, err := historyarchive.ConnectBackend(
*targetUrl,
Expand All @@ -63,27 +59,48 @@ func main() {
NetworkPassphrase: params.NetworkPassphrase,
},
)
if err != nil {
logger.WithError(err).Fatal("Could not connect to target")
}
logFatalIf(err, "Could not connect to target")
defer target.Close()

latestLedger := readLatestLedger(target)

nextLedger := latestLedger
if err := core.PrepareRange(context.Background(), ledgerbackend.UnboundedRange(latestLedger)); err != nil {
logger.WithError(err).Fatalf("could not prepare unbounded range %v", nextLedger)
// Build the appropriate range for the given backend state.
var ledgerRange ledgerbackend.Range
startLedger := uint32(*startingLedger)
if startLedger == 0 {
ledgerRange = ledgerbackend.UnboundedRange(latestLedger)
} else if startLedger > 0 && latestLedger == 2 {
// Special case: if the starting ledger is set but there's no ledger in
// the backend (i.e. it's 2), the starting ledger becomes an unbounded
// the lower bound
ledgerRange = ledgerbackend.UnboundedRange(startLedger)
latestLedger = startLedger
} else {
if startLedger >= latestLedger {
logger.Fatalf("Invalid ledger range: %d >= %d",
startLedger, latestLedger)
}
ledgerRange = ledgerbackend.BoundedRange(startLedger, latestLedger)
}

err = core.PrepareRange(context.Background(), ledgerRange)
logFatalIf(err, "could not prepare unbounded range %v", latestLedger)

logger.Infof("Unpacking ledger range [%d, %d]", *startingLedger, latestLedger)

nextLedger := latestLedger
for {
leddger, err := core.GetLedger(context.Background(), nextLedger)
ledger, err := core.GetLedger(context.Background(), nextLedger)
if err != nil {
logger.WithError(err).Warnf("could not fetch ledger %v, will retry", nextLedger)
logger.WithError(err).Warnf("could not fetch ledger %v, retrying", nextLedger)
time.Sleep(time.Second)
continue
}

if err = writeLedger(target, leddger); err != nil {
if err = writeLedger(target, ledger); err != nil {
logger.WithError(err).Warnf(
"could not write ledger object %v, retrying",
uint64(ledger.LedgerSequence()))
continue
}

Expand All @@ -96,49 +113,52 @@ func main() {

}

// readLatestLedger determines the latest ledger in the given backend (at the
// /latest path), defaulting to Ledger #2 if one doesn't exist
func readLatestLedger(backend historyarchive.ArchiveBackend) uint32 {
r, err := backend.GetFile("latest")
if os.IsNotExist(err) {
return 2
} else if err != nil {
logger.WithError(err).Fatal("could not open latest ledger bucket")
} else {
defer r.Close()
var buf bytes.Buffer
if _, err := io.Copy(&buf, r); err != nil {
logger.WithError(err).Fatal("could not read latest ledger")
}
if parsed, err := strconv.ParseUint(buf.String(), 10, 32); err != nil {
logger.WithError(err).Fatalf("could not parse latest ledger: %s", buf.String())
} else {
return uint32(parsed)
}
}
return 0

logFatalIf(err, "could not open latest ledger bucket")
defer r.Close()

var buf bytes.Buffer
_, err = io.Copy(&buf, r)
logFatalIf(err, "could not read latest ledger")

parsed, err := strconv.ParseUint(buf.String(), 10, 32)
logFatalIf(err, "could not parse latest ledger: %s", buf.String())
return uint32(parsed)
}

func writeLedger(backend historyarchive.ArchiveBackend, leddger xdr.LedgerCloseMeta) error {
blob, err := leddger.MarshalBinary()
if err != nil {
logger.WithError(err).Fatalf("could not serialize ledger %v", uint64(leddger.LedgerSequence()))
}
err = backend.PutFile(
"ledgers/"+strconv.FormatUint(uint64(leddger.LedgerSequence()), 10),
ioutil.NopCloser(bytes.NewReader(blob)),
// writeLedger stores the given LedgerCloseMeta instance as a raw binary at the
// /ledgers/<seqNum> path. If an error is returned, it may be transient so you
// should attempt to retry.
func writeLedger(backend historyarchive.ArchiveBackend, ledger xdr.LedgerCloseMeta) error {
blob, err := ledger.MarshalBinary()
logFatalIf(err, "could not serialize ledger %v", ledger.LedgerSequence())

return backend.PutFile(
"ledgers/"+strconv.FormatUint(uint64(ledger.LedgerSequence()), 10),
io.NopCloser(bytes.NewReader(blob)),
)
if err != nil {
logger.WithError(err).Warnf("could not write ledger object %v, will retry", uint64(leddger.LedgerSequence()))
}
return err
}

func writeLatestLedger(backend historyarchive.ArchiveBackend, ledger uint32) error {
return backend.PutFile(
"latest",
ioutil.NopCloser(
io.NopCloser(
bytes.NewBufferString(
strconv.FormatUint(uint64(ledger), 10),
),
),
)
}

func logFatalIf(err error, message string, args ...interface{}) {
if err != nil {
logger.WithError(err).Fatalf(message, args...)
}
}