Skip to content

Commit

Permalink
ingest: Add support for tx meta v3 (#4606)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekn authored Sep 26, 2022
1 parent 0187d17 commit e1e1e4c
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 45 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/horizon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
go: [1.18.6, 1.19.1]
pg: [9.6.5]
ingestion-backend: [db, captive-core, captive-core-remote-storage]
protocol-version: [18, 19]
protocol-version: [18, 19, 20]
runs-on: ${{ matrix.os }}
services:
postgres:
Expand All @@ -34,6 +34,8 @@ jobs:
env:
HORIZON_INTEGRATION_TESTS_ENABLED: true
HORIZON_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL: ${{ matrix.protocol-version }}
PROTOCOL_20_CORE_DEBIAN_PKG_VERSION: 19.3.1-1069.056673a51.focal~soroban2
PROTOCOL_20_CORE_DOCKER_IMG: 2opremio/stellar-core:19.3.1-1069.056673a51.focal-soroban2
PROTOCOL_19_CORE_DEBIAN_PKG_VERSION: 19.3.0-1006.9ce6dc4e9.focal
PROTOCOL_19_CORE_DOCKER_IMG: stellar/stellar-core:19.3.0-1006.9ce6dc4e9.focal
PROTOCOL_18_CORE_DEBIAN_PKG_VERSION: 19.3.0-1006.9ce6dc4e9.focal
Expand Down Expand Up @@ -101,7 +103,7 @@ jobs:
uses: actions/cache@v3
with:
path: ./empty
key: horizon-hash-${{ hashFiles('./horizon') }}-${{ hashFiles('./clients/horizonclient/**') }}-${{ hashFiles('./protocols/horizon/**') }}-${{ hashFiles('./txnbuild/**') }}-${{ hashFiles('./services/horizon/internal/integration/**') }}-${{ env.PROTOCOL_19_CORE_DOCKER_IMG }}-${{ env.PROTOCOL_18_CORE_DOCKER_IMG }}
key: horizon-hash-${{ hashFiles('./horizon') }}-${{ hashFiles('./clients/horizonclient/**') }}-${{ hashFiles('./protocols/horizon/**') }}-${{ hashFiles('./txnbuild/**') }}-${{ hashFiles('./services/horizon/internal/integration/**') }}-${{ env.PROTOCOL_20_CORE_DOCKER_IMG }}-${{ env.PROTOCOL_19_CORE_DOCKER_IMG }}-${{ env.PROTOCOL_18_CORE_DOCKER_IMG }}-${{ matrix.protocol-version }}

- if: ${{ steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }}
run: go test -race -timeout 25m -v ./services/horizon/internal/integration/...
Expand Down
4 changes: 2 additions & 2 deletions ingest/ledger_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ func (r *LedgerChangeReader) Read() (Change, error) {
return r.Read()
case upgradeChangesState:
// Get upgrade changes
if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.V0.UpgradesProcessing) {
if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()) {
changes := GetChangesFromLedgerEntryChanges(
r.LedgerTransactionReader.ledgerCloseMeta.V0.UpgradesProcessing[r.upgradeIndex].Changes,
r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()[r.upgradeIndex].Changes,
)
r.pending = append(r.pending, changes...)
r.upgradeIndex++
Expand Down
61 changes: 38 additions & 23 deletions ingest/ledger_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,28 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) {
)
changes = append(changes, opChanges...)
}
case 2, 3:
var (
beforeChanges, afterChanges xdr.LedgerEntryChanges
operationMeta []xdr.OperationMeta
)

switch t.UnsafeMeta.V {
case 2:
v2Meta := t.UnsafeMeta.MustV2()
beforeChanges = v2Meta.TxChangesBefore
afterChanges = v2Meta.TxChangesAfter
operationMeta = v2Meta.Operations
case 3:
v3Meta := t.UnsafeMeta.MustV3()
beforeChanges = v3Meta.TxChangesBefore
afterChanges = v3Meta.TxChangesAfter
operationMeta = v3Meta.Operations
default:
panic("Invalid meta version, expected 2 or 3")
}

case 2:
v2Meta := t.UnsafeMeta.MustV2()
txChangesBefore := GetChangesFromLedgerEntryChanges(v2Meta.TxChangesBefore)
txChangesBefore := GetChangesFromLedgerEntryChanges(beforeChanges)
changes = append(changes, txChangesBefore...)

// Ignore operations meta and txChangesAfter if txInternalError
Expand All @@ -67,14 +85,14 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) {
return changes, nil
}

for _, operationMeta := range v2Meta.Operations {
for _, operationMeta := range operationMeta {
opChanges := GetChangesFromLedgerEntryChanges(
operationMeta.Changes,
)
changes = append(changes, opChanges...)
}

txChangesAfter := GetChangesFromLedgerEntryChanges(v2Meta.TxChangesAfter)
txChangesAfter := GetChangesFromLedgerEntryChanges(afterChanges)
changes = append(changes, txChangesAfter...)
default:
return changes, errors.New("Unsupported TransactionMeta version")
Expand All @@ -97,31 +115,28 @@ func (t *LedgerTransaction) GetOperation(index uint32) (xdr.Operation, bool) {
func (t *LedgerTransaction) GetOperationChanges(operationIndex uint32) ([]Change, error) {
changes := []Change{}

// Transaction meta
switch t.UnsafeMeta.V {
case 0:
if t.UnsafeMeta.V == 0 {
return changes, errors.New("TransactionMeta.V=0 not supported")
case 1:
// Ignore operations meta if txInternalError https://github.com/stellar/go/issues/2111
if t.txInternalError() {
return changes, nil
}
}

v1Meta := t.UnsafeMeta.MustV1()
changes = operationChanges(v1Meta.Operations, operationIndex)
case 2:
// Ignore operations meta if txInternalError https://github.com/stellar/go/issues/2111
if t.txInternalError() {
return changes, nil
}
// Ignore operations meta if txInternalError https://github.com/stellar/go/issues/2111
if t.txInternalError() {
return changes, nil
}

v2Meta := t.UnsafeMeta.MustV2()
changes = operationChanges(v2Meta.Operations, operationIndex)
var operationMeta []xdr.OperationMeta
switch t.UnsafeMeta.V {
case 1:
operationMeta = t.UnsafeMeta.MustV1().Operations
case 2:
operationMeta = t.UnsafeMeta.MustV2().Operations
case 3:
operationMeta = t.UnsafeMeta.MustV3().Operations
default:
return changes, errors.New("Unsupported TransactionMeta version")
}

return changes, nil
return operationChanges(operationMeta, operationIndex), nil
}

func operationChanges(ops []xdr.OperationMeta, index uint32) []Change {
Expand Down
22 changes: 11 additions & 11 deletions ingest/ledger_transaction_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (reader *LedgerTransactionReader) GetSequence() uint32 {

// GetHeader returns the XDR Header data associated with the stored ledger.
func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry {
return reader.ledgerCloseMeta.V0.LedgerHeader
return reader.ledgerCloseMeta.LedgerHeaderHistoryEntry()
}

// Read returns the next transaction in the ledger, ordered by tx number, each time
Expand All @@ -69,26 +69,26 @@ func (reader *LedgerTransactionReader) Rewind() {
// a per-transaction view of the data when Read() is called.
func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta, networkPassphrase string) error {
byHash := map[xdr.Hash]xdr.TransactionEnvelope{}
for i, tx := range lcm.V0.TxSet.Txs {
for i, tx := range lcm.TransactionEnvelopes() {
hash, err := network.HashTransactionInEnvelope(tx, networkPassphrase)
if err != nil {
return errors.Wrapf(err, "could not hash transaction %d in TxSet", i)
}
byHash[hash] = tx
}

for i := range lcm.V0.TxProcessing {
result := lcm.V0.TxProcessing[i].Result
envelope, ok := byHash[result.TransactionHash]
for i := 0; i < lcm.CountTransactions(); i++ {
hash := lcm.TransactionHash(i)
envelope, ok := byHash[hash]
if !ok {
hexHash := hex.EncodeToString(result.TransactionHash[:])
hexHash := hex.EncodeToString(hash[:])
return errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash)
}

// We check the version only if FeeProcessing are non empty because some backends
// (like HistoryArchiveBackend) do not return meta.
if lcm.V0.LedgerHeader.Header.LedgerVersion < 10 && lcm.V0.TxProcessing[i].TxApplyProcessing.V != 2 &&
len(lcm.V0.TxProcessing[i].FeeProcessing) > 0 {
if lcm.ProtocolVersion() < 10 && lcm.TxApplyProcessing(i).V < 2 &&
len(lcm.FeeProcessing(i)) > 0 {
return errors.New(
"TransactionMeta.V=2 is required in protocol version older than version 10. " +
"Please process ledgers again using the latest stellar-core version.",
Expand All @@ -98,9 +98,9 @@ func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta
reader.transactions = append(reader.transactions, LedgerTransaction{
Index: uint32(i + 1), // Transactions start at '1'
Envelope: envelope,
Result: result,
UnsafeMeta: lcm.V0.TxProcessing[i].TxApplyProcessing,
FeeChanges: lcm.V0.TxProcessing[i].FeeProcessing,
Result: lcm.TransactionResultPair(i),
UnsafeMeta: lcm.TxApplyProcessing(i),
FeeChanges: lcm.FeeProcessing(i),
})
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
const (
// MaxSupportedProtocolVersion defines the maximum supported version of
// the Stellar protocol.
MaxSupportedProtocolVersion uint32 = 19
MaxSupportedProtocolVersion uint32 = 20

// CurrentVersion reflects the latest version of the ingestion
// algorithm. This value is stored in KV store and is used to decide
Expand Down
137 changes: 132 additions & 5 deletions xdr/ledger_close_meta.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,148 @@
package xdr

import "fmt"

func (l LedgerCloseMeta) LedgerHeaderHistoryEntry() LedgerHeaderHistoryEntry {
switch l.V {
case 0:
return l.MustV0().LedgerHeader
case 1:
return l.MustV1().LedgerHeader
case 2:
return l.MustV2().LedgerHeader
default:
panic(fmt.Sprintf("Unsupported LedgerCloseMeta.V: %d", l.V))
}
}

func (l LedgerCloseMeta) LedgerSequence() uint32 {
return uint32(l.MustV0().LedgerHeader.Header.LedgerSeq)
return uint32(l.LedgerHeaderHistoryEntry().Header.LedgerSeq)
}

func (l LedgerCloseMeta) LedgerHash() Hash {
return l.MustV0().LedgerHeader.Hash
return l.LedgerHeaderHistoryEntry().Hash
}

func (l LedgerCloseMeta) PreviousLedgerHash() Hash {
return l.MustV0().LedgerHeader.Header.PreviousLedgerHash
return l.LedgerHeaderHistoryEntry().Header.PreviousLedgerHash
}

func (l LedgerCloseMeta) ProtocolVersion() uint32 {
return uint32(l.MustV0().LedgerHeader.Header.LedgerVersion)
return uint32(l.LedgerHeaderHistoryEntry().Header.LedgerVersion)
}

func (l LedgerCloseMeta) BucketListHash() Hash {
return l.MustV0().LedgerHeader.Header.BucketListHash
return l.LedgerHeaderHistoryEntry().Header.BucketListHash
}

func (l LedgerCloseMeta) CountTransactions() int {
switch l.V {
case 0:
return len(l.MustV0().TxProcessing)
case 1:
return len(l.MustV1().TxProcessing)
case 2:
return len(l.MustV2().TxProcessing)
default:
panic(fmt.Sprintf("Unsupported LedgerCloseMeta.V: %d", l.V))
}
}

func (l LedgerCloseMeta) TransactionEnvelopes() []TransactionEnvelope {
switch l.V {
case 0:
return l.MustV0().TxSet.Txs
case 1, 2:
var envelopes = make([]TransactionEnvelope, 0, l.CountTransactions())
var phases []TransactionPhase
if l.V == 1 {
phases = l.MustV1().TxSet.V1TxSet.Phases
} else {
phases = l.MustV2().TxSet.V1TxSet.Phases
}
for _, phase := range phases {
for _, component := range *phase.V0Components {
envelopes = append(envelopes, component.TxsMaybeDiscountedFee.Txs...)
}
}
return envelopes
default:
panic(fmt.Sprintf("Unsupported LedgerCloseMeta.V: %d", l.V))
}
}

// TransactionHash returns Hash for tx at index i in processing order..
func (l LedgerCloseMeta) TransactionHash(i int) Hash {
switch l.V {
case 0:
return l.MustV0().TxProcessing[i].Result.TransactionHash
case 1:
return l.MustV1().TxProcessing[i].Result.TransactionHash
case 2:
return l.MustV2().TxProcessing[i].Result.TransactionHash
default:
panic(fmt.Sprintf("Unsupported LedgerCloseMeta.V: %d", l.V))
}
}

// TransactionResultPair returns TransactionResultPair for tx at index i in processing order.
func (l LedgerCloseMeta) TransactionResultPair(i int) TransactionResultPair {
switch l.V {
case 0:
return l.MustV0().TxProcessing[i].Result
case 1:
return l.MustV1().TxProcessing[i].Result
case 2:
if l.MustV2().TxProcessing[i].TxApplyProcessing.V != 3 {
panic("TransactionResult unavailable because LedgerCloseMeta.V = 2 and TransactionMeta.V != 3")
}
return TransactionResultPair{
TransactionHash: l.TransactionHash(i),
Result: l.MustV2().TxProcessing[i].TxApplyProcessing.MustV3().TxResult,
}
default:
panic(fmt.Sprintf("Unsupported LedgerCloseMeta.V: %d", l.V))
}
}

// FeeProcessing returns FeeProcessing for tx at index i in processing order.
func (l LedgerCloseMeta) FeeProcessing(i int) LedgerEntryChanges {
switch l.V {
case 0:
return l.MustV0().TxProcessing[i].FeeProcessing
case 1:
return l.MustV1().TxProcessing[i].FeeProcessing
case 2:
return l.MustV2().TxProcessing[i].FeeProcessing
default:
panic(fmt.Sprintf("Unsupported LedgerCloseMeta.V: %d", l.V))
}
}

// TxApplyProcessing returns TxApplyProcessing for tx at index i in processing order.
func (l LedgerCloseMeta) TxApplyProcessing(i int) TransactionMeta {
switch l.V {
case 0:
return l.MustV0().TxProcessing[i].TxApplyProcessing
case 1:
return l.MustV1().TxProcessing[i].TxApplyProcessing
case 2:
return l.MustV2().TxProcessing[i].TxApplyProcessing
default:
panic(fmt.Sprintf("Unsupported LedgerCloseMeta.V: %d", l.V))
}
}

// UpgradesProcessing returns UpgradesProcessing for ledger.
func (l LedgerCloseMeta) UpgradesProcessing() []UpgradeEntryMeta {
switch l.V {
case 0:
return l.MustV0().UpgradesProcessing
case 1:
return l.MustV1().UpgradesProcessing
case 2:
return l.MustV2().UpgradesProcessing
default:
panic(fmt.Sprintf("Unsupported LedgerCloseMeta.V: %d", l.V))
}
}
2 changes: 2 additions & 0 deletions xdr/transaction_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ func (transactionMeta *TransactionMeta) OperationsMeta() []OperationMeta {
return transactionMeta.MustV1().Operations
case 2:
return transactionMeta.MustV2().Operations
case 3:
return transactionMeta.MustV3().Operations
default:
panic("Unsupported TransactionMeta version")
}
Expand Down
8 changes: 7 additions & 1 deletion xdr/transaction_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ func (r TransactionResultPair) OperationResults() ([]OperationResult, bool) {
// InnerHash returns the hash of the inner transaction.
// This function can only be called on fee bump transactions.
func (r TransactionResultPair) InnerHash() Hash {
return r.Result.Result.MustInnerResultPair().TransactionHash
return r.Result.InnerHash()
}

// InnerHash returns the hash of the inner transaction.
// This function can only be called on fee bump transactions.
func (r TransactionResult) InnerHash() Hash {
return r.Result.MustInnerResultPair().TransactionHash
}

// ExtractBalanceID will parse the operation result at `opIndex` within the
Expand Down

0 comments on commit e1e1e4c

Please sign in to comment.