Skip to content

Commit

Permalink
Add function in blockstore to export TxIDs
Browse files Browse the repository at this point in the history
FAB-17837

Signed-off-by: manish <[email protected]>
  • Loading branch information
manish-sethi authored and denyeart committed May 7, 2020
1 parent b049276 commit 5b4f9e6
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 4 deletions.
65 changes: 64 additions & 1 deletion common/ledger/blkstorage/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ package blkstorage

import (
"bytes"
"encoding/binary"
"fmt"
"io"
"unicode/utf8"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
Expand All @@ -25,6 +28,7 @@ const (
txIDIdxKeyPrefix = 't'
blockNumTranNumIdxKeyPrefix = 'a'
indexCheckpointKeyStr = "indexCheckpointKey"
exportFileFormatVersion = byte(1)
)

var indexCheckpointKey = []byte(indexCheckpointKeyStr)
Expand Down Expand Up @@ -248,6 +252,44 @@ func (index *blockIndex) getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint
return txFLP, nil
}

// exportUniqueTxIDs streams every distinct TxID present in the txid index to writer.
//
// Output layout: one format-version byte (exportFileFormatVersion), followed by one
// record per TxID. Each record is the TxID's length in bytes encoded as a uvarint
// (encoding/binary) followed by the TxID bytes themselves. TxIDs are emitted in the
// iteration order of the index keys.
//
// It returns ErrAttrNotIndexed when the TxID attribute is not indexed; otherwise it
// returns the first error reported by the writer, the db iterator, or key decoding.
// On error the writer may already have received a partial export.
func (index *blockIndex) exportUniqueTxIDs(writer io.Writer) error {
	if !index.isAttributeIndexed(IndexableAttrTxID) {
		return ErrAttrNotIndexed
	}
	// A failed header write must abort the export instead of being dropped.
	if _, err := writer.Write([]byte{exportFileFormatVersion}); err != nil {
		return err
	}
	dbItr := index.db.GetIterator([]byte{txIDIdxKeyPrefix}, []byte{txIDIdxKeyPrefix + 1})
	defer dbItr.Release()
	if err := dbItr.Error(); err != nil {
		return errors.Wrap(err, "internal leveldb error while obtaining db iterator")
	}

	var previousTxID string
	reusableBuf := make([]byte, binary.MaxVarintLen64)
	for dbItr.Next() {
		if err := dbItr.Error(); err != nil {
			return errors.Wrap(err, "internal leveldb error while iterating for txids")
		}
		txID, err := retrieveTxID(dbItr.Key())
		if err != nil {
			return err
		}
		// The same TxID may be indexed more than once (under different block/tx
		// numbers). Those keys share the TxID portion of the key, so comparing
		// with the previously seen TxID is how repeats are dropped.
		if previousTxID == txID {
			continue
		}
		previousTxID = txID

		n := binary.PutUvarint(reusableBuf, uint64(len(txID)))
		if _, err := writer.Write(reusableBuf[:n]); err != nil {
			return err
		}
		if _, err := writer.Write([]byte(txID)); err != nil {
			return err
		}
	}
	// Next() returning false can mean "failed" rather than "exhausted"; without this
	// check a mid-scan failure would be reported as a successful, truncated export.
	if err := dbItr.Error(); err != nil {
		return errors.Wrap(err, "internal leveldb error while iterating for txids")
	}
	return nil
}

func constructBlockNumKey(blockNum uint64) []byte {
blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...)
Expand All @@ -267,6 +309,28 @@ func constructTxIDKey(txID string, blkNum, txNum uint64) []byte {
return append(k, util.EncodeOrderPreservingVarUint64(txNum)...)
}

// retrieveTxID extracts the TxID from an encoded txid index key of the format
// `prefix:len(TxID):TxID:BlkNum:TxNum`.
//
// It returns an error when the key is empty, does not start with txIDIdxKeyPrefix,
// carries an undecodable length, or does not hold strictly more bytes than the
// declared TxID length after the length field (the TxID must be followed by the
// block/tx number portion of the key).
func retrieveTxID(encodedTxIDKey []byte) (string, error) {
	if len(encodedTxIDKey) == 0 {
		return "", errors.New("invalid txIDKey - zero-length slice")
	}
	if encodedTxIDKey[0] != txIDIdxKeyPrefix {
		return "", errors.Errorf("invalid txIDKey {%x} - unexpected prefix", encodedTxIDKey)
	}
	remainingBytes := encodedTxIDKey[utf8.RuneLen(txIDIdxKeyPrefix):]

	txIDLen, n, err := util.DecodeOrderPreservingVarUint64(remainingBytes)
	if err != nil {
		return "", errors.WithMessagef(err, "invalid txIDKey {%x}", encodedTxIDKey)
	}
	remainingBytes = remainingBytes[n:]
	// Compare as uint64: converting a corrupt, very large decoded length to int
	// could yield a negative number, slip past this guard, and panic when slicing.
	if uint64(len(remainingBytes)) <= txIDLen {
		return "", errors.Errorf("invalid txIDKey {%x}, fewer bytes present", encodedTxIDKey)
	}
	// txIDLen < len(remainingBytes) here, so it is a valid slice bound.
	return string(remainingBytes[:txIDLen]), nil
}

type rangeScan struct {
startKey []byte
stopKey []byte
Expand Down Expand Up @@ -366,7 +430,6 @@ func (flp *fileLocPointer) String() string {
}

func (blockIdxInfo *blockIdxInfo) String() string {

var buffer bytes.Buffer
for _, txOffset := range blockIdxInfo.txOffsets {
buffer.WriteString("txId=")
Expand Down
158 changes: 156 additions & 2 deletions common/ledger/blkstorage/blockindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ SPDX-License-Identifier: Apache-2.0
package blkstorage

import (
"bytes"
"encoding/binary"
"fmt"
"testing"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/common/ledger/util"
commonledgerutil "github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/internal/pkg/txflags"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -194,7 +198,7 @@ func containsAttr(indexItems []IndexableAttr, attr IndexableAttr) bool {
return false
}

func TestTxIDKeyEncoding(t *testing.T) {
func TestTxIDKeyEncodingDecoding(t *testing.T) {
testcases := []struct {
txid string
blkNum uint64
Expand All @@ -206,16 +210,158 @@ func TestTxIDKeyEncoding(t *testing.T) {
{"txid1", 100, 100},
}
for i, testcase := range testcases {
encodedTxIDKey := constructTxIDKey(testcase.txid, testcase.blkNum, testcase.txNum)
t.Run(fmt.Sprintf(" %d", i),
func(t *testing.T) {
txID, err := retrieveTxID(encodedTxIDKey)
require.NoError(t, err)
require.Equal(t, testcase.txid, txID)
verifyTxIDKeyDecodable(t,
constructTxIDKey(testcase.txid, testcase.blkNum, testcase.txNum),
encodedTxIDKey,
testcase.txid, testcase.blkNum, testcase.txNum,
)
})
}
}

// TestTxIDKeyDecodingInvalidInputs feeds retrieveTxID a series of malformed keys and
// checks the exact error message produced for each of them.
func TestTxIDKeyDecodingInvalidInputs(t *testing.T) {
	const sampleTxID = "mytxid"
	keyPrefix := []byte{txIDIdxKeyPrefix}
	encodedLen := util.EncodeOrderPreservingVarUint64(uint64(len(sampleTxID)))

	wrongPrefixKey := []byte{txIDIdxKeyPrefix + 1}
	truncatedLenKey := appendAllAndTrimLastByte(keyPrefix, encodedLen)
	truncatedTxIDKey := appendAllAndTrimLastByte(keyPrefix, encodedLen, []byte(sampleTxID))

	scenarios := []struct {
		key         []byte
		expectedErr string
	}{
		{
			// empty byte
			key:         []byte{},
			expectedErr: "invalid txIDKey - zero-length slice",
		},
		{
			// invalid prefix
			key:         wrongPrefixKey,
			expectedErr: fmt.Sprintf("invalid txIDKey {%x} - unexpected prefix", wrongPrefixKey),
		},
		{
			// invalid key - only prefix
			key:         keyPrefix,
			expectedErr: fmt.Sprintf("invalid txIDKey {%x}: number of consumed bytes from DecodeVarint is invalid, expected 1, but got 0", keyPrefix),
		},
		{
			// invalid key - incomplete length
			key:         truncatedLenKey,
			expectedErr: fmt.Sprintf("invalid txIDKey {%x}: decoded size (1) from DecodeVarint is more than available bytes (0)", truncatedLenKey),
		},
		{
			// invalid key - incomplete txid
			key:         truncatedTxIDKey,
			expectedErr: fmt.Sprintf("invalid txIDKey {%x}, fewer bytes present", truncatedTxIDKey),
		},
	}

	for _, scenario := range scenarios {
		_, err := retrieveTxID(scenario.key)
		require.EqualError(t, err, scenario.expectedErr)
	}
}

// TestExportUniqueTxIDs adds blocks one at a time and, after each addition, checks
// that exportUniqueTxIDs emits every TxID exactly once and in index order.
func TestExportUniqueTxIDs(t *testing.T) {
	env := newTestEnv(t, NewConf(testPath(), 0))
	defer env.Cleanup()
	ledgerid := "testledger"
	blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
	defer blkfileMgrWrapper.close()
	blkfileMgr := blkfileMgrWrapper.blockfileMgr

	// add genesis block and test the exported bytes
	bg, gb := testutil.NewBlockGenerator(t, "myChannel", false)
	require.NoError(t, blkfileMgr.addBlock(gb))

	configTxID, err := protoutil.GetOrComputeTxIDFromEnvelope(gb.Data.Data[0])
	require.NoError(t, err)
	testWriter := bytes.NewBuffer(nil)
	err = blkfileMgr.index.exportUniqueTxIDs(testWriter)
	require.NoError(t, err)
	verifyExportedTxIDs(t, testWriter.Bytes(), configTxID)

	// add block-1 and test the exported bytes
	block1 := bg.NextBlockWithTxid(
		[][]byte{
			[]byte("tx with id=txid-3"),
			[]byte("tx with id=txid-1"),
			[]byte("tx with id=txid-2"),
			[]byte("another tx with existing id=txid-1"),
		},
		[]string{"txid-3", "txid-1", "txid-2", "txid-1"},
	)
	err = blkfileMgr.addBlock(block1)
	require.NoError(t, err)
	testWriter.Reset()
	err = blkfileMgr.index.exportUniqueTxIDs(testWriter)
	require.NoError(t, err)
	// "txid-1" appears only once; TxIDs appear in radix sort order
	verifyExportedTxIDs(t, testWriter.Bytes(), "txid-1", "txid-2", "txid-3", configTxID)

	// add block-2 and test the exported bytes
	block2 := bg.NextBlockWithTxid(
		[][]byte{
			[]byte("tx with id=txid-0000000"),
			[]byte("tx with id=txid-3"),
			[]byte("tx with id=txid-4"),
		},
		[]string{"txid-0000000", "txid-3", "txid-4"},
	)
	// Capture the result: previously the assertion below re-checked a stale err.
	err = blkfileMgr.addBlock(block2)
	require.NoError(t, err)
	testWriter.Reset()
	err = blkfileMgr.index.exportUniqueTxIDs(testWriter)
	require.NoError(t, err)
	// "txid-1" and "txid-3" appear only once; TxIDs appear in radix sort order
	verifyExportedTxIDs(t, testWriter.Bytes(), "txid-1", "txid-2", "txid-3", "txid-4", "txid-0000000", configTxID)
}

// TestExportUniqueTxIDsWhenTxIDsNotIndexed checks that exporting TxIDs from an index
// built without IndexableAttrTxID fails with ErrAttrNotIndexed.
func TestExportUniqueTxIDsWhenTxIDsNotIndexed(t *testing.T) {
	env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), []IndexableAttr{IndexableAttrBlockNum}, &disabled.Provider{})
	defer env.Cleanup()
	blkfileMgrWrapper := newTestBlockfileWrapper(env, "testledger")
	defer blkfileMgrWrapper.close()

	blocks := testutil.ConstructTestBlocks(t, 5)
	blkfileMgrWrapper.addBlocks(blocks)

	err := blkfileMgrWrapper.blockfileMgr.index.exportUniqueTxIDs(bytes.NewBuffer(nil))
	// require.Equal takes (expected, actual); this order keeps failure output correctly labelled.
	require.Equal(t, ErrAttrNotIndexed, err)
}

// TestExportUniqueTxIDsErrorCases drives exportUniqueTxIDs through its failure paths:
// a writer that always fails, an undecodable txid key in the index, and a closed db.
func TestExportUniqueTxIDsErrorCases(t *testing.T) {
	env := newTestEnv(t, NewConf(testPath(), 0))
	defer env.Cleanup()
	ledgerid := "testledger"
	blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
	defer blkfileMgrWrapper.close()

	blocks := testutil.ConstructTestBlocks(t, 5)
	blkfileMgrWrapper.addBlocks(blocks)
	blockfileMgr := blkfileMgrWrapper.blockfileMgr
	index := blockfileMgr.index

	// writer failure is propagated as-is
	err := index.exportUniqueTxIDs(&errorThrowingWriter{errors.New("always return error")})
	require.EqualError(t, err, "always return error")

	// A key made of only the prefix cannot be decoded. Fail right here if the junk
	// entry could not be stored, rather than on the unrelated assertion below.
	err = index.db.Put([]byte{txIDIdxKeyPrefix}, []byte("some junk value"), true)
	require.NoError(t, err)
	err = index.exportUniqueTxIDs(bytes.NewBuffer(nil))
	require.EqualError(t, err, "invalid txIDKey {74}: number of consumed bytes from DecodeVarint is invalid, expected 1, but got 0")

	// closed db: obtaining the iterator fails
	env.provider.leveldbProvider.Close()
	err = index.exportUniqueTxIDs(bytes.NewBuffer(nil))
	require.EqualError(t, err, "internal leveldb error while obtaining db iterator: leveldb: closed")
}

// verifyExportedTxIDs builds the byte stream expected for expectedTxIDs — the format
// version byte followed by a <uvarint length, TxID bytes> record per TxID, in the
// given order — and requires that actual matches it exactly.
func verifyExportedTxIDs(t *testing.T, actual []byte, expectedTxIDs ...string) {
	want := []byte{exportFileFormatVersion}
	var scratch [binary.MaxVarintLen64]byte
	for i := range expectedTxIDs {
		id := expectedTxIDs[i]
		prefixLen := binary.PutUvarint(scratch[:], uint64(len(id)))
		want = append(want, scratch[:prefixLen]...)
		want = append(want, id...)
	}
	require.Equal(t, want, actual)
}

// appendAllAndTrimLastByte concatenates all the given byte slices, in order, into a
// newly allocated slice and returns that concatenation without its final byte.
// The inputs are not modified. The combined input must be non-empty.
func appendAllAndTrimLastByte(input ...[]byte) []byte {
	total := 0
	for _, part := range input {
		total += len(part)
	}
	joined := make([]byte, 0, total)
	for _, part := range input {
		joined = append(joined, part...)
	}
	return joined[:total-1]
}

func verifyTxIDKeyDecodable(t *testing.T, txIDKey []byte, expectedTxID string, expectedBlkNum, expectedTxNum uint64) {
length, lengthBytes, err := commonledgerutil.DecodeOrderPreservingVarUint64(txIDKey[1:])
require.NoError(t, err)
Expand All @@ -233,3 +379,11 @@ func verifyTxIDKeyDecodable(t *testing.T, txIDKey []byte, expectedTxID string, e
require.Equal(t, expectedTxNum, txNum)
require.Len(t, txIDKey, firstIndexTxNum+n)
}

// errorThrowingWriter is a test double with an io.Writer-compatible Write method
// that never accepts any data and always answers with the configured error.
type errorThrowingWriter struct {
	err error // returned verbatim by every Write call
}

// Write ignores its input, reports zero bytes written, and returns w.err.
func (w *errorThrowingWriter) Write([]byte) (int, error) {
	return 0, w.err
}
11 changes: 10 additions & 1 deletion common/ledger/blkstorage/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package blkstorage

import (
"io"
"time"

"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -73,19 +74,27 @@ func (store *BlockStore) RetrieveTxByID(txID string) (*common.Envelope, error) {
return store.fileMgr.retrieveTransactionByID(txID)
}

// RetrieveTxByBlockNumTranNum returns the transaction envelope for the given
// <blockNum, tranNum>. The lookup is delegated to store.fileMgr, and its result
// and error are returned unchanged.
func (store *BlockStore) RetrieveTxByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
return store.fileMgr.retrieveTransactionByBlockNumTranNum(blockNum, tranNum)
}

// RetrieveBlockByTxID returns the block for the specified txID. The lookup is
// delegated to store.fileMgr, and its result and error are returned unchanged.
func (store *BlockStore) RetrieveBlockByTxID(txID string) (*common.Block, error) {
return store.fileMgr.retrieveBlockByTxID(txID)
}

// RetrieveTxValidationCodeByTxID returns the validation code for the specified txID.
// The lookup is delegated to store.fileMgr, and its result and error are returned
// unchanged.
func (store *BlockStore) RetrieveTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
return store.fileMgr.retrieveTxValidationCodeByTxID(txID)
}

// ExportTxIds writes all the TxIDs to the writer by delegating to the block index's
// exportUniqueTxIDs; any error from that call is returned unchanged.
//
// Technically, the TxIDs appear in radix-sort (shortlex) order: shorter TxIDs first,
// and TxIDs of equal length in lexical order. Since, in practice, all the TxIDs are
// of the same length, the resulting order is effectively the lexical sort order.
func (store *BlockStore) ExportTxIds(writer io.Writer) error {
return store.fileMgr.index.exportUniqueTxIDs(writer)
}

// Shutdown shuts down the block store
func (store *BlockStore) Shutdown() {
logger.Debugf("closing fs blockStore:%s", store.id)
Expand Down

0 comments on commit 5b4f9e6

Please sign in to comment.