Skip to content

Commit

Permalink
Foldersync Integration Test (textileio#297)
Browse files Browse the repository at this point in the history
* Most things compiling

Signed-off-by: Aaron Sutula <[email protected]>

* improvements around invite links

Signed-off-by: Aaron Sutula <[email protected]>

* clean up test, add simple test

Signed-off-by: Aaron Sutula <[email protected]>

* remove skip

Signed-off-by: Aaron Sutula <[email protected]>

* bug fix in logstore!

Signed-off-by: Aaron Sutula <[email protected]>

* cleaning up

Signed-off-by: Aaron Sutula <[email protected]>

* move to integrationtests

Signed-off-by: Aaron Sutula <[email protected]>

* remove unused method

Signed-off-by: Aaron Sutula <[email protected]>

* little cleanup

Signed-off-by: Aaron Sutula <[email protected]>

* remove unused code

Signed-off-by: Aaron Sutula <[email protected]>

* oops more merge changes

Signed-off-by: Aaron Sutula <[email protected]>

* Updating to new apis, cleaing up options

Signed-off-by: Aaron Sutula <[email protected]>

* bring in sanders changes to fix thread sync bug

Signed-off-by: Aaron Sutula <[email protected]>

* Android updates

Signed-off-by: Aaron Sutula <[email protected]>

* cleaned up net layer and getting thread addrs

Signed-off-by: Aaron Sutula <[email protected]>

* Nicer return types from client.GetInviteInfo

Signed-off-by: Aaron Sutula <[email protected]>

* mod tidy

Signed-off-by: Aaron Sutula <[email protected]>

* GetDBInfo naming

Signed-off-by: Aaron Sutula <[email protected]>

* use free local port for net host address in all tests

Signed-off-by: Aaron Sutula <[email protected]>

* non parallel foldersync tests

Signed-off-by: Aaron Sutula <[email protected]>

* disable 5 peer tests

Signed-off-by: Aaron Sutula <[email protected]>

* try one 3 peer test

Signed-off-by: Aaron Sutula <[email protected]>

* max 3 peers in all tests

Signed-off-by: Aaron Sutula <[email protected]>
  • Loading branch information
asutula committed Apr 3, 2020
1 parent 4e03b9a commit e501a76
Show file tree
Hide file tree
Showing 29 changed files with 1,274 additions and 354 deletions.
20 changes: 18 additions & 2 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,31 @@ func collectionConfigToPb(c db.CollectionConfig) (*pb.CollectionConfig, error) {
}

// GetDBInfo retrives db addresses and keys.
func (c *Client) GetDBInfo(ctx context.Context, dbID thread.ID, opts ...db.ManagedDBOption) (*pb.GetDBInfoReply, error) {
func (c *Client) GetDBInfo(ctx context.Context, dbID thread.ID, opts ...db.ManagedDBOption) ([]ma.Multiaddr, thread.Key, error) {
args := &db.ManagedDBOptions{}
for _, opt := range opts {
opt(args)
}
ctx = thread.NewTokenContext(ctx, args.Token)
return c.c.GetDBInfo(ctx, &pb.GetDBInfoRequest{
res, err := c.c.GetDBInfo(ctx, &pb.GetDBInfoRequest{
DbID: dbID.Bytes(),
})
if err != nil {
return nil, thread.Key{}, err
}
addrs := make([]ma.Multiaddr, len(res.Addrs))
for i, bytes := range res.Addrs {
addr, err := ma.NewMultiaddrBytes(bytes)
if err != nil {
return nil, thread.Key{}, err
}
addrs[i] = addr
}
key, err := thread.KeyFromBytes(res.Key)
if err != nil {
return nil, thread.Key{}, err
}
return addrs, key, nil
}

// NewCollection creates a new collection.
Expand Down
18 changes: 7 additions & 11 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,11 @@ func TestNewDBFromAddr(t *testing.T) {
id := thread.NewIDV1(thread.Raw, 32)
err := client1.NewDB(context.Background(), id)
checkErr(t, err)
info, err := client1.GetDBInfo(context.Background(), id)
addrs, key, err := client1.GetDBInfo(context.Background(), id)
checkErr(t, err)

t.Run("test new db from address", func(t *testing.T) {
addr, err := ma.NewMultiaddrBytes(info.Addrs[0])
checkErr(t, err)
key, err := thread.KeyFromBytes(info.Key)
checkErr(t, err)
if err = client2.NewDBFromAddr(context.Background(), addr, key); err != nil {
if err = client2.NewDBFromAddr(context.Background(), addrs[0], key); err != nil {
t.Fatalf("failed to create new db from address: %v", err)
}
})
Expand Down Expand Up @@ -127,14 +123,14 @@ func TestGetDBInfo(t *testing.T) {
err := client.NewDB(context.Background(), id)
checkErr(t, err)

info, err := client.GetDBInfo(context.Background(), id)
addrs, key, err := client.GetDBInfo(context.Background(), id)
if err != nil {
t.Fatalf("failed to create collection: %v", err)
}
if info.Key == nil {
t.Fatal("got nil db key")
if !key.Defined() {
t.Fatal("got undefined db key")
}
if len(info.Addrs) == 0 {
if len(addrs) == 0 {
t.Fatal("got empty addresses")
}
})
Expand Down Expand Up @@ -603,7 +599,7 @@ func makeServer(t *testing.T) (ma.Multiaddr, func()) {
if err != nil {
t.Fatal(err)
}
n, err := common.DefaultNetwork(dir, common.WithNetDebug(true))
n, err := common.DefaultNetwork(dir, common.WithNetDebug(true), common.WithNetHostAddr(util.FreeLocalAddr()))
if err != nil {
t.Fatal(err)
}
Expand Down
20 changes: 11 additions & 9 deletions api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/textileio/go-threads/core/app"
core "github.com/textileio/go-threads/core/db"
lstore "github.com/textileio/go-threads/core/logstore"
"github.com/textileio/go-threads/core/net"
"github.com/textileio/go-threads/core/thread"
"github.com/textileio/go-threads/db"
"github.com/textileio/go-threads/util"
Expand Down Expand Up @@ -50,7 +49,7 @@ func NewService(network app.Net, conf Config) (*Service, error) {
}
}

manager, err := db.NewManager(network, db.WithRepoPath(conf.RepoPath), db.WithDebug(conf.Debug))
manager, err := db.NewManager(network, db.WithNewDBRepoPath(conf.RepoPath), db.WithNewDBDebug(conf.Debug))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -224,21 +223,24 @@ func (s *Service) GetDBInfo(ctx context.Context, req *pb.GetDBInfoRequest) (*pb.
if err != nil {
return nil, err
}
tinfo, err := s.manager.Net().GetThread(ctx, id, net.WithThreadToken(token))

d, err := s.getDB(ctx, id, token)
if err != nil {
return nil, err
}
host := s.manager.Net().Host()
peerID, _ := ma.NewComponent("p2p", host.ID().String())
threadID, _ := ma.NewComponent("thread", tinfo.ID.String())
addrs := host.Addrs()

addrs, key, err := d.GetDBInfo(db.WithInviteInfoToken(token))
if err != nil {
return nil, err
}

res := make([][]byte, len(addrs))
for i := range addrs {
res[i] = addrs[i].Encapsulate(peerID).Encapsulate(threadID).Bytes()
res[i] = addrs[i].Bytes()
}
reply := &pb.GetDBInfoReply{
Addrs: res,
Key: tinfo.Key.Bytes(),
Key: key.Bytes(),
}
return reply, nil
}
Expand Down
7 changes: 4 additions & 3 deletions core/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ type Net interface {

// Connector connects an app to a thread.
type Connector struct {
Net Net

app App
net Net
threadID thread.ID
threadKey thread.Key
logID peer.ID
Expand All @@ -127,8 +128,8 @@ func NewConnector(app App, net Net, tinfo thread.Info, conn Connection) (*Connec
return nil, fmt.Errorf("own log for thread %s does not exist", tinfo.ID)
}
a := &Connector{
Net: net,
app: app,
net: net,
threadID: tinfo.ID,
threadKey: tinfo.Key,
logID: lg.ID,
Expand Down Expand Up @@ -204,7 +205,7 @@ func (c *Connector) appToThread(wg *sync.WaitGroup) {
return
}
ctx, cancel := context.WithTimeout(context.Background(), addRecordTimeout)
if _, err := c.net.CreateRecord(ctx, c.threadID, event.Node, net.WithThreadToken(event.Token)); err != nil {
if _, err := c.Net.CreateRecord(ctx, c.threadID, event.Node, net.WithThreadToken(event.Token)); err != nil {
log.Fatalf("error writing record: %v", err)
}
cancel()
Expand Down
11 changes: 6 additions & 5 deletions core/thread/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,12 @@ func (s IDSlice) Len() int { return len(s) }
func (s IDSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s IDSlice) Less(i, j int) bool { return s[i].str < s[j].str }

// Info holds thread logs and keys.
// Info holds thread logs, keys and addresses.
type Info struct {
ID ID
Key Key
Logs []LogInfo
ID ID
Key Key
Logs []LogInfo
Addrs []ma.Multiaddr
}

// GetOwnLog returns the first log found with a private key.
Expand All @@ -318,7 +319,7 @@ func (i Info) GetOwnLog() *LogInfo {
return nil
}

// GetLog holds log keys, addresses, and heads.
// LogInfo holds log keys, addresses, and heads.
type LogInfo struct {
ID peer.ID
PubKey crypto.PubKey
Expand Down
6 changes: 3 additions & 3 deletions db/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func checkBenchErr(b *testing.B, err error) {
}
}

func createBenchDB(b *testing.B, opts ...Option) (*DB, func()) {
func createBenchDB(b *testing.B, opts ...NewDBOption) (*DB, func()) {
dir, err := ioutil.TempDir("", "")
checkBenchErr(b, err)
n, err := common.DefaultNetwork(dir)
n, err := common.DefaultNetwork(dir, common.WithNetDebug(true), common.WithNetHostAddr(util.FreeLocalAddr()))
checkBenchErr(b, err)
opts = append(opts, WithRepoPath(dir))
opts = append(opts, WithNewDBRepoPath(dir))
d, err := NewDB(context.Background(), n, thread.NewIDV1(thread.Raw, 32), opts...)
checkBenchErr(b, err)
return d, func() {
Expand Down
68 changes: 41 additions & 27 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ var (
type DB struct {
io.Closer

net app.Net
connector *app.Connector

datastore ds.TxnDatastore
Expand All @@ -68,44 +67,44 @@ type DB struct {

// NewDB creates a new DB, which will *own* ds and dispatcher for internal use.
// Saying it differently, ds and dispatcher shouldn't be used externally.
func NewDB(ctx context.Context, network app.Net, id thread.ID, opts ...Option) (*DB, error) {
config := &Config{}
func NewDB(ctx context.Context, network app.Net, id thread.ID, opts ...NewDBOption) (*DB, error) {
options := &NewDBOptions{}
for _, opt := range opts {
if err := opt(config); err != nil {
if err := opt(options); err != nil {
return nil, err
}
}

if _, err := network.CreateThread(ctx, id, net.WithNewThreadToken(config.Token)); err != nil {
if _, err := network.CreateThread(ctx, id, net.WithNewThreadToken(options.Token)); err != nil {
if !errors.Is(err, lstore.ErrThreadExists) {
return nil, err
}
}
return newDB(network, id, config)
return newDB(network, id, options)
}

// NewDBFromAddr creates a new DB from a thread hosted by another peer at address,
// which will *own* ds and dispatcher for internal use.
// Saying it differently, ds and dispatcher shouldn't be used externally.
func NewDBFromAddr(ctx context.Context, network app.Net, addr ma.Multiaddr, key thread.Key, opts ...Option) (*DB, error) {
config := &Config{}
func NewDBFromAddr(ctx context.Context, network app.Net, addr ma.Multiaddr, key thread.Key, opts ...NewDBOption) (*DB, error) {
options := &NewDBOptions{}
for _, opt := range opts {
if err := opt(config); err != nil {
if err := opt(options); err != nil {
return nil, err
}
}

ti, err := network.AddThread(ctx, addr, net.WithThreadKey(key), net.WithNewThreadToken(config.Token))
ti, err := network.AddThread(ctx, addr, net.WithThreadKey(key), net.WithNewThreadToken(options.Token))
if err != nil {
return nil, err
}
d, err := newDB(network, ti.ID, config)
d, err := newDB(network, ti.ID, options)
if err != nil {
return nil, err
}

go func() {
if err := network.PullThread(ctx, ti.ID, net.WithThreadToken(config.Token)); err != nil {
if err := network.PullThread(ctx, ti.ID, net.WithThreadToken(options.Token)); err != nil {
log.Errorf("error pulling thread %s", ti.ID)
}
}()
Expand All @@ -114,19 +113,19 @@ func NewDBFromAddr(ctx context.Context, network app.Net, addr ma.Multiaddr, key

// newDB is used directly by a db manager to create new dbs
// with the same config.
func newDB(n app.Net, id thread.ID, config *Config) (*DB, error) {
if config.Datastore == nil {
datastore, err := newDefaultDatastore(config.RepoPath, config.LowMem)
func newDB(n app.Net, id thread.ID, options *NewDBOptions) (*DB, error) {
if options.Datastore == nil {
datastore, err := newDefaultDatastore(options.RepoPath, options.LowMem)
if err != nil {
return nil, err
}
config.Datastore = datastore
options.Datastore = datastore
}
if config.EventCodec == nil {
config.EventCodec = newDefaultEventCodec()
if options.EventCodec == nil {
options.EventCodec = newDefaultEventCodec()
}
if !managedDatastore(config.Datastore) {
if config.Debug {
if !managedDatastore(options.Datastore) {
if options.Debug {
if err := util.SetLogLevels(map[string]logging.LogLevel{
"db": logging.LevelDebug,
}); err != nil {
Expand All @@ -136,9 +135,9 @@ func newDB(n app.Net, id thread.ID, config *Config) (*DB, error) {
}

d := &DB{
datastore: config.Datastore,
dispatcher: newDispatcher(config.Datastore),
eventcodec: config.EventCodec,
datastore: options.Datastore,
dispatcher: newDispatcher(options.Datastore),
eventcodec: options.EventCodec,
collectionNames: make(map[string]*Collection),
localEventsBus: app.NewLocalEventsBus(),
stateChangedNotifee: &stateChangedNotifee{},
Expand All @@ -148,7 +147,7 @@ func newDB(n app.Net, id thread.ID, config *Config) (*DB, error) {
}
d.dispatcher.Register(d)

for _, cc := range config.Collections {
for _, cc := range options.Collections {
if _, err := d.NewCollection(cc); err != nil {
return nil, err
}
Expand Down Expand Up @@ -289,10 +288,25 @@ func (d *DB) Reduce(events []core.Event) error {
return nil
}

// GetDBInfo returns the addresses and key that can be used to join the DB thread
func (d *DB) GetDBInfo(opts ...InviteInfoOption) ([]ma.Multiaddr, thread.Key, error) {
options := &InviteInfoOptions{}
for _, opt := range opts {
opt(options)
}

tinfo, err := d.connector.Net.GetThread(context.Background(), d.connector.ThreadID(), net.WithThreadToken(options.Token))
if err != nil {
return nil, thread.Key{}, err
}
return tinfo.Addrs, tinfo.Key, nil
}

// Close closes the db.
func (d *DB) Close() error {
d.lock.Lock()
defer d.lock.Unlock()

if d.closed {
return nil
}
Expand All @@ -319,7 +333,7 @@ func (d *DB) HandleNetRecord(rec net.ThreadRecord, key thread.Key, lid peer.ID,
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
event, err := threadcbor.EventFromRecord(ctx, d.net, rec.Value())
event, err := threadcbor.EventFromRecord(ctx, d.connector.Net, rec.Value())
if err != nil {
block, err := d.getBlockWithRetry(ctx, rec.Value())
if err != nil {
Expand All @@ -330,7 +344,7 @@ func (d *DB) HandleNetRecord(rec net.ThreadRecord, key thread.Key, lid peer.ID,
return fmt.Errorf("error when decoding block to event: %v", err)
}
}
node, err := event.GetBody(ctx, d.net, key.Read())
node, err := event.GetBody(ctx, d.connector.Net, key.Read())
if err != nil {
return fmt.Errorf("error when getting body of event on thread %s/%s: %v", d.connector.ThreadID(), rec.LogID(), err)
}
Expand All @@ -347,7 +361,7 @@ func (d *DB) getBlockWithRetry(ctx context.Context, rec net.Record) (format.Node
backoff := getBlockInitialTimeout
var err error
for i := 1; i <= getBlockRetries; i++ {
n, err := rec.GetBlock(ctx, d.net)
n, err := rec.GetBlock(ctx, d.connector.Net)
if err == nil {
return n, nil
}
Expand Down
Loading

0 comments on commit e501a76

Please sign in to comment.