diff --git a/client.go b/client.go index 8f0dae36..66b84838 100644 --- a/client.go +++ b/client.go @@ -353,8 +353,6 @@ func (s *service) pushRecord( log.Error(err) return } - - log.Debugf("received reply from %s", p) }(addr) } diff --git a/service.go b/service.go index b23e62e1..e2ece8b4 100644 --- a/service.go +++ b/service.go @@ -57,6 +57,11 @@ func newService(t *threads) (*service, error) { // GetLogs receives a get logs request. // @todo: Verification, authentication func (s *service) GetLogs(ctx context.Context, req *pb.GetLogsRequest) (*pb.GetLogsReply, error) { + if req.Header == nil { + return nil, status.Error(codes.FailedPrecondition, "request header is required") + } + log.Debugf("received get logs request from %s", req.Header.From.ID.String()) + lgs, err := s.threads.getLogs(req.ThreadID.ID) if err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -78,6 +83,7 @@ func (s *service) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.Push if req.Header == nil { return nil, status.Error(codes.FailedPrecondition, "request header is required") } + log.Debugf("received push log request from %s", req.Header.From.ID.String()) lg := logFromProto(req.Log) if err := s.threads.store.AddLog(req.ThreadID.ID, lg); err != nil { @@ -117,6 +123,11 @@ func (s *service) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.Push // GetRecords receives a get records request. // @todo: Verification, authentication func (s *service) GetRecords(ctx context.Context, req *pb.GetRecordsRequest) (*pb.GetRecordsReply, error) { + if req.Header == nil { + return nil, status.Error(codes.FailedPrecondition, "request header is required") + } + log.Debugf("received get records request from %s", req.Header.From.ID.String()) + reqd := make(map[peer.ID]*pb.GetRecordsRequest_LogEntry) for _, l := range req.Logs { reqd[l.LogID.ID] = l @@ -177,6 +188,7 @@ func (s *service) PushRecord(ctx context.Context, req *pb.PushRecordRequest) (*p if req.Header == nil { return nil, status.Error(codes.FailedPrecondition, "request header is required") } + log.Debugf("received push record request from %s", req.Header.From.ID.String()) // Verify the request reqpk, err := requestPubKey(req) diff --git a/threads.go b/threads.go index b8df28ba..af5e492d 100644 --- a/threads.go +++ b/threads.go @@ -186,50 +186,51 @@ func (t *threads) AddThread(ctx context.Context, addr ma.Multiaddr) (info thread // Logs owned by this host are traversed locally. // Remotely addressed logs are pulled from the network. func (t *threads) PullThread(ctx context.Context, id thread.ID) error { - wg := sync.WaitGroup{} + log.Debugf("pulling thread %s", id.String()) + lgs, err := t.getLogs(id) if err != nil { return err } + + // Gather offsets for each log + offsets := make(map[peer.ID]cid.Cid) + for _, lg := range lgs { + offsets[lg.ID] = cid.Undef + if len(lg.Heads) > 0 { + has, err := t.bstore.Has(lg.Heads[0]) + if err != nil { + return err + } + if has { + offsets[lg.ID] = lg.Heads[0] + } + } + } + + wg := sync.WaitGroup{} for _, lg := range lgs { wg.Add(1) go func(lg thread.LogInfo) { defer wg.Done() - var offset cid.Cid - if len(lg.Heads) > 0 { - has, err := t.bstore.Has(lg.Heads[0]) - if err != nil { - log.Error(err) - return - } - if has { - offset = lg.Heads[0] - } - } - recs := make(map[peer.ID][]thread.Record) - if lg.PrivKey != nil { // This is our own log - rs, err := t.getLocalRecords(ctx, id, lg.ID, offset, MaxPullLimit) - if err != nil { - log.Error(err) - return - } - recs[lg.ID] = rs - } else { - // Pull from addresses - recs, err = t.service.getRecords( - ctx, - id, - lg.ID, - map[peer.ID]cid.Cid{lg.ID: offset}, - MaxPullLimit) - if err != nil { - log.Error(err) - return - } + // Pull from addresses + recs, err := t.service.getRecords( + ctx, + id, + lg.ID, + offsets, + MaxPullLimit) + if err != nil { + log.Error(err) + return } for lid, rs := range recs { for _, r := range rs { - err = t.putRecord(ctx, r, tserv.PutOpt.ThreadID(id), tserv.PutOpt.LogID(lid)) + err = t.putRecord( + ctx, + r, + tserv.PutOpt.ThreadID(id), + tserv.PutOpt.LogID(lid)) if err != nil { log.Error(err) return