Skip to content

Commit

Permalink
service: add log exchange while pulling records
Browse files Browse the repository at this point in the history
Signed-off-by: Sander Pick <[email protected]>
  • Loading branch information
sanderpick committed Oct 25, 2019
1 parent 37801a2 commit 28707a3
Show file tree
Hide file tree
Showing 9 changed files with 1,686 additions and 964 deletions.
76 changes: 60 additions & 16 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,29 +124,45 @@ func (s *service) getRecords(
ctx context.Context,
id thread.ID,
lid peer.ID,
offset cid.Cid,
offsets map[peer.ID]cid.Cid,
limit int,
) ([]thread.Record, error) {
lg, err := s.threads.store.LogInfo(id, lid)
if err != nil {
return nil, err
) (map[peer.ID][]thread.Record, error) {
lgs := make(map[peer.ID]thread.LogInfo)
var err error
for lid := range offsets {
lgs[lid], err = s.threads.store.LogInfo(id, lid)
if err != nil {
return nil, err
}
}
if lg.PubKey == nil {
return nil, fmt.Errorf("log not found")

pblgs := make([]*pb.GetRecordsRequest_LogEntry, 0)
for _, lg := range lgs {
pblgs = append(pblgs, &pb.GetRecordsRequest_LogEntry{
LogID: &pb.ProtoPeerID{ID: lg.ID},
Offset: &pb.ProtoCid{Cid: offsets[lg.ID]},
Limit: int32(limit),
})
}

req := &pb.GetRecordsRequest{
Header: &pb.GetRecordsRequest_Header{
From: &pb.ProtoPeerID{ID: s.threads.host.ID()},
},
ThreadID: &pb.ProtoThreadID{ID: id},
LogID: &pb.ProtoPeerID{ID: lid},
Offset: &pb.ProtoCid{Cid: offset},
Limit: int32(limit),
Logs: pblgs,
}

lg, err := s.threads.store.LogInfo(id, lid)
if err != nil {
return nil, err
}
if lg.PubKey == nil {
return nil, fmt.Errorf("log not found")
}

// Pull from each address
recs := newRecords()
recs := make(map[peer.ID]*records)
wg := sync.WaitGroup{}
for _, addr := range lg.Addrs {
wg.Add(1)
Expand Down Expand Up @@ -182,21 +198,49 @@ func (s *service) getRecords(
return
}

log.Debugf("received %d records from %s", len(reply.Records), p)
for _, l := range reply.Logs {
log.Debugf("received %d records in log %s from %s", len(l.Records), l.LogID.ID.String(), p)

for _, r := range reply.Records {
rec, err := cbor.RecordFromProto(r, lg.FollowKey)
lg, err := s.threads.store.LogInfo(id, l.LogID.ID)
if err != nil {
log.Error(err)
return
}
recs.Store(rec.Cid(), rec)
if lg.PubKey == nil {
if l.Log != nil {
lg = logFromProto(l.Log)
if err = s.threads.store.AddLog(id, lg); err != nil {
log.Error(err)
return
}
} else {
continue
}
}

if recs[lg.ID] == nil {
recs[lg.ID] = newRecords()
}

for _, r := range l.Records {
rec, err := cbor.RecordFromProto(r, lg.FollowKey)
if err != nil {
log.Error(err)
return
}
recs[lg.ID].Store(rec.Cid(), rec)
}
}
}(addr)
}

wg.Wait()
return recs.List(), nil

res := make(map[peer.ID][]thread.Record)
for lid, rs := range recs {
res[lid] = rs.List()
}
return res, nil
}

// pushRecord to log addresses and thread topic.
Expand Down
Loading

0 comments on commit 28707a3

Please sign in to comment.