Skip to content

Commit

Permalink
ingest/ledgerbackend: Differentiate between isPrepared and isClosed i…
Browse files Browse the repository at this point in the history
…n captive core (#4088)

* Differentiate between isPrepared and isClosed in captive core.
  • Loading branch information
erika-sdf committed Dec 8, 2021
1 parent 6846c6d commit 6d63189
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 21 deletions.
2 changes: 1 addition & 1 deletion ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file. This projec

## Unreleased

* Let filewatcher use binary hash instead of timestap to detect core version update. [4050](https://github.com/stellar/go/pull/4050)
* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050)

### New Features
* **Performance improvement**: the Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later.
Expand Down
40 changes: 34 additions & 6 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type CaptiveStellarCore struct {
cachedMeta *xdr.LedgerCloseMeta

prepared *Range // non-nil if any range is prepared
closed bool // False until the core is closed
nextLedger uint32 // next ledger expected, error w/ restart if not seen
lastLedger *uint32 // end of current segment if offline, nil if online
previousLedgerHash *string
Expand Down Expand Up @@ -365,7 +366,7 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang
}

// PrepareRange prepares the given range (including from and to) to be loaded.
// Captive stellar-core backend needs to initalize Stellar-Core state to be
// Captive stellar-core backend needs to initialize Stellar-Core state to be
// able to stream ledgers.
// Stellar-Core mode depends on the provided ledgerRange:
// * For BoundedRange it will start Stellar-Core in catchup mode.
Expand Down Expand Up @@ -401,6 +402,12 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
return false
}

if c.stellarCoreRunner == nil {
return false
}
if c.closed {
return false
}
lastLedger := uint32(0)
if c.lastLedger != nil {
lastLedger = *c.lastLedger
Expand Down Expand Up @@ -458,7 +465,18 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd
}

if c.isClosed() {
return xdr.LedgerCloseMeta{}, errors.New("session is closed, call PrepareRange first")
return xdr.LedgerCloseMeta{}, errors.New("stellar-core is no longer usable")
}

if c.prepared == nil {
return xdr.LedgerCloseMeta{}, errors.New("session is not prepared, call PrepareRange first")
}

if c.stellarCoreRunner == nil {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core cannot be nil, call PrepareRange first")
}
if c.closed {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core has an error, call PrepareRange first")
}

if sequence < c.nextExpectedSequence() {
Expand Down Expand Up @@ -590,27 +608,38 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
defer c.stellarCoreLock.RUnlock()

if c.isClosed() {
return 0, errors.New("stellar-core must be opened to return latest available sequence")
return 0, errors.New("stellar-core is no longer usable")
}
if c.prepared == nil {
return 0, errors.New("stellar-core must be prepared, call PrepareRange first")
}
if c.stellarCoreRunner == nil {
return 0, errors.New("stellar-core cannot be nil, call PrepareRange first")
}
if c.closed {
return 0, errors.New("stellar-core is closed, call PrepareRange first")

}
if c.lastLedger == nil {
return c.nextExpectedSequence() - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil
}
return *c.lastLedger, nil
}

func (c *CaptiveStellarCore) isClosed() bool {
return c.prepared == nil || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil
return c.closed
}

// Close closes existing Stellar-Core process, streaming sessions and removes all
// temporary files. Note, once a CaptiveStellarCore instance is closed it can can no longer be used and
// temporary files. Note, once a CaptiveStellarCore instance is closed it can no longer be used and
// all subsequent calls to PrepareRange(), GetLedger(), etc will fail.
// Close is thread-safe and can be called from another go routine.
func (c *CaptiveStellarCore) Close() error {
c.stellarCoreLock.RLock()
defer c.stellarCoreLock.RUnlock()

c.closed = true

// after the CaptiveStellarCore context is canceled all subsequent calls to PrepareRange() will fail
c.cancel()

Expand All @@ -623,6 +652,5 @@ func (c *CaptiveStellarCore) Close() error {
if c.stellarCoreRunner != nil {
return c.stellarCoreRunner.close()
}

return nil
}
36 changes: 22 additions & 14 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,9 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) {

// Simulates a long (but graceful) shutdown...
cancel()
mockRunner.On("close").Return(nil)
mockRunner.On("getProcessExitError").Return(false, nil).Once()

err = captiveBackend.PrepareRange(ctx, BoundedRange(100, 200))
assert.EqualError(t, err, "error starting prepare range: the previous Stellar-Core instance is still running")
assert.NoError(t, err)

mockRunner.AssertExpectations(t)
mockArchive.AssertExpectations(t)
Expand Down Expand Up @@ -521,7 +519,7 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
func TestGetLatestLedgerSequence(t *testing.T) {
metaChan := make(chan metaResult, 300)

// Core will actually start with the last checkpoint before the from ledger
// Core will actually start with the last checkpoint before the `from` ledger
// and then rewind to the `from` ledger.
for i := 2; i <= 200; i++ {
meta := buildLedgerCloseMeta(testLedgerHeader{sequence: uint32(i)})
Expand Down Expand Up @@ -601,11 +599,17 @@ func TestCaptiveGetLedger(t *testing.T) {

// requires PrepareRange
_, err := captiveBackend.GetLedger(ctx, 64)
tt.EqualError(err, "session is closed, call PrepareRange first")
tt.EqualError(err, "session is not prepared, call PrepareRange first")

err = captiveBackend.PrepareRange(ctx, BoundedRange(65, 66))
ledgerRange := BoundedRange(65, 66)
tt.False(captiveBackend.isPrepared(ledgerRange), "core is not prepared until explicitly prepared")
tt.False(captiveBackend.isClosed())
err = captiveBackend.PrepareRange(ctx, ledgerRange)
assert.NoError(t, err)

tt.True(captiveBackend.isPrepared(ledgerRange))
tt.False(captiveBackend.isClosed())

_, err = captiveBackend.GetLedger(ctx, 64)
tt.Error(err, "requested ledger 64 is behind the captive core stream (expected=66)")

Expand All @@ -629,13 +633,14 @@ func TestCaptiveGetLedger(t *testing.T) {
_, err = captiveBackend.GetLedger(ctx, 66)
tt.NoError(err)

// closes after last ledger is consumed
tt.True(captiveBackend.isClosed())

// we should be able to call last ledger even after get ledger is closed
tt.False(captiveBackend.isPrepared(ledgerRange))
tt.False(captiveBackend.isClosed())
_, err = captiveBackend.GetLedger(ctx, 66)
tt.NoError(err)

// core is not closed unless it's explicitly closed
tt.False(captiveBackend.isClosed())

mockArchive.AssertExpectations(t)
mockRunner.AssertExpectations(t)
}
Expand Down Expand Up @@ -898,12 +903,14 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) {
tt.NoError(err)
tt.Equal(xdr.Uint32(65), meta.V0.LedgerHeader.Header.LedgerSeq)

tt.False(captiveBackend.isClosed())

// try reading from an empty buffer
_, err = captiveBackend.GetLedger(ctx, 66)
tt.EqualError(err, "unmarshalling error")

// closes if there is an error getting ledger
tt.True(captiveBackend.isClosed())
// not closed even if there is an error getting ledger
tt.False(captiveBackend.isClosed())

mockArchive.AssertExpectations(t)
mockRunner.AssertExpectations(t)
Expand Down Expand Up @@ -990,17 +997,18 @@ func TestCaptiveAfterClose(t *testing.T) {
assert.NoError(t, err)

assert.NoError(t, captiveBackend.Close())
assert.True(t, captiveBackend.isClosed())

_, err = captiveBackend.GetLedger(ctx, boundedRange.to)
assert.EqualError(t, err, "session is closed, call PrepareRange first")
assert.EqualError(t, err, "stellar-core is no longer usable")

var prepared bool
prepared, err = captiveBackend.IsPrepared(ctx, boundedRange)
assert.False(t, prepared)
assert.NoError(t, err)

_, err = captiveBackend.GetLatestLedgerSequence(ctx)
assert.EqualError(t, err, "stellar-core must be opened to return latest available sequence")
assert.EqualError(t, err, "stellar-core is no longer usable")

mockArchive.AssertExpectations(t)
mockRunner.AssertExpectations(t)
Expand Down

0 comments on commit 6d63189

Please sign in to comment.