Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't wait while pushing #310

Merged
merged 1 commit into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions integrationtests/foldersync/foldersync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func TestMain(m *testing.M) {
logging.SetLogLevel("foldersync", "info")
_ = logging.SetLogLevel("foldersync", "info")
// logging.SetLogLevel("store", "debug")
// logging.SetLogLevel("threads", "debug")
// logging.SetLogLevel("threadstore", "debug")
Expand Down Expand Up @@ -162,7 +162,6 @@ func TestNUsersBootstrap(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(fmt.Sprintf("Total%dCore%d", tt.totalClients, tt.totalCorePeers), func(t *testing.T) {
// t.Parallel()
var clients []*client

client0, clean0 := createRootClient(t, fmt.Sprintf("client%d", 0))
Expand Down
1 change: 1 addition & 0 deletions net/api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ func TestClient_Subscribe(t *testing.T) {
if _, err = client1.CreateRecord(context.Background(), info.ID, body2); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)

lock.Lock()
if rcount != 2 {
Expand Down
7 changes: 1 addition & 6 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,8 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec
}

// Push to each address
wg := sync.WaitGroup{}
for _, addr := range addrs {
wg.Add(1)
go func(addr ma.Multiaddr) {
defer wg.Done()
p, err := addr.ValueForProtocol(ma.P_P2P)
if err != nil {
log.Error(err)
Expand All @@ -337,7 +334,7 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec

log.Debugf("pushing record to %s...", p)

cctx, cancel := context.WithTimeout(ctx, reqTimeout)
cctx, cancel := context.WithTimeout(context.Background(), reqTimeout)
defer cancel()
conn, err := s.dial(cctx, pid, grpc.WithInsecure())
if err != nil {
Expand Down Expand Up @@ -386,8 +383,6 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec
if err = s.ps.Publish(ctx, id, req); err != nil {
log.Errorf("error publishing record: %s", err)
}

wg.Wait()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember why we waited here... probably out of an abundance of caution while developing.

return nil
}

Expand Down
9 changes: 6 additions & 3 deletions net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,10 @@ func (n *net) AddRecord(ctx context.Context, id thread.ID, lid peer.ID, rec core
if err = rec.Verify(logpk); err != nil {
return err
}
return n.PutRecord(ctx, id, lid, rec)
if err = n.PutRecord(ctx, id, lid, rec); err != nil {
return err
}
return n.server.pushRecord(ctx, id, lid, rec)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surely this must have been a bug... why not push out records from AddRecord as well as CreateRecord?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Eventually, this can be different if we plan to batch pushes or similar, but not the case now.
SGTM.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice yeah, good call

}

func (n *net) GetRecord(ctx context.Context, id thread.ID, rid cid.Cid, opts ...core.ThreadOption) (core.Record, error) {
Expand Down Expand Up @@ -819,10 +822,10 @@ func (n *net) putRecord(ctx context.Context, id thread.ID, lid peer.ID, rec core

log.Debugf("put record %s (thread=%s, log=%s)", r.Cid(), id, lg.ID)

if err = n.bus.SendWithTimeout(NewRecord(r, id, lg.ID), notifyTimeout); err != nil {
if err = n.store.SetHead(id, lg.ID, r.Cid()); err != nil {
return err
}
if err = n.store.SetHead(id, lg.ID, r.Cid()); err != nil {
if err = n.bus.SendWithTimeout(NewRecord(r, id, lg.ID), notifyTimeout); err != nil {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just flipping the order here. Seems safer to set the head before broadcasting.

return err
}
}
Expand Down
2 changes: 2 additions & 0 deletions net/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
rand "crypto/rand"
"testing"
"time"

bserv "github.com/ipfs/go-blockservice"
ds "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestNet_AddThread(t *testing.T) {
if _, err = n2.CreateRecord(ctx, info2.ID, body2); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)

info3, err := n1.GetThread(context.Background(), info.ID)
if err != nil {
Expand Down