Skip to content

Commit

Permalink
Maintain hashed index
Browse files Browse the repository at this point in the history
This commit adds code for creating hashed index entries during block
commit and deleting the hashed index entries during purge triggered by
block-to-live (BTL) based expiry

Signed-off-by: manish <[email protected]>
  • Loading branch information
manish-sethi authored and denyeart committed Jan 14, 2022
1 parent 08d7a98 commit f7e4999
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 12 deletions.
45 changes: 33 additions & 12 deletions core/ledger/kvledger/kv_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/ledger/queryresult"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validation"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
Expand Down Expand Up @@ -1399,22 +1401,41 @@ func sampleDataWithPvtdataForAllTxs(t *testing.T, bg *testutil.BlockGenerator) [
}

func samplePvtData(t *testing.T, txNums []uint64) map[uint64]*ledger.TxPvtData {
pvtWriteSet := &rwset.TxPvtReadWriteSet{DataModel: rwset.TxReadWriteSet_KV}
pvtWriteSet.NsPvtRwset = []*rwset.NsPvtReadWriteSet{
{
Namespace: "ns-1",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "coll-1",
Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll1"),
},
{
CollectionName: "coll-2",
Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll2"),
txPvtWS := &rwsetutil.TxPvtRwSet{
NsPvtRwSet: []*rwsetutil.NsPvtRwSet{
{
NameSpace: "ns-1",
CollPvtRwSets: []*rwsetutil.CollPvtRwSet{
{
CollectionName: "coll-1",
KvRwSet: &kvrwset.KVRWSet{
Writes: []*kvrwset.KVWrite{
{
Key: "testKey",
Value: []byte("testValue"),
},
},
},
},
{
CollectionName: "coll-2",
KvRwSet: &kvrwset.KVRWSet{
Writes: []*kvrwset.KVWrite{
{
Key: "testKey",
Value: []byte("testValue"),
},
},
},
},
},
},
},
}

pvtWriteSet, err := txPvtWS.ToProtoMsg()
require.NoError(t, err)

var pvtData []*ledger.TxPvtData
for _, txNum := range txNums {
pvtData = append(pvtData, &ledger.TxPvtData{SeqInBlock: txNum, WriteSet: pvtWriteSet})
Expand Down
31 changes: 31 additions & 0 deletions core/ledger/pvtdatastorage/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (
"github.com/bits-and-blooms/bitset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/core/ledger/util"
)

func prepareStoreEntries(blockNum uint64, pvtData []*ledger.TxPvtData, btlPolicy pvtdatapolicy.BTLPolicy,
missingPvtData ledger.TxMissingPvtData) (*storeEntries, error) {
dataEntries := prepareDataEntries(blockNum, pvtData)

hashedIndexEntries, err := prepareHashedIndexEntries(dataEntries)
if err != nil {
return nil, err
}
elgMissingDataEntries, inelgMissingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData)

expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, elgMissingDataEntries, inelgMissingDataEntries, btlPolicy)
Expand All @@ -28,6 +34,7 @@ func prepareStoreEntries(blockNum uint64, pvtData []*ledger.TxPvtData, btlPolicy

return &storeEntries{
dataEntries: dataEntries,
hashedIndexEntries: hashedIndexEntries,
expiryEntries: expiryEntries,
elgMissingDataEntries: elgMissingDataEntries,
inelgMissingDataEntries: inelgMissingDataEntries,
Expand Down Expand Up @@ -150,6 +157,30 @@ func prepareExpiryEntriesForMissingData(mapByExpiringBlk map[uint64]*ExpiryData,
return nil
}

// prepareHashedIndexEntries builds one hashedIndexEntry per private write
// contained in the supplied data entries. Each index entry maps the hash of a
// private key (qualified by namespace, collection, block number, and tx
// number) back to the plain-text key, so that a key can later be located by
// its hash. Returns an error if a stored collection rwset cannot be decoded.
func prepareHashedIndexEntries(dataEntries []*dataEntry) ([]*hashedIndexEntry, error) {
	hashedIndexEntries := []*hashedIndexEntry{}
	for _, d := range dataEntries {
		// decode the stored proto into a structured collection pvt rwset
		collPvtWS, err := rwsetutil.CollPvtRwSetFromProtoMsg(d.value)
		if err != nil {
			return nil, err
		}
		for _, w := range collPvtWS.KvRwSet.Writes {
			hashedIndexEntries = append(hashedIndexEntries,
				&hashedIndexEntry{
					key: &hashedIndexKey{
						ns:         d.key.ns,
						coll:       d.key.coll,
						blkNum:     d.key.blkNum,
						txNum:      d.key.txNum,
						pvtkeyHash: util.ComputeStringHash(w.Key),
					},
					value: w.Key,
				})
		}
	}
	return hashedIndexEntries, nil
}

func getOrCreateExpiryData(mapByExpiringBlk map[uint64]*ExpiryData, expiringBlk uint64) *ExpiryData {
expiryData, ok := mapByExpiringBlk[expiringBlk]
if !ok {
Expand Down
45 changes: 45 additions & 0 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ type dataEntry struct {
value *rwset.CollectionPvtReadWriteSet
}

// hashedIndexEntry is a single hashed-index record: it maps the hash of a
// private data key (carried in key.pvtkeyHash along with the ns/coll/block/tx
// locators) to the plain-text key held in value.
type hashedIndexEntry struct {
	key   *hashedIndexKey
	value string
}

type expiryEntry struct {
key *expiryKey
value *ExpiryData
Expand Down Expand Up @@ -122,6 +127,7 @@ type hashedIndexKey struct {

type storeEntries struct {
dataEntries []*dataEntry
hashedIndexEntries []*hashedIndexEntry
expiryEntries []*expiryEntry
elgMissingDataEntries map[missingDataKey]*bitset.BitSet
inelgMissingDataEntries map[missingDataKey]*bitset.BitSet
Expand Down Expand Up @@ -284,6 +290,11 @@ func (s *Store) Commit(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtD
batch.Put(key, val)
}

for _, hashedIndexEntry := range storeEntries.hashedIndexEntries {
key := encodeHashedIndexKey(hashedIndexEntry.key)
batch.Put(key, []byte(hashedIndexEntry.value))
}

for _, expiryEntry := range storeEntries.expiryEntries {
key = encodeExpiryKey(expiryEntry.key)
if val, err = encodeExpiryValue(expiryEntry.value); err != nil {
Expand Down Expand Up @@ -650,6 +661,18 @@ func (s *Store) purgeExpiredData(minBlkNum, maxBlkNum uint64) error {
batch.Delete(encodeBootKVHashesKey(bootKVHashesKey))
}

dataEntries, err := s.retrieveDataEntries(dataKeys)
if err != nil {
return err
}
hashedIndexEntries, err := prepareHashedIndexEntries(dataEntries)
if err != nil {
return err
}
for _, hashedIndexEntry := range hashedIndexEntries {
batch.Delete(encodeHashedIndexKey(hashedIndexEntry.key))
}

if err := s.db.WriteBatch(batch, false); err != nil {
return err
}
Expand Down Expand Up @@ -686,6 +709,28 @@ func (s *Store) retrieveExpiryEntries(minBlkNum, maxBlkNum uint64) ([]*expiryEnt
return expiryEntries, nil
}

// retrieveDataEntries loads and decodes the data entries for the given data
// keys from the underlying db. It returns an error if a lookup fails or a
// stored value cannot be decoded into a collection pvt rwset.
func (s *Store) retrieveDataEntries(dataKeys []*dataKey) ([]*dataEntry, error) {
	// one result per key, so pre-size the slice to avoid repeated growth
	dataEntries := make([]*dataEntry, 0, len(dataKeys))
	for _, k := range dataKeys {
		v, err := s.db.Get(encodeDataKey(k))
		if err != nil {
			return nil, err
		}

		collWS, err := decodeDataValue(v)
		if err != nil {
			return nil, err
		}

		dataEntries = append(dataEntries,
			&dataEntry{
				key:   k,
				value: collWS,
			})
	}
	return dataEntries, nil
}

func (s *Store) launchCollElgProc() {
go func() {
if err := s.processCollElgEvents(); err != nil {
Expand Down
54 changes: 54 additions & 0 deletions core/ledger/pvtdatastorage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -463,6 +464,22 @@ func TestStorePurge(t *testing.T) {
ns1Coll1 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}, txNum: 2}
ns2Coll2 := &dataKey{nsCollBlk: nsCollBlk{ns: "ns-2", coll: "coll-2", blkNum: 1}, txNum: 2}

ns1Coll1Blk1Tx2HI := &hashedIndexKey{
ns: "ns-1",
coll: "coll-1",
pvtkeyHash: util.ComputeStringHash("key-ns-1-coll-1"),
blkNum: 1,
txNum: 2,
}

ns2Coll2Blk1Tx2HI := &hashedIndexKey{
ns: "ns-2",
coll: "coll-2",
pvtkeyHash: util.ComputeStringHash("key-ns-2-coll-2"),
blkNum: 1,
txNum: 2,
}

// eligible missingData entries for ns-1:coll-1, ns-1:coll-2 (neverExpires) should exist in store
ns1Coll1elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: 1}}
ns1Coll2elgMD := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}}
Expand All @@ -481,6 +498,9 @@ func TestStorePurge(t *testing.T) {
require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll1inelgMD))
require.True(t, testInelgMissingDataKeyExists(t, s, ns3Coll2inelgMD))

require.True(t, testHashedIndexExists(t, s, ns1Coll1Blk1Tx2HI))
require.True(t, testHashedIndexExists(t, s, ns2Coll2Blk1Tx2HI))

deprioritizedList := ledger.MissingPvtDataInfo{
1: ledger.MissingBlockPvtdataInfo{
3: {
Expand All @@ -503,6 +523,10 @@ func TestStorePurge(t *testing.T) {
testWaitForPurgerRoutineToFinish(s)
require.True(t, testDataKeyExists(t, s, ns1Coll1))
require.True(t, testDataKeyExists(t, s, ns2Coll2))

require.True(t, testHashedIndexExists(t, s, ns1Coll1Blk1Tx2HI))
require.True(t, testHashedIndexExists(t, s, ns2Coll2Blk1Tx2HI))

// eligible missingData entries for ns-1:coll-1, ns-1:coll-2 (neverExpires) should exist in store
require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll1elgMD))
require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll2elgMD))
Expand All @@ -519,7 +543,11 @@ func TestStorePurge(t *testing.T) {
// but ns-2:coll-2 should exist because it expires at block 5
testWaitForPurgerRoutineToFinish(s)
require.False(t, testDataKeyExists(t, s, ns1Coll1))
require.False(t, testHashedIndexExists(t, s, ns1Coll1Blk1Tx2HI))

require.True(t, testDataKeyExists(t, s, ns2Coll2))
require.True(t, testHashedIndexExists(t, s, ns2Coll2Blk1Tx2HI))

// eligible missingData entries for ns-1:coll-1 should have expired and ns-1:coll-2 (neverExpires) should exist in store
require.False(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll1elgMD))
require.True(t, testElgPrioMissingDataKeyExists(t, s, ns1Coll2elgMD))
Expand All @@ -534,17 +562,32 @@ func TestStorePurge(t *testing.T) {
// ns-2:coll-2 should exist because though the data expires at block 5 but purger is launched every second block
testWaitForPurgerRoutineToFinish(s)
require.False(t, testDataKeyExists(t, s, ns1Coll1))
require.False(t, testHashedIndexExists(t, s, ns1Coll1Blk1Tx2HI))

require.True(t, testDataKeyExists(t, s, ns2Coll2))
require.True(t, testHashedIndexExists(t, s, ns2Coll2Blk1Tx2HI))

// write pvt data for block 6
require.NoError(t, s.Commit(6, nil, nil))
// ns-2:coll-2 should not exist now (because purger should be launched at block 6)
testWaitForPurgerRoutineToFinish(s)
require.False(t, testDataKeyExists(t, s, ns1Coll1))
require.False(t, testHashedIndexExists(t, s, ns1Coll1Blk1Tx2HI))

require.False(t, testDataKeyExists(t, s, ns2Coll2))
require.False(t, testHashedIndexExists(t, s, ns2Coll2Blk1Tx2HI))

// "ns-1:coll-2" should never have been purged (because no btl was declared for this collection)
require.True(t, testDataKeyExists(t, s, &dataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-2", blkNum: 1}, txNum: 2}))
require.True(t, testHashedIndexExists(t, s,
&hashedIndexKey{
ns: "ns-1",
coll: "coll-2",
pvtkeyHash: util.ComputeStringHash("key-ns-1-coll-2"),
blkNum: 1,
txNum: 2,
},
))
}

func TestStoreState(t *testing.T) {
Expand Down Expand Up @@ -843,6 +886,17 @@ func testInelgMissingDataKeyExists(t *testing.T, s *Store, missingDataKey *missi
return len(val) != 0
}

// testHashedIndexExists reports whether the hashed-index entry for h is
// present in the store. When it is present, the helper additionally checks
// that the stored plain-text key hashes back to h.pvtkeyHash.
func testHashedIndexExists(t *testing.T, s *Store, h *hashedIndexKey) bool {
	encodedKey := encodeHashedIndexKey(h)
	storedVal, err := s.db.Get(encodedKey)
	require.NoError(t, err)

	exists := len(storedVal) > 0
	if exists {
		// sanity check: the stored value must be the pre-image of the lookup hash
		require.Equal(t, h.pvtkeyHash, util.ComputeHash(storedVal))
	}
	return exists
}

func testWaitForPurgerRoutineToFinish(s *Store) {
time.Sleep(1 * time.Second)
s.purgerLock.Lock()
Expand Down

0 comments on commit f7e4999

Please sign in to comment.