From 8b52fe0bb0d904e7ac0c87e91d180fea2147690d Mon Sep 17 00:00:00 2001 From: jsign Date: Tue, 26 Nov 2019 16:55:39 -0300 Subject: [PATCH 1/7] threads: fetch all unknown records Signed-off-by: jsign --- cbor/event.go | 6 +- cbor/record.go | 7 ++- client.go | 2 +- eventstore/storethread.go | 9 ++- service.go | 13 ++++- threads.go | 119 ++++++++++++++++++++++++-------------- 6 files changed, 105 insertions(+), 51 deletions(-) diff --git a/cbor/event.go b/cbor/event.go index c4497bef..5bc6f3e2 100644 --- a/cbor/event.go +++ b/cbor/event.go @@ -7,7 +7,7 @@ import ( "github.com/ipfs/go-cid" cbornode "github.com/ipfs/go-ipld-cbor" - "github.com/ipfs/go-ipld-format" + format "github.com/ipfs/go-ipld-format" mh "github.com/multiformats/go-multihash" "github.com/textileio/go-textile-core/crypto" "github.com/textileio/go-textile-core/crypto/symmetric" @@ -91,7 +91,7 @@ func GetEvent(ctx context.Context, dag format.DAGService, id cid.Cid) (thread.Ev } // EventFromNode decodes the given node into an event. -func EventFromNode(node format.Node) (thread.Event, error) { +func EventFromNode(node format.Node) (*Event, error) { obj := new(event) err := cbornode.DecodeInto(node.RawData(), obj) if err != nil { @@ -104,7 +104,7 @@ func EventFromNode(node format.Node) (thread.Event, error) { } // EventFromRecord returns the event within the given node. -func EventFromRecord(ctx context.Context, dag format.DAGService, rec thread.Record) (thread.Event, error) { +func EventFromRecord(ctx context.Context, dag format.DAGService, rec thread.Record) (*Event, error) { block, err := rec.GetBlock(ctx, dag) if err != nil { return nil, err diff --git a/cbor/record.go b/cbor/record.go index facb4b9b..6eb1fead 100644 --- a/cbor/record.go +++ b/cbor/record.go @@ -5,8 +5,8 @@ import ( "fmt" "github.com/ipfs/go-cid" - "github.com/ipfs/go-ipld-cbor" - "github.com/ipfs/go-ipld-format" + cbornode "github.com/ipfs/go-ipld-cbor" + format "github.com/ipfs/go-ipld-format" ic "github.com/libp2p/go-libp2p-core/crypto" mh "github.com/multiformats/go-multihash" "github.com/textileio/go-textile-core/crypto" @@ -101,14 +101,17 @@ func RecordToProto(ctx context.Context, dag format.DAGService, rec thread.Record if err != nil { return nil, err } + // (jsign): shouldn't we check if block.(*cbor.Event) to avoid EventFromNode()? event, err := EventFromNode(block) if err != nil { return nil, err } + // (jsign): same here? header, err := event.GetHeader(ctx, dag, nil) if err != nil { return nil, err } + // same here? body, err := event.GetBody(ctx, dag, nil) if err != nil { return nil, err diff --git a/client.go b/client.go index 83973987..703e187c 100644 --- a/client.go +++ b/client.go @@ -22,7 +22,7 @@ import ( const ( // reqTimeout is the duration to wait for a request to complete. - reqTimeout = time.Second * 5 + reqTimeout = time.Second * 10 ) // getLogs in a thread. diff --git a/eventstore/storethread.go b/eventstore/storethread.go index c1abd30f..2cf48c0b 100644 --- a/eventstore/storethread.go +++ b/eventstore/storethread.go @@ -85,7 +85,14 @@ func (a *singleThreadAdapter) threadToStore(wg *sync.WaitGroup) { event, err := threadcbor.EventFromRecord(ctx, a.api, rec.Value()) if err != nil { - log.Fatalf("error when getting event from record: %v", err) // ToDo: Buffer them and retry... + block, err := rec.Value().GetBlock(ctx, a.api) + if err != nil { // ToDo: Buffer them and retry... + log.Fatalf("error when getting block from record: %v", err) + } + event, err = threadcbor.EventFromNode(block) + if err != nil { + log.Fatalf("error when decoding block to event: %v", err) + } } readKey, err := a.api.Store().ReadKey(a.threadID) diff --git a/service.go b/service.go index fe8b0b4e..cc37f6d3 100644 --- a/service.go +++ b/service.go @@ -118,11 +118,19 @@ func (s *service) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.Push } lg := logFromProto(req.Log) - if err := s.threads.store.AddLog(req.ThreadID.ID, lg); err != nil { + head, err := s.threads.store.Heads(req.ThreadID.ID, lg.ID) + if err != nil { return nil, status.Error(codes.Internal, err.Error()) } + if head == nil { + if err := s.threads.store.AddLog(req.ThreadID.ID, lg); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + } go func() { + s.threads.pullLock.Lock() + defer s.threads.pullLock.Unlock() // Get log records for this new log recs, err := s.getRecords( s.threads.ctx, @@ -222,6 +230,9 @@ func (s *service) GetRecords(ctx context.Context, req *pb.GetRecordsRequest) (*p // PushRecord receives a push record request. func (s *service) PushRecord(ctx context.Context, req *pb.PushRecordRequest) (*pb.PushRecordReply, error) { + // ToDo: fix concurrency + s.threads.pullLock.Lock() + defer s.threads.pullLock.Unlock() if req.Header == nil { return nil, status.Error(codes.FailedPrecondition, "request header is required") } diff --git a/threads.go b/threads.go index a9af4c88..ff0d5ebd 100644 --- a/threads.go +++ b/threads.go @@ -187,6 +187,14 @@ func (t *threads) Close() (err error) { if len(errs) > 0 { return fmt.Errorf("failed while closing threads; err(s): %q", errs) } + + t.pullLock.Lock() + defer t.pullLock.Unlock() + // Wait for all thread pulls to finish + for _, semaph := range t.pullLocks { + semaph <- struct{}{} + } + return nil } @@ -271,22 +279,24 @@ func (t *threads) AddThread( return t.store.ThreadInfo(id) } -// PullThread for new records. -// Logs owned by this host are traversed locally. -// Remotely addressed logs are pulled from the network. -func (t *threads) PullThread(ctx context.Context, id thread.ID) error { - log.Debugf("pulling thread %s...", id.String()) +func (t *threads) getThreadSemaphore(id thread.ID) chan struct{} { var ptl chan struct{} var ok bool - // ToDo: fix concurrency t.pullLock.Lock() defer t.pullLock.Unlock() if ptl, ok = t.pullLocks[id]; !ok { ptl = make(chan struct{}, 1) t.pullLocks[id] = ptl } - // t.pullLock.Unlock() + return ptl +} +// PullThread for new records. +// Logs owned by this host are traversed locally. +// Remotely addressed logs are pulled from the network. +func (t *threads) PullThread(ctx context.Context, id thread.ID) error { + log.Debugf("pulling thread %s...", id.String()) + ptl := t.getThreadSemaphore(id) select { case ptl <- struct{}{}: lgs, err := t.getLogs(id) @@ -310,6 +320,7 @@ func (t *threads) PullThread(ctx context.Context, id thread.ID) error { } wg := sync.WaitGroup{} for _, lg := range lgs { + // (jsign): after fist iteration, offsets might not be correct anymore wg.Add(1) // ToDo: fix concurrency func(lg thread.LogInfo) { @@ -541,11 +552,24 @@ func (t *threads) Subscribe(opts ...options.SubOption) tserv.Subscription { // putRecord adds an existing record. See PutOption for more. func (t *threads) putRecord(ctx context.Context, id thread.ID, lid peer.ID, rec thread.Record) error { - knownRecord, err := t.bstore.Has(rec.Cid()) - if err != nil { - return err + unknownRecords := []thread.Record{} + cid := rec.Cid() + for cid.Defined() { + exist, err := t.bstore.Has(cid) + if err != nil { + return err + } + if exist { + break + } + r, err := t.GetRecord(ctx, id, cid) + if err != nil { + return err + } + unknownRecords = append(unknownRecords, r) + cid = r.PrevID() } - if knownRecord { + if len(unknownRecords) == 0 { return nil } @@ -555,42 +579,51 @@ func (t *threads) putRecord(ctx context.Context, id thread.ID, lid peer.ID, rec return err } - // Save the record locally - // Note: These get methods will return cached nodes. - block, err := rec.GetBlock(ctx, t) - if err != nil { - return err - } - event, ok := block.(*cbor.Event) - if !ok { - return fmt.Errorf("invalid event") - } - header, err := event.GetHeader(ctx, t, nil) - if err != nil { - return err - } - body, err := event.GetBody(ctx, t, nil) - if err != nil { - return err - } - err = t.AddMany(ctx, []format.Node{rec, event, header, body}) - if err != nil { - return err - } + for i := len(unknownRecords) - 1; i >= 0; i-- { + r := unknownRecords[i] + // Save the record locally + // Note: These get methods will return cached nodes. + block, err := r.GetBlock(ctx, t) + if err != nil { + return err + } + event, ok := block.(*cbor.Event) + if !ok { + event, err = cbor.EventFromNode(block) + if err != nil { + return fmt.Errorf("invalid event: %v", err) + } + } + header, err := event.GetHeader(ctx, t, nil) + if err != nil { + return err + } + body, err := event.GetBody(ctx, t, nil) + if err != nil { + return err + } + err = t.AddMany(ctx, []format.Node{r, event, header, body}) + if err != nil { + return err + } + + log.Debugf("put record %s (thread=%s, log=%s)", r.Cid().String(), id, lg.ID) + // Notify local listeners + err = t.bus.SendWithTimeout(&record{ + Record: r, + threadID: id, + logID: lg.ID, + }, time.Second) + if err != nil { + return err + } + } // Update head - if err = t.store.SetHead(id, lg.ID, rec.Cid()); err != nil { + if err = t.store.SetHead(id, lg.ID, unknownRecords[0].Cid()); err != nil { return err } - - log.Debugf("put record %s (thread=%s, log=%s)", rec.Cid().String(), id, lg.ID) - - // Notify local listeners - return t.bus.SendWithTimeout(&record{ - Record: rec, - threadID: id, - logID: lg.ID, - }, time.Second) + return nil } // getPrivKey returns the host's private key. From 79aab92d01214b47039c1d2a233ecfbdeac8f438 Mon Sep 17 00:00:00 2001 From: jsign Date: Tue, 26 Nov 2019 17:04:12 -0300 Subject: [PATCH 2/7] del comments Signed-off-by: jsign --- cbor/record.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cbor/record.go b/cbor/record.go index 6eb1fead..9ee1d3c3 100644 --- a/cbor/record.go +++ b/cbor/record.go @@ -101,17 +101,17 @@ func RecordToProto(ctx context.Context, dag format.DAGService, rec thread.Record if err != nil { return nil, err } - // (jsign): shouldn't we check if block.(*cbor.Event) to avoid EventFromNode()? + event, err := EventFromNode(block) if err != nil { return nil, err } - // (jsign): same here? + header, err := event.GetHeader(ctx, dag, nil) if err != nil { return nil, err } - // same here? + body, err := event.GetBody(ctx, dag, nil) if err != nil { return nil, err From 2cfc705f23c4864447d27c3cdfe7cf82a1ce4541 Mon Sep 17 00:00:00 2001 From: jsign Date: Tue, 26 Nov 2019 17:08:16 -0300 Subject: [PATCH 3/7] fix Signed-off-by: jsign --- cbor/record.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cbor/record.go b/cbor/record.go index 9ee1d3c3..96180a6a 100644 --- a/cbor/record.go +++ b/cbor/record.go @@ -101,17 +101,17 @@ func RecordToProto(ctx context.Context, dag format.DAGService, rec thread.Record if err != nil { return nil, err } - - event, err := EventFromNode(block) + event, ok := block.(*Event) + if !ok { + event, err = EventFromNode(block) if err != nil { return nil, err } - +} header, err := event.GetHeader(ctx, dag, nil) if err != nil { return nil, err } - body, err := event.GetBody(ctx, dag, nil) if err != nil { return nil, err From 0668603802d9c9fa81dafb5e23a70e83c3086a51 Mon Sep 17 00:00:00 2001 From: jsign Date: Tue, 26 Nov 2019 17:19:14 -0300 Subject: [PATCH 4/7] fix Signed-off-by: jsign --- cbor/record.go | 8 ++++---- service.go | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cbor/record.go b/cbor/record.go index 96180a6a..1f100b11 100644 --- a/cbor/record.go +++ b/cbor/record.go @@ -103,11 +103,11 @@ func RecordToProto(ctx context.Context, dag format.DAGService, rec thread.Record } event, ok := block.(*Event) if !ok { - event, err = EventFromNode(block) - if err != nil { - return nil, err + event, err = EventFromNode(block) + if err != nil { + return nil, err + } } -} header, err := event.GetHeader(ctx, dag, nil) if err != nil { return nil, err diff --git a/service.go b/service.go index cc37f6d3..52b78106 100644 --- a/service.go +++ b/service.go @@ -122,15 +122,14 @@ func (s *service) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.Push if err != nil { return nil, status.Error(codes.Internal, err.Error()) } + // Fix concurrency: adding head without having the block guarantee if head == nil { if err := s.threads.store.AddLog(req.ThreadID.ID, lg); err != nil { return nil, status.Error(codes.Internal, err.Error()) } } - go func() { - s.threads.pullLock.Lock() - defer s.threads.pullLock.Unlock() + func() { // Get log records for this new log recs, err := s.getRecords( s.threads.ctx, From b08aabae881e85195750f930532a45d3f70d2c3c Mon Sep 17 00:00:00 2001 From: jsign Date: Tue, 26 Nov 2019 17:22:24 -0300 Subject: [PATCH 5/7] fix Signed-off-by: jsign --- service.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/service.go b/service.go index 52b78106..3e1b95c5 100644 --- a/service.go +++ b/service.go @@ -118,15 +118,10 @@ func (s *service) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.Push } lg := logFromProto(req.Log) - head, err := s.threads.store.Heads(req.ThreadID.ID, lg.ID) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + // Fix concurrency: adding head without having the block guarantee - if head == nil { - if err := s.threads.store.AddLog(req.ThreadID.ID, lg); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + if err := s.threads.store.AddLog(req.ThreadID.ID, lg); err != nil { + return nil, status.Error(codes.Internal, err.Error()) } func() { From ca251ef4d4918f4035cb28c8b41d46875629d091 Mon Sep 17 00:00:00 2001 From: jsign Date: Tue, 26 Nov 2019 17:38:05 -0300 Subject: [PATCH 6/7] fix Signed-off-by: jsign --- threads.go | 1 - 1 file changed, 1 deletion(-) diff --git a/threads.go b/threads.go index ff0d5ebd..0e7afcdc 100644 --- a/threads.go +++ b/threads.go @@ -320,7 +320,6 @@ func (t *threads) PullThread(ctx context.Context, id thread.ID) error { } wg := sync.WaitGroup{} for _, lg := range lgs { - // (jsign): after fist iteration, offsets might not be correct anymore wg.Add(1) // ToDo: fix concurrency func(lg thread.LogInfo) { From 3eb1af9cd953229c64d85cf84a854e5bb2cf9ed8 Mon Sep 17 00:00:00 2001 From: jsign Date: Tue, 26 Nov 2019 20:59:29 -0300 Subject: [PATCH 7/7] fix Signed-off-by: jsign --- threads.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/threads.go b/threads.go index 0e7afcdc..1213cd72 100644 --- a/threads.go +++ b/threads.go @@ -297,6 +297,9 @@ func (t *threads) getThreadSemaphore(id thread.ID) chan struct{} { func (t *threads) PullThread(ctx context.Context, id thread.ID) error { log.Debugf("pulling thread %s...", id.String()) ptl := t.getThreadSemaphore(id) + // Fix concurrency, temporary global lock + t.pullLock.Lock() + defer t.pullLock.Unlock() select { case ptl <- struct{}{}: lgs, err := t.getLogs(id)