Skip to content

Commit

Permalink
service: log exchange fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Sander Pick <[email protected]>
  • Loading branch information
sanderpick committed Oct 26, 2019
1 parent b7c6655 commit ad52904
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 35 deletions.
2 changes: 0 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,6 @@ func (s *service) pushRecord(
log.Error(err)
return
}

log.Debugf("received reply from %s", p)
}(addr)
}

Expand Down
12 changes: 12 additions & 0 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func newService(t *threads) (*service, error) {
// GetLogs receives a get logs request.
// @todo: Verification, authentication
func (s *service) GetLogs(ctx context.Context, req *pb.GetLogsRequest) (*pb.GetLogsReply, error) {
if req.Header == nil {
return nil, status.Error(codes.FailedPrecondition, "request header is required")
}
log.Debugf("received get logs request from %s", req.Header.From.ID.String())

lgs, err := s.threads.getLogs(req.ThreadID.ID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
Expand All @@ -78,6 +83,7 @@ func (s *service) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.Push
if req.Header == nil {
return nil, status.Error(codes.FailedPrecondition, "request header is required")
}
log.Debugf("received push log request from %s", req.Header.From.ID.String())

lg := logFromProto(req.Log)
if err := s.threads.store.AddLog(req.ThreadID.ID, lg); err != nil {
Expand Down Expand Up @@ -117,6 +123,11 @@ func (s *service) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.Push
// GetRecords receives a get records request.
// @todo: Verification, authentication
func (s *service) GetRecords(ctx context.Context, req *pb.GetRecordsRequest) (*pb.GetRecordsReply, error) {
if req.Header == nil {
return nil, status.Error(codes.FailedPrecondition, "request header is required")
}
log.Debugf("received get records request from %s", req.Header.From.ID.String())

reqd := make(map[peer.ID]*pb.GetRecordsRequest_LogEntry)
for _, l := range req.Logs {
reqd[l.LogID.ID] = l
Expand Down Expand Up @@ -177,6 +188,7 @@ func (s *service) PushRecord(ctx context.Context, req *pb.PushRecordRequest) (*p
if req.Header == nil {
return nil, status.Error(codes.FailedPrecondition, "request header is required")
}
log.Debugf("received push record request from %s", req.Header.From.ID.String())

// Verify the request
reqpk, err := requestPubKey(req)
Expand Down
67 changes: 34 additions & 33 deletions threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,50 +186,51 @@ func (t *threads) AddThread(ctx context.Context, addr ma.Multiaddr) (info thread
// Logs owned by this host are traversed locally.
// Remotely addressed logs are pulled from the network.
func (t *threads) PullThread(ctx context.Context, id thread.ID) error {
wg := sync.WaitGroup{}
log.Debugf("pulling thread %s", id.String())

lgs, err := t.getLogs(id)
if err != nil {
return err
}

// Gather offsets for each log
offsets := make(map[peer.ID]cid.Cid)
for _, lg := range lgs {
offsets[lg.ID] = cid.Undef
if len(lg.Heads) > 0 {
has, err := t.bstore.Has(lg.Heads[0])
if err != nil {
return err
}
if has {
offsets[lg.ID] = lg.Heads[0]
}
}
}

wg := sync.WaitGroup{}
for _, lg := range lgs {
wg.Add(1)
go func(lg thread.LogInfo) {
defer wg.Done()
var offset cid.Cid
if len(lg.Heads) > 0 {
has, err := t.bstore.Has(lg.Heads[0])
if err != nil {
log.Error(err)
return
}
if has {
offset = lg.Heads[0]
}
}
recs := make(map[peer.ID][]thread.Record)
if lg.PrivKey != nil { // This is our own log
rs, err := t.getLocalRecords(ctx, id, lg.ID, offset, MaxPullLimit)
if err != nil {
log.Error(err)
return
}
recs[lg.ID] = rs
} else {
// Pull from addresses
recs, err = t.service.getRecords(
ctx,
id,
lg.ID,
map[peer.ID]cid.Cid{lg.ID: offset},
MaxPullLimit)
if err != nil {
log.Error(err)
return
}
// Pull from addresses
recs, err := t.service.getRecords(
ctx,
id,
lg.ID,
offsets,
MaxPullLimit)
if err != nil {
log.Error(err)
return
}
for lid, rs := range recs {
for _, r := range rs {
err = t.putRecord(ctx, r, tserv.PutOpt.ThreadID(id), tserv.PutOpt.LogID(lid))
err = t.putRecord(
ctx,
r,
tserv.PutOpt.ThreadID(id),
tserv.PutOpt.LogID(lid))
if err != nil {
log.Error(err)
return
Expand Down

0 comments on commit ad52904

Please sign in to comment.