Skip to content

Commit

Permalink
[cluster] Outputs a log when the request sent or timeout before send
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiyanliu authored and Jack47 committed Dec 26, 2017
1 parent 4ec376d commit 992f261
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions src/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,13 @@ func (c *Cluster) broadcastMemberJoinMessage() error {
return err
}

timer := time.NewTimer(c.conf.MessageSendTimeout)
defer timer.Stop()

select {
case <-sentNotify:
case <-time.After(c.conf.MessageSendTimeout):
fmt.Println("send timeout:", c.conf.MessageSendTimeout)
logger.Debugf("[member join message is sent to the peers]")
case <-timer.C:
return fmt.Errorf("broadcast member join message timeout")
case <-c.stop:
return fmt.Errorf("cluster stopped")
Expand Down Expand Up @@ -721,9 +724,13 @@ func (c *Cluster) broadcastMemberLeaveMessage(nodeName string) error {
return err
}

timer := time.NewTimer(c.conf.MessageSendTimeout)
defer timer.Stop()

select {
case <-sentNotify:
case <-time.After(c.conf.MessageSendTimeout):
logger.Debugf("[member leave message is sent to the peers]")
case <-timer.C:
return fmt.Errorf("broadcast member leave message timeout")
case <-c.stop:
return fmt.Errorf("cluster stopped")
Expand Down Expand Up @@ -768,16 +775,33 @@ func (c *Cluster) broadcastRequestMessage(msg *messageRequest) error {
c.operateRequest(msg)

if !c.anyAlivePeerMembers() {
logger.Warnf("[no peer can respond, request ignored]")
// no peer can respond
return nil
}

err := fanoutMessage(c.requestMessageSendQueue, msg, requestMessage, nil) // need not to care if sending is done
sentNotify := make(chan struct{})

err := fanoutMessage(c.requestMessageSendQueue, msg, requestMessage, sentNotify)
if err != nil {
logger.Errorf("[broadcast request message failed: %v]", err)
return err
}

go func() {
timer := time.NewTimer(msg.RequestTimeout)
defer timer.Stop()

select {
case <-sentNotify:
logger.Debugf("[request %s is sent to the peers]", msg.RequestName)
case <-timer.C:
logger.Warnf("[request %s is timeout before send to the peers]", msg.RequestName)
case <-c.stop:
// exits
}
}()

return nil
}

Expand Down

0 comments on commit 992f261

Please sign in to comment.