Merge pull request #130 from aliyun/fix/producer-consumer
[fix] consumer map race
shabicheng committed Jul 14, 2021
2 parents ca0645c + 921e143 commit d1c9ae9
Showing 3 changed files with 64 additions and 48 deletions.
80 changes: 46 additions & 34 deletions consumer/worker.go
@@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/aliyun/aliyun-log-go-sdk"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.uber.org/atomic"
@@ -16,7 +16,7 @@ type ConsumerWorker struct {
consumerHeatBeat *ConsumerHeatBeat
client *ConsumerClient
workerShutDownFlag *atomic.Bool
- shardConsumer map[int]*ShardConsumerWorker
+ shardConsumer sync.Map // map[int]*ShardConsumerWorker
do func(shard int, logGroup *sls.LogGroupList) string
waitGroup sync.WaitGroup
Logger log.Logger
@@ -30,9 +30,9 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str
consumerHeatBeat: consumerHeatBeat,
client: consumerClient,
workerShutDownFlag: atomic.NewBool(false),
- shardConsumer: make(map[int]*ShardConsumerWorker),
- do: do,
- Logger: logger,
+ //shardConsumer: make(map[int]*ShardConsumerWorker),
+ do: do,
+ Logger: logger,
}
consumerClient.createConsumerGroup()
return consumerWorker
@@ -82,50 +82,62 @@ func (consumerWorker *ConsumerWorker) run() {
func (consumerWorker *ConsumerWorker) shutDownAndWait() {
for {
time.Sleep(500 * time.Millisecond)
- for shard, consumer := range consumerWorker.shardConsumer {
- if !consumer.isShutDownComplete() {
- consumer.consumerShutDown()
- } else if consumer.isShutDownComplete() {
- delete(consumerWorker.shardConsumer, shard)
- }
- }
- if len(consumerWorker.shardConsumer) == 0 {
+ count := 0
+ consumerWorker.shardConsumer.Range(
+ func(key, value interface{}) bool {
+ count++
+ consumer := value.(*ShardConsumerWorker)
+ if !consumer.isShutDownComplete() {
+ consumer.consumerShutDown()
+ } else if consumer.isShutDownComplete() {
+ consumerWorker.shardConsumer.Delete(key)
+ }
+ return true
+ },
+ )
+ if count == 0 {
break
}
}

}

func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsumerWorker {
- consumer := consumerWorker.shardConsumer[shardId]
- if consumer != nil {
- return consumer
+ consumer, ok := consumerWorker.shardConsumer.Load(shardId)
+ if ok {
+ return consumer.(*ShardConsumerWorker)
}
- consumer = initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.do, consumerWorker.Logger)
- consumerWorker.shardConsumer[shardId] = consumer
- return consumer
+ consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.do, consumerWorker.Logger)
+ consumerWorker.shardConsumer.Store(shardId, consumerIns)
+ return consumerIns

}

func (consumerWorker *ConsumerWorker) cleanShardConsumer(owned_shards []int) {
- for shard, consumer := range consumerWorker.shardConsumer {
-
- if !Contain(shard, owned_shards) {
- level.Info(consumerWorker.Logger).Log("msg", "try to call shut down for unassigned consumer shard", "shardId", shard)
- consumer.consumerShutDown()
- level.Info(consumerWorker.Logger).Log("msg", "Complete call shut down for unassigned consumer shard", "shardId", shard)
- }
+ consumerWorker.shardConsumer.Range(
+ func(key, value interface{}) bool {
+ shard := key.(int)
+ consumer := value.(*ShardConsumerWorker)

- if consumer.isShutDownComplete() {
- isDeleteShard := consumerWorker.consumerHeatBeat.removeHeartShard(shard)
- if isDeleteShard {
- level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard", "shardId", shard)
- delete(consumerWorker.shardConsumer, shard)
- } else {
- level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard failed", "shardId", shard)
+ if !Contain(shard, owned_shards) {
+ level.Info(consumerWorker.Logger).Log("msg", "try to call shut down for unassigned consumer shard", "shardId", shard)
+ consumer.consumerShutDown()
+ level.Info(consumerWorker.Logger).Log("msg", "Complete call shut down for unassigned consumer shard", "shardId", shard)
+ }
}
}

+ if consumer.isShutDownComplete() {
+ isDeleteShard := consumerWorker.consumerHeatBeat.removeHeartShard(shard)
+ if isDeleteShard {
+ level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard", "shardId", shard)
+ consumerWorker.shardConsumer.Delete(shard)
+ } else {
+ level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard failed", "shardId", shard)
+ }
+ }
+ return true
+ },
+ )
}

// This function is used to initialize the global log configuration
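The worker.go change above replaces a plain map[int]*ShardConsumerWorker with a sync.Map. A plain Go map is not safe for concurrent use: when multiple goroutines read and write shardConsumer at the same time, the runtime aborts with "fatal error: concurrent map read and map write" — the race this commit fixes. Below is a minimal standalone sketch of the same migration pattern (illustrative only, not SDK code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var shards sync.Map // drop-in for map[int]*ShardConsumerWorker; safe for concurrent use

	// Store and Load replace m[k] = v and v, ok := m[k].
	shards.Store(1, "consumer-1")
	if v, ok := shards.Load(1); ok {
		fmt.Println(v.(string)) // values come back as interface{}, so assert the type
	}

	// Range replaces `for k, v := range m`; deleting inside Range is permitted.
	shards.Range(func(key, value interface{}) bool {
		shards.Delete(key)
		return true // returning false would stop the iteration early
	})
}

One caveat with this pattern: a Load followed by a separate Store, as in getShardConsumer above, is not atomic as a pair; if two goroutines could race to initialize the same shard, sync.Map.LoadOrStore performs the create-or-fetch in one step.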
15 changes: 9 additions & 6 deletions producer/io_thread_pool.go
@@ -36,6 +36,9 @@ func (threadPool *IoThreadPool) addTask(batch *ProducerBatch) {
func (threadPool *IoThreadPool) popTask() *ProducerBatch {
defer threadPool.lock.Unlock()
threadPool.lock.Lock()
+ if threadPool.queue.Len() <= 0 {
+ return nil
+ }
ele := threadPool.queue.Front()
threadPool.queue.Remove(ele)
return ele.Value.(*ProducerBatch)
@@ -50,12 +53,12 @@ func (threadPool *IoThreadPool) hasTask() bool {
func (threadPool *IoThreadPool) start(ioWorkerWaitGroup *sync.WaitGroup, ioThreadPoolwait *sync.WaitGroup) {
defer ioThreadPoolwait.Done()
for {
- if threadPool.hasTask() {
- select {
- case threadPool.ioworker.maxIoWorker <- 1:
- ioWorkerWaitGroup.Add(1)
- go threadPool.ioworker.sendToServer(threadPool.popTask(), ioWorkerWaitGroup)
- }
+ if task := threadPool.popTask(); task != nil {
+ threadPool.ioworker.startSendTask(ioWorkerWaitGroup)
+ go func(producerBatch *ProducerBatch) {
+ defer threadPool.ioworker.closeSendTask(ioWorkerWaitGroup)
+ threadPool.ioworker.sendToServer(producerBatch)
+ }(task)
} else {
if !threadPool.threadPoolShutDownFlag.Load() {
time.Sleep(100 * time.Millisecond)
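The io_thread_pool.go change removes a check-then-act race: hasTask() and popTask() each took the lock separately, so another goroutine could drain the queue between the two calls and popTask() would then call Remove on a nil Front() element. Folding the emptiness check into popTask() makes the pop atomic, and the caller branches on a nil result instead. A minimal sketch of that pattern, assuming a container/list-backed queue like the one in this file (names are illustrative):

package main

import (
	"container/list"
	"fmt"
	"sync"
)

type threadPool struct {
	lock  sync.Mutex
	queue *list.List
}

// popTask checks emptiness and removes the head under a single lock
// acquisition, so no other goroutine can empty the queue in between.
func (p *threadPool) popTask() *string {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.queue.Len() == 0 {
		return nil
	}
	ele := p.queue.Front()
	p.queue.Remove(ele)
	return ele.Value.(*string)
}

func main() {
	batch := "batch-1"
	p := &threadPool{queue: list.New()}
	p.queue.PushBack(&batch)
	for task := p.popTask(); task != nil; task = p.popTask() {
		fmt.Println(*task) // process the dequeued task
	}
}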
17 changes: 9 additions & 8 deletions producer/io_worker.go
@@ -41,14 +41,9 @@ func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log
}
}

- func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch, ioWorkerWaitGroup *sync.WaitGroup) {
- if producerBatch == nil || ioWorkerWaitGroup == nil {
- return
- }
+ func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server")
- defer ioWorker.closeSendTask(ioWorkerWaitGroup)
var err error
- atomic.AddInt64(&ioWorker.taskCount, 1)
if producerBatch.shardHash != nil {
err = ioWorker.client.PostLogStoreLogs(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup, producerBatch.getShardHash())
} else {
@@ -116,9 +111,15 @@ func (ioWorker *IoWorker) addErrorMessageToBatchAttempt(producerBatch *ProducerB
}

func (ioWorker *IoWorker) closeSendTask(ioWorkerWaitGroup *sync.WaitGroup) {
- ioWorkerWaitGroup.Done()
- atomic.AddInt64(&ioWorker.taskCount, -1)
<-ioWorker.maxIoWorker
+ atomic.AddInt64(&ioWorker.taskCount, -1)
+ ioWorkerWaitGroup.Done()
}

+ func (ioWorker *IoWorker) startSendTask(ioWorkerWaitGroup *sync.WaitGroup) {
+ atomic.AddInt64(&ioWorker.taskCount, 1)
+ ioWorker.maxIoWorker <- 1
+ ioWorkerWaitGroup.Add(1)
+ }

func (ioWorker *IoWorker) excuteFailedCallback(producerBatch *ProducerBatch) {
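The io_worker.go change moves the bookkeeping out of sendToServer: startSendTask acquires a maxIoWorker slot and calls WaitGroup.Add on the dispatching goroutine, before the worker goroutine is launched, and closeSendTask releases everything in reverse order from a defer. Calling Add from inside the new goroutine can race with a concurrent Wait (the counter may hit zero before the goroutine starts), and acquiring the semaphore before launch is what actually bounds the number of in-flight sends. A sketch of the acquire-before-launch pattern, with all names illustrative:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	sem := make(chan int, 2) // at most 2 sends in flight, like maxIoWorker

	for i := 0; i < 5; i++ {
		// Acquire the slot and Add on the dispatcher's goroutine, BEFORE
		// launching, so wg.Wait can never observe a zero count too early.
		sem <- 1
		wg.Add(1)
		go func(n int) {
			defer func() { <-sem; wg.Done() }() // release in reverse order
			fmt.Println("sending batch", n)
		}(i)
	}
	wg.Wait()
}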
