Skip to content

Commit

Permalink
reuse leveldb batch and fix batchsize reset (hyperledger#1533)
Browse files Browse the repository at this point in the history
This commit
(1) Enables the reuse of leveldb batch -- reduces the memory utilisation
    and also the burden on GC.
(2) Fixes a missing logic of batchsize reset in confighistory store
    `ImportXXX()`

Signed-off-by: senthil <[email protected]>
  • Loading branch information
cendhu authored Jul 2, 2020
1 parent 74dd4dc commit 50272f1
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 9 deletions.
5 changes: 3 additions & 2 deletions common/ledger/blkstorage/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ func importTxIDsFromSnapshot(
lastBlockNumInSnapshot uint64,
db *leveldbhelper.DBHandle) error {

batch := db.NewUpdateBatch()
txIDsMetadata, err := snapshot.OpenFile(filepath.Join(snapshotDir, snapshotMetadataFileName), snapshotFileFormat)
if err != nil {
return err
Expand All @@ -356,6 +355,8 @@ func importTxIDsFromSnapshot(
if err != nil {
return err
}

batch := db.NewUpdateBatch()
for i := uint64(0); i < numTxIDs; i++ {
txID, err := txIDsData.DecodeString()
if err != nil {
Expand All @@ -369,7 +370,7 @@ func importTxIDsFromSnapshot(
if err := db.WriteBatch(batch, true); err != nil {
return err
}
batch = db.NewUpdateBatch()
batch.Reset()
}
}
batch.Put(indexSavePointKey, encodeBlockNum(lastBlockNumInSnapshot))
Expand Down
10 changes: 6 additions & 4 deletions common/ledger/blkstorage/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type rollbackMgr struct {
dbProvider *leveldbhelper.Provider
indexStore *blockIndex
targetBlockNum uint64
reusableBatch *leveldbhelper.UpdateBatch
}

// Rollback reverts changes made to the block store beyond a given block number.
Expand Down Expand Up @@ -70,6 +71,7 @@ func newRollbackMgr(blockStorageDir, ledgerID string, indexConfig *IndexConfig,
}
indexDB := r.dbProvider.GetDBHandle(ledgerID)
r.indexStore, err = newBlockIndex(indexConfig, indexDB)
r.reusableBatch = r.indexStore.db.NewUpdateBatch()
return r, err
}

Expand Down Expand Up @@ -113,7 +115,7 @@ func (r *rollbackMgr) deleteIndexEntriesRange(startBlkNum, endBlkNum uint64) err
// entries. However, if there is more than more than 1 channel, dropping of
// index would impact the time taken to recover the peer. We need to analyze
// a bit before making a decision on rollback vs drop of index. FAB-15672
batch := r.indexStore.db.NewUpdateBatch()
r.reusableBatch.Reset()
lp, err := r.indexStore.getBlockLocByBlockNum(startBlkNum)
if err != nil {
return err
Expand All @@ -135,12 +137,12 @@ func (r *rollbackMgr) deleteIndexEntriesRange(startBlkNum, endBlkNum uint64) err
if err != nil {
return err
}
addIndexEntriesToBeDeleted(batch, blockInfo, r.indexStore)
addIndexEntriesToBeDeleted(r.reusableBatch, blockInfo, r.indexStore)
numberOfBlocksToRetrieve--
}

batch.Put(indexSavePointKey, encodeBlockNum(startBlkNum-1))
return r.indexStore.db.WriteBatch(batch, true)
r.reusableBatch.Put(indexSavePointKey, encodeBlockNum(startBlkNum-1))
return r.indexStore.db.WriteBatch(r.reusableBatch, true)
}

func addIndexEntriesToBeDeleted(batch *leveldbhelper.UpdateBatch, blockInfo *serializedBlockInfo, indexStore *blockIndex) error {
Expand Down
2 changes: 1 addition & 1 deletion common/ledger/util/leveldbhelper/leveldb_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (h *DBHandle) DeleteAll() error {
}
logger.Infof("Have removed %d entries for channel %s in leveldb %s", numKeys, h.dbName, h.db.conf.DBPath)
batchSize = 0
batch = &leveldb.Batch{}
batch.Reset()
}
}
if batch.Len() > 0 {
Expand Down
3 changes: 2 additions & 1 deletion core/ledger/confighistory/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func (m *Mgr) ImportConfigHistory(ledgerID string, dir string) error {
if err := db.WriteBatch(batch, true); err != nil {
return err
}
batch = db.NewUpdateBatch()
currentBatchSize = 0
batch.Reset()
}
}
return db.WriteBatch(batch, true)
Expand Down
2 changes: 1 addition & 1 deletion core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ func (s *Store) processCollElgEvents() error {
collEntriesConverted++
if batch.Len() > s.maxBatchSize {
s.db.WriteBatch(batch, true)
batch = s.db.NewUpdateBatch()
batch.Reset()
sleepTime := time.Duration(s.batchesInterval)
logger.Infof("Going to sleep for %d milliseconds between batches. Entries for [ns=%s, coll=%s] converted so far = %d",
sleepTime, ns, coll, collEntriesConverted)
Expand Down

0 comments on commit 50272f1

Please sign in to comment.