From eb554cea206be0cdebba75720e6346ba281d99e4 Mon Sep 17 00:00:00 2001 From: Sander Pick Date: Tue, 15 Oct 2019 17:18:21 -0700 Subject: [PATCH] service: adds a pull ticker Signed-off-by: Sander Pick --- service.go | 7 ++++--- test/threads_suite.go | 2 ++ threads.go | 45 ++++++++++++++++++++++++++++++++++++++----- 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/service.go b/service.go index fdc4b3af..677607f8 100644 --- a/service.go +++ b/service.go @@ -65,7 +65,7 @@ func newService(t *threads) (*service, error) { return s, nil } -// Push receives a record request. +// Push receives a push request. func (s *service) Push(ctx context.Context, req *pb.PushRequest) (*pb.PushReply, error) { if req.Header == nil { return nil, fmt.Errorf("request header is required") @@ -158,6 +158,7 @@ func (s *service) Push(ctx context.Context, req *pb.PushRequest) (*pb.PushReply, return reply, nil } +// Pull receives a pull request. func (s *service) Pull(ctx context.Context, req *pb.PullRequest) (*pb.PullReply, error) { recs, err := s.threads.pullLocal( ctx, req.ThreadID.ID, @@ -561,8 +562,8 @@ func (s *service) handleInvite( } // Download log history - // @todo: should this even happen unless direcly asked for by a user? - // @todo: if auto, do we need to queue download a la, threads v1? + // @todo: Should this even happen unless direcly asked for by a user? + // @todo: If auto, do we need to queue downloads a la, threads v1? go func() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() diff --git a/test/threads_suite.go b/test/threads_suite.go index add20899..c442891d 100644 --- a/test/threads_suite.go +++ b/test/threads_suite.go @@ -40,6 +40,8 @@ func ThreadsTest(t *testing.T) { ts1 := newService(t, m1) ts2 := newService(t, m2) + time.Sleep(time.Second) + ts1.Host().Peerstore().AddAddrs(ts2.Host().ID(), ts2.Host().Addrs(), peerstore.PermanentAddrTTL) ts2.Host().Peerstore().AddAddrs(ts1.Host().ID(), ts1.Host().Addrs(), peerstore.PermanentAddrTTL) diff --git a/threads.go b/threads.go index 3e33f1b0..05d27774 100644 --- a/threads.go +++ b/threads.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "sync" + "time" bserv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" @@ -58,12 +59,16 @@ func init() { // MaxPullLimit is the maximum page size for pulling records. var MaxPullLimit = 10000 +// PullInterval is the interval between automatic log pulls. +var PullInterval = time.Second * 10 + // threads is an implementation of Threadservice. type threads struct { host host.Host blocks bserv.BlockService - service *service dagService format.DAGService + rpc *grpc.Server + service *service bus *broadcast.Broadcaster ctx context.Context cancel context.CancelFunc @@ -95,6 +100,7 @@ func NewThreads( host: h, blocks: bs, dagService: ds, + rpc: grpc.NewServer(), bus: broadcast.NewBroadcaster(0), ctx: ctx, cancel: cancel, @@ -109,15 +115,18 @@ func NewThreads( if err != nil { return nil, err } - rpc := grpc.NewServer() - go rpc.Serve(listener) - pb.RegisterThreadsServer(rpc, t.service) + go t.rpc.Serve(listener) + pb.RegisterThreadsServer(t.rpc, t.service) + + go t.startPulling() return t, nil } // Close the threads instance. func (t *threads) Close() (err error) { + t.rpc.GracefulStop() + var errs []error weakClose := func(name string, c interface{}) { if cl, ok := c.(io.Closer); ok { @@ -127,7 +136,7 @@ func (t *threads) Close() (err error) { } } - //weakClose("host", t.host) @todo: fix panic on close + weakClose("host", t.host) weakClose("dagservice", t.dagService) weakClose("threadstore", t.Threadstore) @@ -566,6 +575,32 @@ func (t *threads) pullLocal( return recs, nil } +// startPulling periodically pulls on all threads. +func (t *threads) startPulling() { + tick := time.NewTicker(PullInterval) + defer tick.Stop() + + for { + select { + case <-tick.C: + ts, err := t.Threads() + if err != nil { + log.Errorf("error listing threads: %s", err) + continue + } + for _, tid := range ts { + go func(id thread.ID) { + if err := t.Pull(t.ctx, id); err != nil { + log.Errorf("error pulling thread %s: %s", tid.String(), err) + } + }(tid) + } + case <-t.ctx.Done(): + return + } + } +} + // setLogLevels sets the logging levels of the given log systems. // color controls whether or not color codes are included in the output. func setLogLevels(systems map[string]logger.Level, color bool) error {