Skip to content

Commit

Permalink
Uses lock to prevent overwrite map item for concurrent requests
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiyanliu authored and Jack47 committed Dec 20, 2017
1 parent 20c5b45 commit 3ae0ec9
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions src/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Cluster struct {
leftMembers, failedMembers *memberStatusBook
memberOperations *memberOperationBook

requestSendLock sync.Mutex
requestLock sync.RWMutex
futures map[logicalTime]*Future
requestOperations *requestOperationBook
Expand Down Expand Up @@ -227,8 +228,17 @@ func (c *Cluster) Request(name string, payload []byte, param *RequestParam) (*Fu
h.Write([]byte(uuid))

requestId := h.Sum64()

c.requestSendLock.Lock()
defer c.requestSendLock.Unlock()

requestTime := c.requestClock.Time()

msg, err := c.createRequestMessage(requestId, name, requestTime, payload, param)
if err != nil {
return nil, err
}

future := createFuture(requestId, requestTime, c.memberList.NumMembers(), param,
func() {
c.requestLock.Lock()
Expand All @@ -241,7 +251,7 @@ func (c *Cluster) Request(name string, payload []byte, param *RequestParam) (*Fu
c.futures[requestTime] = future
c.requestLock.Unlock()

err = c.broadcastRequestMessage(requestId, name, requestTime, payload, param)
err = c.broadcastRequestMessage(msg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -447,7 +457,7 @@ func (c *Cluster) updateNode(node *memberlist.Node) {

func (c *Cluster) resolveNodeConflict(knownNode, otherNode *memberlist.Node) {
if c.conf.NodeName != knownNode.Name {
logger.Warnf("[dectcted node name %s conflict at %s:%d and %s:%d]", knownNode.Name,
logger.Warnf("[detected node name %s conflict at %s:%d and %s:%d]", knownNode.Name,
knownNode.Addr, knownNode.Port, otherNode.Addr, otherNode.Port)
return
}
Expand Down Expand Up @@ -588,7 +598,7 @@ func (c *Cluster) operateResponse(msg *messageResponse) bool {
}

if future.requestId != msg.RequestId {
logger.Warnf("[BUG: request id %d is mismatch with response %d, ignored]",
logger.Warnf("[request id %d is mismatch with the id in response %d, ignored]",
future.requestId, msg.RequestId)
return false
}
Expand Down Expand Up @@ -722,8 +732,8 @@ func (c *Cluster) broadcastMemberLeaveMessage(nodeName string) error {
return nil
}

func (c *Cluster) broadcastRequestMessage(requestId uint64, name string, requestTime logicalTime,
payload []byte, param *RequestParam) error {
func (c *Cluster) createRequestMessage(requestId uint64, name string, requestTime logicalTime,
payload []byte, param *RequestParam) (*messageRequest, error) {

var flag uint32
if param.RequireAck {
Expand All @@ -732,7 +742,7 @@ func (c *Cluster) broadcastRequestMessage(requestId uint64, name string, request

source := c.memberList.LocalNode()

msg := messageRequest{
msg := &messageRequest{
RequestId: requestId,
RequestName: name,
RequestTime: requestTime,
Expand All @@ -747,18 +757,22 @@ func (c *Cluster) broadcastRequestMessage(requestId uint64, name string, request

err := msg.applyFilters(param)
if err != nil {
return err
return nil, err
}

return msg, nil
}

func (c *Cluster) broadcastRequestMessage(msg *messageRequest) error {
// handle operation message locally
c.operateRequest(&msg)
c.operateRequest(msg)

if !c.anyAlivePeerMembers() {
// no peer can respond
return nil
}

err = fanoutMessage(c.requestMessageSendQueue, &msg, requestMessage, nil) // need not to care if sending is done
err := fanoutMessage(c.requestMessageSendQueue, msg, requestMessage, nil) // need not to care if sending is done
if err != nil {
logger.Errorf("[broadcast request message failed: %v]", err)
return err
Expand Down

0 comments on commit 3ae0ec9

Please sign in to comment.