Skip to content

Commit

Permalink
Import state from snapshotted data in stateleveldb (hyperledger#1516)
Browse files Browse the repository at this point in the history
This commit enables statelevedb for importing the state data
for a channel from an iterator that is expected to read from
previously snapshotted state data

Signed-off-by: manish <[email protected]>
  • Loading branch information
manish-sethi authored Jul 1, 2020
1 parent 4efce81 commit d882a2d
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 94 deletions.
15 changes: 15 additions & 0 deletions common/ledger/util/leveldbhelper/leveldb_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,21 @@ func (h *DBHandle) DeleteAll() error {
return nil
}

// IsEmpty returns true if no data exists for the DBHandle
func (h *DBHandle) IsEmpty() (bool, error) {
itr, err := h.GetIterator(nil, nil)
if err != nil {
return false, err
}
defer itr.Release()

if err := itr.Error(); err != nil {
return false, errors.WithMessagef(itr.Error(), "internal leveldb error while obtaining next entry from iterator")
}

return !itr.Next(), nil
}

// NewUpdateBatch returns a new UpdateBatch that can be used to update the db
func (h *DBHandle) NewUpdateBatch() *UpdateBatch {
return &UpdateBatch{
Expand Down
73 changes: 73 additions & 0 deletions common/ledger/util/leveldbhelper/leveldb_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,79 @@ func TestClose(t *testing.T) {
require.Equal(t, map[string]*DBHandle{}, p.dbHandles)
}

func TestIsEmpty(t *testing.T) {
var env *testDBProviderEnv
var db1, db2 *DBHandle

setup := func() {
env = newTestProviderEnv(t, testDBPath)
p := env.provider
db1 = p.GetDBHandle("db1")
db2 = p.GetDBHandle("db2")
}

cleanup := func() {
env.cleanup()
}

t.Run("both the dbs are empty", func(t *testing.T) {
setup()
defer cleanup()

empty, err := db1.IsEmpty()
require.NoError(t, err)
require.True(t, empty)

empty, err = db2.IsEmpty()
require.NoError(t, err)
require.True(t, empty)
})

t.Run("only one db is empty", func(t *testing.T) {
setup()
defer cleanup()

db1.Put([]byte("key"), []byte("value"), false)
empty, err := db1.IsEmpty()
require.NoError(t, err)
require.False(t, empty)

empty, err = db2.IsEmpty()
require.NoError(t, err)
require.True(t, empty)
})

t.Run("both the dbs contain data", func(t *testing.T) {
setup()
defer cleanup()

db1.Put([]byte("key"), []byte("value"), false)
db2.Put([]byte("key"), []byte("value"), false)

empty, err := db1.IsEmpty()
require.NoError(t, err)
require.False(t, empty)

empty, err = db2.IsEmpty()
require.NoError(t, err)
require.False(t, empty)
})

t.Run("iter error", func(t *testing.T) {
setup()
defer cleanup()

env.provider.Close()
empty, err := db1.IsEmpty()
require.EqualError(t, err, "internal leveldb error while obtaining db iterator: leveldb: closed")
require.False(t, empty)

empty, err = db2.IsEmpty()
require.EqualError(t, err, "internal leveldb error while obtaining db iterator: leveldb: closed")
require.False(t, empty)
})
}

func testFormatCheck(t *testing.T, dataFormat, expectedFormat string, dataExists bool, expectedErr *dataformat.ErrFormatMismatch) {
assert.NoError(t, os.RemoveAll(testDBPath))
defer func() {
Expand Down
13 changes: 0 additions & 13 deletions core/ledger/confighistory/db_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,6 @@ func (d *db) getNamespaceIterator(ns string) (*leveldbhelper.Iterator, error) {
return d.GetIterator(nsStartKey, nsEndKey)
}

func (d *db) isEmpty() (bool, error) {
itr, err := d.GetIterator(nil, nil)
if err != nil {
return false, err
}
defer itr.Release()
entryExist := itr.Next()
if err := itr.Error(); err != nil {
return false, errors.WithMessagef(err, "internal leveldb error while obtaining next entry from iterator")
}
return !entryExist, nil
}

func encodeCompositeKey(ns, key string, blockNum uint64) []byte {
b := []byte(keyPrefix + ns)
b = append(b, separatorByte)
Expand Down
40 changes: 0 additions & 40 deletions core/ledger/confighistory/db_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,46 +113,6 @@ func TestGetNamespaceIterator(t *testing.T) {
})
}

func TestIsNotEmpty(t *testing.T) {
testDBPath := "/tmp/fabric/core/ledger/confighistory"
deleteTestPath(t, testDBPath)
provider, err := newDBProvider(testDBPath)
require.NoError(t, err)
defer deleteTestPath(t, testDBPath)

db := provider.getDB("ledger1")

t.Run("db is empty", func(t *testing.T) {
empty, err := db.isEmpty()
require.NoError(t, err)
require.True(t, empty)
})

t.Run("db is not empty", func(t *testing.T) {
sampleData := []*compositeKV{
{
&compositeKey{
ns: "ns1",
key: "key1",
blockNum: 40,
},
[]byte("val1_40"),
},
}
populateDBWithSampleData(t, db, sampleData)
empty, err := db.isEmpty()
require.NoError(t, err)
require.False(t, empty)
})

t.Run("iter error", func(t *testing.T) {
provider.Close()
empty, err := db.isEmpty()
require.EqualError(t, err, "internal leveldb error while obtaining db iterator: leveldb: closed")
require.False(t, empty)
})
}

func verifyNsEntries(t *testing.T, nsItr *leveldbhelper.Iterator, expectedEntries []*compositeKV) {
var retrievedEntries []*compositeKV
for nsItr.Next() {
Expand Down
4 changes: 3 additions & 1 deletion core/ledger/confighistory/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (m *Mgr) Name() string {
return "collection configuration history listener"
}

// Initialize implements function from the interface ledger.StateListener
func (m *Mgr) Initialize(ledgerID string, qe ledger.SimpleQueryExecutor) error {
// Noop
return nil
Expand Down Expand Up @@ -112,7 +113,7 @@ func (m *Mgr) HandleStateUpdates(trigger *ledger.StateUpdateTrigger) error {
// ledgerID from the snapshot files present in the dir
func (m *Mgr) ImportConfigHistory(ledgerID string, dir string) error {
db := m.dbProvider.getDB(ledgerID)
empty, err := db.isEmpty()
empty, err := db.IsEmpty()
if err != nil {
return err
}
Expand Down Expand Up @@ -175,6 +176,7 @@ func (m *Mgr) Close() {
m.dbProvider.Close()
}

// Retriever helps consumer retrieve collection config history
type Retriever struct {
ledgerInfoRetriever LedgerInfoRetriever
ledgerID string
Expand Down
77 changes: 45 additions & 32 deletions core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,13 +983,12 @@ func TestApplyUpdatesWithNilHeight(t *testing.T, dbProvider statedb.VersionedDBP
// (because batch2 calls ApplyUpdates with savepoint as nil)
}

func TestFullScanIterator(
func TestDataExportImport(
t *testing.T,
dbProvider statedb.VersionedDBProvider,
valueFormat byte,
dbValueDeserializer func(b []byte) (*statedb.VersionedValue, error)) {
valueFormat byte) {

db, err := dbProvider.GetDBHandle("test-full-scan-iterator", nil)
sourceDB, err := dbProvider.GetDBHandle("sourceLedger", nil)
assert.NoError(t, err)

// generateSampleData returns a slice of KVs. The returned value contains five KVs for each of the namespaces
Expand All @@ -1014,43 +1013,56 @@ func TestFullScanIterator(
return sampleData
}

// add the sample data for four namespaces to the db
// add the sample data for five namespaces to the db
allNamesapces := stringset{"", "ns1", "ns2", "ns3", "ns4"}
batch := statedb.NewUpdateBatch()
for _, kv := range generateSampleData("", "ns1", "ns2", "ns3", "ns4") {
for _, kv := range generateSampleData(allNamesapces...) {
batch.PutValAndMetadata(kv.Namespace, kv.Key, kv.Value, kv.Metadata, kv.Version)
}
db.ApplyUpdates(batch, version.NewHeight(5, 5))
sourceDB.ApplyUpdates(batch, version.NewHeight(5, 5))

// verifyFullScanIterator verifies the output of the FullScanIterator with skipping zero or more namespaces
verifyFullScanIterator := func(skipNamespaces stringset) {
fullScanItr, valFormat, err := db.GetFullScanIterator(
// verifyExportImport uses FullScanIterator (with skipping zero or more namespaces)
// for exporting the data to import into another ledger instance and verifies the
// correctness of the imported data
verifyExportImport := func(destDBName string, skipNamespaces stringset) {
fullScanItr, valFormat, err := sourceDB.GetFullScanIterator(
func(ns string) bool {
return skipNamespaces.contains(ns)
},
)
require.NoError(t, err)
require.Equal(t, valueFormat, valFormat)
results := []*statedb.VersionedKV{}
for {
compositeKV, serializedVersionedValue, err := fullScanItr.Next()

destinationDB, err := dbProvider.GetDBHandle(destDBName, nil)
assert.NoError(t, err)

err = destinationDB.ImportState(fullScanItr, valFormat)
require.NoError(t, err)

for _, nsNotToBePresent := range skipNamespaces {
iter, err := destinationDB.GetStateRangeScanIterator(nsNotToBePresent, "", "")
require.NoError(t, err)
if compositeKV == nil {
break
}
versionedVal, err := dbValueDeserializer(serializedVersionedValue)
res, err := iter.Next()
require.NoError(t, err)
results = append(results, &statedb.VersionedKV{
CompositeKey: *compositeKV,
VersionedValue: *versionedVal,
})
require.Nil(t, res)
}

expectedNamespacesInResults := stringset{"", "ns1", "ns2", "ns3", "ns4"}.minus(skipNamespaces)
expectedResults := []*statedb.VersionedKV{}
for _, ns := range expectedNamespacesInResults {
expectedResults = append(expectedResults, generateSampleData(ns)...)
expectedNamespacesInDestinationDB := allNamesapces.minus(skipNamespaces)
results := []*statedb.VersionedKV{}
for _, nsToBePresent := range expectedNamespacesInDestinationDB {
iter, err := destinationDB.GetStateRangeScanIterator(nsToBePresent, "", "")
require.NoError(t, err)
for {
res, err := iter.Next()
require.NoError(t, err)
if res == nil {
break
}
versionedKV := res.(*statedb.VersionedKV)
results = append(results, versionedKV)
}
}
require.Equal(t, expectedResults, results)
require.Equal(t, generateSampleData(expectedNamespacesInDestinationDB...), results)
}

testCases := []stringset{
Expand All @@ -1067,29 +1079,30 @@ func TestFullScanIterator(
}

for i, testCase := range testCases {
name := fmt.Sprintf("testCase %d", i)
t.Run(
fmt.Sprintf("testCase %d", i),
name,
func(t *testing.T) {
verifyFullScanIterator(testCase)
verifyExportImport(name, testCase)
},
)
}
}

type stringset []string

func (universe stringset) contains(str string) bool {
for _, element := range universe {
func (s stringset) contains(str string) bool {
for _, element := range s {
if element == str {
return true
}
}
return false
}

func (universe stringset) minus(toMinus stringset) stringset {
func (s stringset) minus(toMinus stringset) stringset {
var final stringset
for _, element := range universe {
for _, element := range s {
if toMinus.contains(element) {
continue
}
Expand Down
Loading

0 comments on commit d882a2d

Please sign in to comment.