Skip to content

Commit

Permalink
service: adds a pull ticker
Browse files Browse the repository at this point in the history
Signed-off-by: Sander Pick <[email protected]>
  • Loading branch information
sanderpick committed Oct 16, 2019
1 parent 7db3efb commit eb554ce
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 8 deletions.
7 changes: 4 additions & 3 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func newService(t *threads) (*service, error) {
return s, nil
}

// Push receives a record request.
// Push receives a push request.
func (s *service) Push(ctx context.Context, req *pb.PushRequest) (*pb.PushReply, error) {
if req.Header == nil {
return nil, fmt.Errorf("request header is required")
Expand Down Expand Up @@ -158,6 +158,7 @@ func (s *service) Push(ctx context.Context, req *pb.PushRequest) (*pb.PushReply,
return reply, nil
}

// Pull receives a pull request.
func (s *service) Pull(ctx context.Context, req *pb.PullRequest) (*pb.PullReply, error) {
recs, err := s.threads.pullLocal(
ctx, req.ThreadID.ID,
Expand Down Expand Up @@ -561,8 +562,8 @@ func (s *service) handleInvite(
}

// Download log history
// @todo: should this even happen unless direcly asked for by a user?
// @todo: if auto, do we need to queue download a la, threads v1?
// @todo: Should this even happen unless direcly asked for by a user?
// @todo: If auto, do we need to queue downloads a la, threads v1?
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand Down
2 changes: 2 additions & 0 deletions test/threads_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func ThreadsTest(t *testing.T) {
ts1 := newService(t, m1)
ts2 := newService(t, m2)

time.Sleep(time.Second)

ts1.Host().Peerstore().AddAddrs(ts2.Host().ID(), ts2.Host().Addrs(), peerstore.PermanentAddrTTL)
ts2.Host().Peerstore().AddAddrs(ts1.Host().ID(), ts1.Host().Addrs(), peerstore.PermanentAddrTTL)

Expand Down
45 changes: 40 additions & 5 deletions threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"sync"
"time"

bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -58,12 +59,16 @@ func init() {
// MaxPullLimit is the maximum page size for pulling records.
var MaxPullLimit = 10000

// PullInterval is the interval between automatic log pulls.
var PullInterval = time.Second * 10

// threads is an implementation of Threadservice.
type threads struct {
host host.Host
blocks bserv.BlockService
service *service
dagService format.DAGService
rpc *grpc.Server
service *service
bus *broadcast.Broadcaster
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -95,6 +100,7 @@ func NewThreads(
host: h,
blocks: bs,
dagService: ds,
rpc: grpc.NewServer(),
bus: broadcast.NewBroadcaster(0),
ctx: ctx,
cancel: cancel,
Expand All @@ -109,15 +115,18 @@ func NewThreads(
if err != nil {
return nil, err
}
rpc := grpc.NewServer()
go rpc.Serve(listener)
pb.RegisterThreadsServer(rpc, t.service)
go t.rpc.Serve(listener)
pb.RegisterThreadsServer(t.rpc, t.service)

go t.startPulling()

return t, nil
}

// Close the threads instance.
func (t *threads) Close() (err error) {
t.rpc.GracefulStop()

var errs []error
weakClose := func(name string, c interface{}) {
if cl, ok := c.(io.Closer); ok {
Expand All @@ -127,7 +136,7 @@ func (t *threads) Close() (err error) {
}
}

//weakClose("host", t.host) @todo: fix panic on close
weakClose("host", t.host)
weakClose("dagservice", t.dagService)
weakClose("threadstore", t.Threadstore)

Expand Down Expand Up @@ -566,6 +575,32 @@ func (t *threads) pullLocal(
return recs, nil
}

// startPulling periodically pulls on all threads.
func (t *threads) startPulling() {
tick := time.NewTicker(PullInterval)
defer tick.Stop()

for {
select {
case <-tick.C:
ts, err := t.Threads()
if err != nil {
log.Errorf("error listing threads: %s", err)
continue
}
for _, tid := range ts {
go func(id thread.ID) {
if err := t.Pull(t.ctx, id); err != nil {
log.Errorf("error pulling thread %s: %s", tid.String(), err)
}
}(tid)
}
case <-t.ctx.Done():
return
}
}
}

// setLogLevels sets the logging levels of the given log systems.
// color controls whether or not color codes are included in the output.
func setLogLevels(systems map[string]logger.Level, color bool) error {
Expand Down

0 comments on commit eb554ce

Please sign in to comment.