Skip to content

Commit

Permalink
[plugin] Triggers input event without lock
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiyanliu authored and Jack47 committed Dec 12, 2017
1 parent 4c9b2af commit 2cf6124
Showing 1 changed file with 9 additions and 16 deletions.
25 changes: 9 additions & 16 deletions src/plugins/http_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,7 @@ type httpInput struct {
instanceId string
waitQueueLengthIndicatorAdded bool
wipRequestCountIndicatorAdded bool
contextsLock sync.RWMutex
contexts map[string]pipelines.PipelineContext
contexts *sync.Map
}

func httpInputConstructor(conf plugins.Config) (plugins.Plugin, plugins.PluginType, bool, error) {
Expand All @@ -209,7 +208,7 @@ func httpInputConstructor(conf plugins.Config) (plugins.Plugin, plugins.PluginTy
h := &httpInput{
conf: c,
httpTaskChan: make(chan *httpTask, 32767),
contexts: make(map[string]pipelines.PipelineContext),
contexts: new(sync.Map),
}

h.instanceId = fmt.Sprintf("%p", h)
Expand Down Expand Up @@ -279,9 +278,7 @@ func (h *httpInput) Prepare(ctx pipelines.PipelineContext) {

h.wipRequestCountIndicatorAdded = added

h.contextsLock.Lock()
h.contexts[ctx.PipelineName()] = ctx
h.contextsLock.Unlock()
h.contexts.Store(ctx.PipelineName(), ctx)
}

func (h *httpInput) handler(w http.ResponseWriter, req *http.Request, urlParams map[string]string,
Expand Down Expand Up @@ -317,13 +314,11 @@ func (h *httpInput) handler(w http.ResponseWriter, req *http.Request, urlParams
}()
h.httpTaskChan <- &httpTask

h.contextsLock.RLock()

for _, ctx := range h.contexts {
ctx.TriggerSourceInput("httpTaskQueueLengthGetter", h.getHTTPTaskQueueLength)
}

h.contextsLock.RUnlock()
h.contexts.Range(func(key, value interface{}) bool {
value.(pipelines.PipelineContext).TriggerSourceInput(
"httpTaskQueueLengthGetter", h.getHTTPTaskQueueLength)
return true // iterate next
})
}()

<-httpTask.finishedChan
Expand Down Expand Up @@ -549,9 +544,7 @@ func (h *httpInput) CleanUp(ctx pipelines.PipelineContext) {
ctx.Statistics().UnregisterPluginIndicator(h.Name(), h.instanceId, "WIP_REQUEST_COUNT")
}

h.contextsLock.Lock()
delete(h.contexts, ctx.PipelineName())
h.contextsLock.Unlock()
h.contexts.Delete(ctx.PipelineName())
}

func (h *httpInput) Close() {
Expand Down

0 comments on commit 2cf6124

Please sign in to comment.