Improve private data logging (hyperledger#1545)
Changed the following warning to a debug message:
WARN The PvtRwset of PvtSimulationResultsWithConfig for txID [...] is nil. Skipping.
In v2.x the peer attempts to retrieve private data from the transient store one collection at a time.
If the peer is eligible for multiple collections in a transaction,
and each transient store entry contains only one collection,
some of the retrieved records will have all collections filtered out, so the above message
is expected in v2.x and should not be a warning. (In v1.x this could not happen,
since each transient store record was filtered against ALL eligible collections at once,
so there it was a legitimately unexpected warning.)
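To illustrate the v2.x behavior, here is a minimal, self-contained Go sketch using simplified hypothetical types (not Fabric's actual proto types or its trimPvtWSet implementation): a transient store entry that holds only one collection trims down to nil when filtered for a different collection, which is exactly the case the old warning fired on.

package main

import "fmt"

// Hypothetical, simplified stand-ins for the private rwset structures.
type collPvtRwset struct {
	Collection string
	Rwset      []byte
}

type nsPvtRwset struct {
	Namespace    string
	CollPvtRwset []collPvtRwset
}

type txPvtRwset struct {
	NsPvtRwset []nsPvtRwset
}

// trim keeps only collections named in the filter; it returns nil when nothing
// survives, which is the (now expected) case the old code logged as a warning.
func trim(rws *txPvtRwset, filter map[string]bool) *txPvtRwset {
	var out txPvtRwset
	for _, ns := range rws.NsPvtRwset {
		var kept []collPvtRwset
		for _, col := range ns.CollPvtRwset {
			if filter[col.Collection] {
				kept = append(kept, col)
			}
		}
		if len(kept) > 0 {
			out.NsPvtRwset = append(out.NsPvtRwset, nsPvtRwset{Namespace: ns.Namespace, CollPvtRwset: kept})
		}
	}
	if len(out.NsPvtRwset) == 0 {
		return nil
	}
	return &out
}

func main() {
	// A transient store entry that contains only collectionA...
	entry := &txPvtRwset{NsPvtRwset: []nsPvtRwset{{
		Namespace:    "mycc",
		CollPvtRwset: []collPvtRwset{{Collection: "collectionA", Rwset: []byte("...")}},
	}}}
	// ...trimmed while fetching collectionB yields nil, so the record is skipped.
	fmt.Println(trim(entry, map[string]bool{"collectionB": true}) == nil) // true
}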

Suppressed the following confusing message when there IS NO private data in a block:
INFO Successfully fetched all eligible collection private write sets for block [31]

When there IS private data in the block, replaced the message with a more useful one, for example:
INFO Successfully fetched all 2 eligible collection private write sets for block [35] (0 from local cache, 1 from transient store, 1 from other peers)

And if the peer can't fetch all the private data in a block, you'll now get this message:
WARN Could not fetch all 2 eligible collection private write sets for block [36] (0 from local cache, 1 from transient store, 0 from other peers).
     Will commit block with missing private write sets.
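As a rough, self-contained sketch (mirroring the fetchStats counters added in pvtdataprovider.go below, not the provider's actual control flow), the new summary lines are composed from per-source counts like this:

package main

import "fmt"

// Simplified stand-in for the fetchStats type added in this commit.
type fetchStats struct {
	fromLocalCache, fromTransientStore, fromRemotePeer int
}

func (s fetchStats) String() string {
	return fmt.Sprintf("(%d from local cache, %d from transient store, %d from other peers)",
		s.fromLocalCache, s.fromTransientStore, s.fromRemotePeer)
}

func main() {
	total, blockNum := 2, 35 // hypothetical block with 2 eligible missing write sets
	stats := fetchStats{fromLocalCache: 0, fromTransientStore: 1, fromRemotePeer: 1}
	fetched := stats.fromLocalCache + stats.fromTransientStore + stats.fromRemotePeer

	if fetched == total {
		fmt.Printf("Successfully fetched all %d eligible collection private write sets for block [%d] %s\n",
			total, blockNum, stats)
	} else {
		fmt.Printf("Could not fetch all %d eligible collection private write sets for block [%d] %s. "+
			"Will commit block with missing private write sets.\n", total, blockNum, stats)
	}
}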

Finally, removed some redundant or unhelpful debug messages.

Signed-off-by: David Enyeart <[email protected]>
denyeart committed Jul 8, 2020
1 parent ff66f5e commit 9d08ec7
Showing 3 changed files with 75 additions and 42 deletions.
2 changes: 2 additions & 0 deletions core/transientstore/store.go
@@ -316,6 +316,8 @@ func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
return nil, err
}

// trim the tx rwset based on the current collection filter,
// nil will be returned to filteredTxPvtRWSet if the transient store txid entry does not contain the data for the collection
filteredTxPvtRWSet = trimPvtWSet(txPvtRWSetWithConfig.GetPvtRwset(), scanner.filter)
configs, err := trimPvtCollectionConfigs(txPvtRWSetWithConfig.CollectionConfigs, scanner.filter)
if err != nil {
106 changes: 64 additions & 42 deletions gossip/privdata/pvtdataprovider.go
@@ -145,10 +145,10 @@ func (ec *eligibilityComputer) computeEligibility(mspID string, pvtdataToRetriev
}

return &pvtdataRetrievalInfo{
sources: sources,
txns: txList,
eligibleMissingKeys: eligibleMissingKeys,
ineligibleMissingKeys: ineligibleMissingKeys,
sources: sources,
txns: txList,
remainingEligibleMissingKeys: eligibleMissingKeys,
ineligibleMissingKeys: ineligibleMissingKeys,
}, nil
}

Expand Down Expand Up @@ -202,32 +202,48 @@ func (pdp *PvtdataProvider) RetrievePvtdata(pvtdataToRetrieve []*ledger.TxPvtdat

pvtdata := make(rwsetByKeys)

//If there is no private data to retrieve for the block, skip all population attempts and return
if len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) == 0 {
pdp.logger.Debugf("No eligible collection private write sets to fetch for block [%d]", pdp.blockNum)
retrievedPvtdata.pvtdataRetrievalInfo = pvtdataRetrievalInfo
retrievedPvtdata.blockPvtdata = pdp.prepareBlockPvtdata(pvtdata, pvtdataRetrievalInfo)
return retrievedPvtdata, nil
}

fetchStats := &fetchStats{}

totalEligibleMissingKeysToRetrieve := len(pvtdataRetrievalInfo.remainingEligibleMissingKeys)

// POPULATE FROM CACHE
pdp.populateFromCache(pvtdata, pvtdataRetrievalInfo, pvtdataToRetrieve)
if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 {
pdp.logger.Debug("No missing collection private write sets to fetch from transient store")
fetchStats.fromLocalCache = totalEligibleMissingKeysToRetrieve - len(pvtdataRetrievalInfo.remainingEligibleMissingKeys)

if len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) == 0 {
pdp.logger.Infof("Successfully fetched all %d eligible collection private write sets for block [%d] %s", totalEligibleMissingKeysToRetrieve, pdp.blockNum, fetchStats)
retrievedPvtdata.pvtdataRetrievalInfo = pvtdataRetrievalInfo
retrievedPvtdata.blockPvtdata = pdp.prepareBlockPvtdata(pvtdata, pvtdataRetrievalInfo)
return retrievedPvtdata, nil
}

// POPULATE FROM TRANSIENT STORE
pdp.logger.Debugf("Could not find all collection private write sets in cache for block [%d]", pdp.blockNum)
pdp.logger.Debugf("Fetching %d collection private write sets from transient store", len(pvtdataRetrievalInfo.eligibleMissingKeys))
numRemainingToFetch := len(pvtdataRetrievalInfo.remainingEligibleMissingKeys)
pdp.populateFromTransientStore(pvtdata, pvtdataRetrievalInfo)
if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 {
pdp.logger.Debug("No missing collection private write sets to fetch from remote peers")
fetchStats.fromTransientStore = numRemainingToFetch - len(pvtdataRetrievalInfo.remainingEligibleMissingKeys)

if len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) == 0 {
pdp.logger.Infof("Successfully fetched all %d eligible collection private write sets for block [%d] %s", totalEligibleMissingKeysToRetrieve, pdp.blockNum, fetchStats)
retrievedPvtdata.pvtdataRetrievalInfo = pvtdataRetrievalInfo
retrievedPvtdata.blockPvtdata = pdp.prepareBlockPvtdata(pvtdata, pvtdataRetrievalInfo)
return retrievedPvtdata, nil
}

// POPULATE FROM REMOTE PEERS
numRemainingToFetch = len(pvtdataRetrievalInfo.remainingEligibleMissingKeys)
retryThresh := pdp.pullRetryThreshold
pdp.logger.Debugf("Could not find all collection private write sets in local peer transient store for block [%d]", pdp.blockNum)
pdp.logger.Debugf("Fetching %d collection private write sets from remote peers for a maximum duration of %s", len(pvtdataRetrievalInfo.eligibleMissingKeys), retryThresh)
pdp.logger.Debugf("Fetching %d collection private write sets from remote peers for a maximum duration of %s", len(pvtdataRetrievalInfo.remainingEligibleMissingKeys), retryThresh)
startPull := time.Now()
for len(pvtdataRetrievalInfo.eligibleMissingKeys) > 0 && time.Since(startPull) < retryThresh {
for len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) > 0 && time.Since(startPull) < retryThresh {
if needToRetry := pdp.populateFromRemotePeers(pvtdata, pvtdataRetrievalInfo); !needToRetry {
break
}
@@ -237,11 +253,14 @@ func (pdp *PvtdataProvider) RetrievePvtdata(pvtdataToRetrieve []*ledger.TxPvtdat
elapsedPull := int64(time.Since(startPull) / time.Millisecond) // duration in ms
pdp.fetchDurationHistogram.Observe(time.Since(startPull).Seconds())

if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 {
fetchStats.fromRemotePeer = numRemainingToFetch - len(pvtdataRetrievalInfo.remainingEligibleMissingKeys)

if len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) == 0 {
pdp.logger.Debugf("Fetched all missing collection private write sets from remote peers for block [%d] (%dms)", pdp.blockNum, elapsedPull)
pdp.logger.Infof("Successfully fetched all %d eligible collection private write sets for block [%d] %s", totalEligibleMissingKeysToRetrieve, pdp.blockNum, fetchStats)
} else {
pdp.logger.Debugf("Could not fetch all missing collection private write sets from remote peers for block [%d]",
pdp.blockNum)
pdp.logger.Warningf("Could not fetch all %d eligible collection private write sets for block [%d] %s. Will commit block with missing private write sets:[%v]",
totalEligibleMissingKeysToRetrieve, pdp.blockNum, fetchStats, pvtdataRetrievalInfo.remainingEligibleMissingKeys)
}

retrievedPvtdata.pvtdataRetrievalInfo = pvtdataRetrievalInfo
@@ -252,7 +271,7 @@ func (pdp *PvtdataProvider) RetrievePvtdata(pvtdataToRetrieve []*ledger.TxPvtdat
// populateFromCache populates pvtdata with data fetched from cache and updates
// pvtdataRetrievalInfo by removing missing data that was fetched from cache
func (pdp *PvtdataProvider) populateFromCache(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo, pvtdataToRetrieve []*ledger.TxPvtdataInfo) {
pdp.logger.Debugf("Attempting to retrieve %d private write sets from cache.", len(pvtdataRetrievalInfo.eligibleMissingKeys))
pdp.logger.Debugf("Attempting to retrieve %d private write sets from cache.", len(pvtdataRetrievalInfo.remainingEligibleMissingKeys))

for _, txPvtdata := range pdp.prefetchedPvtdata {
txID := getTxIDBySeqInBlock(txPvtdata.SeqInBlock, pvtdataToRetrieve)
@@ -271,14 +290,14 @@ func (pdp *PvtdataProvider) populateFromCache(pvtdata rwsetByKeys, pvtdataRetrie
hash: hex.EncodeToString(commonutil.ComputeSHA256(col.Rwset)),
}
// skip if key not originally missing
if _, missing := pvtdataRetrievalInfo.eligibleMissingKeys[key]; !missing {
if _, missing := pvtdataRetrievalInfo.remainingEligibleMissingKeys[key]; !missing {
pdp.logger.Warningf("Found extra data in prefetched:[%v]. Skipping.", key)
continue
}
// populate the pvtdata with the RW set from the cache
pvtdata[key] = col.Rwset
// remove key from missing
delete(pvtdataRetrievalInfo.eligibleMissingKeys, key)
delete(pvtdataRetrievalInfo.remainingEligibleMissingKeys, key)
} // iterate over collections in the namespace
} // iterate over the namespaces in the WSet
} // iterate over cached private data in the block
@@ -287,10 +306,10 @@ func (pdp *PvtdataProvider) populateFromCache(pvtdata rwsetByKeys, pvtdataRetrie
// populateFromTransientStore populates pvtdata with data fetched from transient store
// and updates pvtdataRetrievalInfo by removing missing data that was fetched from transient store
func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) {
pdp.logger.Debugf("Attempting to retrieve %d private write sets from transient store.", len(pvtdataRetrievalInfo.eligibleMissingKeys))
pdp.logger.Debugf("Attempting to retrieve %d private write sets from transient store.", len(pvtdataRetrievalInfo.remainingEligibleMissingKeys))

// Put into pvtdata RW sets that are missing and found in the transient store
for k := range pvtdataRetrievalInfo.eligibleMissingKeys {
for k := range pvtdataRetrievalInfo.remainingEligibleMissingKeys {
filter := ledger.NewPvtNsCollFilter()
filter.Add(k.namespace, k.collection)
iterator, err := pdp.transientStore.GetTxPvtRWSetByTxid(k.txID, filter)
@@ -314,8 +333,9 @@ func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtd
continue
}
simRes := res.PvtSimulationResultsWithConfig
// simRes.PvtRwset will be nil if the transient store contains an entry for the txid but the entry does not contain the data for the collection
if simRes.PvtRwset == nil {
pdp.logger.Warningf("The PvtRwset of PvtSimulationResultsWithConfig for txID [%s] is nil. Skipping.", k.txID)
pdp.logger.Debugf("The PvtRwset of PvtSimulationResultsWithConfig for txID [%s] is nil. Skipping.", k.txID)
continue
}
for _, ns := range simRes.PvtRwset.NsPvtRwset {
Expand All @@ -328,13 +348,14 @@ func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtd
hash: hex.EncodeToString(commonutil.ComputeSHA256(col.Rwset)),
}
// skip if not missing
if _, missing := pvtdataRetrievalInfo.eligibleMissingKeys[key]; !missing {
if _, missing := pvtdataRetrievalInfo.remainingEligibleMissingKeys[key]; !missing {
continue
}
// populate the pvtdata with the RW set from the transient store
pdp.logger.Debugf("Found private data for key %v in transient store", key)
pvtdata[key] = col.Rwset
// remove key from missing
delete(pvtdataRetrievalInfo.eligibleMissingKeys, key)
delete(pvtdataRetrievalInfo.remainingEligibleMissingKeys, key)
} // iterating over all collections
} // iterating over all namespaces
} // iterating over the TxPvtRWSet results
@@ -344,11 +365,11 @@ func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtd
// populateFromRemotePeers populates pvtdata with data fetched from remote peers and updates
// pvtdataRetrievalInfo by removing missing data that was fetched from remote peers
func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) bool {
pdp.logger.Debugf("Attempting to retrieve %d private write sets from remote peers.", len(pvtdataRetrievalInfo.eligibleMissingKeys))
pdp.logger.Debugf("Attempting to retrieve %d private write sets from remote peers.", len(pvtdataRetrievalInfo.remainingEligibleMissingKeys))

dig2src := make(map[pvtdatacommon.DigKey][]*peer.Endorsement)
var skipped int
for k, v := range pvtdataRetrievalInfo.eligibleMissingKeys {
for k, v := range pvtdataRetrievalInfo.remainingEligibleMissingKeys {
if v.invalid && pdp.skipPullingInvalidTransactions {
pdp.logger.Debugf("Skipping invalid key [%v] because peer is configured to skip pulling rwsets of invalid transactions.", k)
skipped++
@@ -387,7 +408,7 @@ func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdata
hash: hex.EncodeToString(commonutil.ComputeSHA256(rws)),
}
// skip if not missing
if _, missing := pvtdataRetrievalInfo.eligibleMissingKeys[key]; !missing {
if _, missing := pvtdataRetrievalInfo.remainingEligibleMissingKeys[key]; !missing {
// key isn't missing and was never fetched earlier, log that it wasn't originally requested
if _, exists := pvtdata[key]; !exists {
pdp.logger.Debugf("Ignoring [%v] because it was never requested.", key)
@@ -397,26 +418,26 @@ func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdata
// populate the pvtdata with the RW set from the remote peer
pvtdata[key] = rws
// remove key from missing
delete(pvtdataRetrievalInfo.eligibleMissingKeys, key)
delete(pvtdataRetrievalInfo.remainingEligibleMissingKeys, key)
pdp.logger.Debugf("Fetched [%v]", key)
}
}
// Iterate over purged data
for _, dig := range fetchedData.PurgedElements {
// delete purged key from missing keys
for missingPvtRWKey := range pvtdataRetrievalInfo.eligibleMissingKeys {
for missingPvtRWKey := range pvtdataRetrievalInfo.remainingEligibleMissingKeys {
if missingPvtRWKey.namespace == dig.Namespace &&
missingPvtRWKey.collection == dig.Collection &&
missingPvtRWKey.seqInBlock == dig.SeqInBlock &&
missingPvtRWKey.txID == dig.TxId {
delete(pvtdataRetrievalInfo.eligibleMissingKeys, missingPvtRWKey)
delete(pvtdataRetrievalInfo.remainingEligibleMissingKeys, missingPvtRWKey)
pdp.logger.Warningf("Missing key because was purged or will soon be purged, "+
"continue block commit without [%+v] in private rwset", missingPvtRWKey)
}
}
}

return len(pvtdataRetrievalInfo.eligibleMissingKeys) > skipped
return len(pvtdataRetrievalInfo.remainingEligibleMissingKeys) > skipped
}

// prepareBlockPvtdata consolidates the fetched private data as well as ineligible and eligible
@@ -427,13 +448,6 @@ func (pdp *PvtdataProvider) prepareBlockPvtdata(pvtdata rwsetByKeys, pvtdataRetr
MissingPvtData: make(ledger.TxMissingPvtDataMap),
}

if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 {
pdp.logger.Infof("Successfully fetched all eligible collection private write sets for block [%d]", pdp.blockNum)
} else {
pdp.logger.Warningf("Could not fetch all missing eligible collection private write sets for block [%d]. Will commit block with missing private write sets:[%v]",
pdp.blockNum, pvtdataRetrievalInfo.eligibleMissingKeys)
}

for seqInBlock, nsRWS := range pvtdata.bySeqsInBlock() {
// add all found pvtdata to blockPvtDataPvtdata for seqInBlock
blockPvtdata.PvtData[seqInBlock] = &ledger.TxPvtData{
@@ -442,7 +456,7 @@ func (pdp *PvtdataProvider) prepareBlockPvtdata(pvtdata rwsetByKeys, pvtdataRetr
}
}

for key := range pvtdataRetrievalInfo.eligibleMissingKeys {
for key := range pvtdataRetrievalInfo.remainingEligibleMissingKeys {
blockPvtdata.MissingPvtData.Add(key.seqInBlock, key.namespace, key.collection, true)
}

@@ -454,10 +468,10 @@ func (pdp *PvtdataProvider) prepareBlockPvtdata(pvtdata rwsetByKeys, pvtdataRetr
}

type pvtdataRetrievalInfo struct {
sources map[rwSetKey][]*peer.Endorsement
txns []string
eligibleMissingKeys rwsetKeys
ineligibleMissingKeys rwsetKeys
sources map[rwSetKey][]*peer.Endorsement
txns []string
remainingEligibleMissingKeys rwsetKeys
ineligibleMissingKeys rwsetKeys
}

// rwset types
@@ -564,3 +578,11 @@ func endorsersFromEligibleOrgs(ns string, col string, endorsers []*peer.Endorsem
}
return res
}

type fetchStats struct {
fromLocalCache, fromTransientStore, fromRemotePeer int
}

func (stats fetchStats) String() string {
return fmt.Sprintf("(%d from local cache, %d from transient store, %d from other peers)", stats.fromLocalCache, stats.fromTransientStore, stats.fromRemotePeer)
}
9 changes: 9 additions & 0 deletions gossip/privdata/pvtdataprovider_test.go
@@ -1237,6 +1237,15 @@ func TestRetrievedPvtdataPurgeBelowHeight(t *testing.T) {
}
}

func TestFetchStats(t *testing.T) {
fetchStats := fetchStats{
fromLocalCache: 1,
fromTransientStore: 2,
fromRemotePeer: 3,
}
assert.Equal(t, "(1 from local cache, 2 from transient store, 3 from other peers)", fetchStats.String())
}

func testRetrievePvtdataSuccess(t *testing.T,
scenario string,
ts testSupport,
