Skip to content

Commit

Permalink
[statistics] Adds API to query multiple indicators in batch for clust…
Browse files Browse the repository at this point in the history
…er mode
  • Loading branch information
zhiyanliu authored and Jack47 committed Dec 29, 2017
1 parent d5680c5 commit a9eb3a3
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 21 deletions.
87 changes: 86 additions & 1 deletion doc/cluster_statistics_api_ref.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,61 @@ paths:
description: The description of statistics indicator.
schema:
$ref: '#/definitions/ClusterPipelineIndicatorDescription'
/{group}/pipelines/{pipelineName}/indicators/value:
get:
summary: Retrieves Pipeline Statistics Values From Multiple Indicators in a Group
description: |
The Pipeline Statistics Indicators Value retrieve endpoint in the group
returns the values of given multiple statistics indicators pipeline instance exposed.
parameters:
- name: group
in: path
required: true
type: string
description: Group name of cluster to query to query.
- name: pipelineName
in: path
description: Pipeline instance name to query.
required: true
type: string
- name: pipelineIndicatorsValueRetrieveRequest
in: body
schema:
$ref: '#/definitions/ClusterPipelineIndicatorsValueRetrieveRequest'
required: true
description: |
Timeout, the flag to present if returns the result of each cluster memeber and indicator names to query.
responses:
500:
description: Handle the reqeuest failed by internal error.
schema:
$ref: '#/definitions/Error'
503:
description: All operations are disallowed when running in standalone mode.
schema:
$ref: '#/definitions/Error'
400:
description: |
Invalid group name, timeout is less than 10s, invalid pipeline instance name or
invalid indicator names to query.
schema:
$ref: '#/definitions/Error'
404:
description: The statistics of pipeline instance not found by given name.
schema:
$ref: '#/definitions/Error'
408:
description: The timeout period expired.
schema:
$ref: '#/definitions/Error'
410:
description: The endpoint has gone.
schema:
$ref: '#/definitions/Error'
200:
description: The values of statistics indicators.
schema:
$ref: '#/definitions/ClusterPipelineIndicatorsValue'
/{group}/pipelines/{pipelineName}/task/indicators:
get:
summary: Retrieves Task Statistics Indicator Names in a Group
Expand Down Expand Up @@ -572,7 +627,7 @@ definitions:
type: integer
minimum: 10
maximum: 65535
default: 30
default: 120
details:
type: boolean
default: false
Expand Down Expand Up @@ -680,6 +735,36 @@ definitions:
description: |
A human readable description of the special plugin indicator type.
Check Pipeline Reference document for more information.
ClusterPipelineIndicatorsValueRetrieveRequest:
type: object
required:
- indicator_names
properties:
timeout_sec:
type: integer
minimum: 10
maximum: 65535
default: 120
details:
type: boolean
default: false
indicator_names:
type: array
items:
type: string
description: The name of the indicator to query statistics value.
ClusterPipelineIndicatorsValue:
type: object
required:
- values
properties:
values:
type: object
additionalProperties:
type: object
description: |
A special value object of the pipeline indicator type.
Check Pipeline Reference document for more information.
ClusterTaskIndicatorNames:
type: object
required:
Expand Down
1 change: 0 additions & 1 deletion src/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,6 @@ func (c *Cluster) broadcastRequestMessage(msg *messageRequest) error {

if !c.anyAlivePeerMembers() {
logger.Warnf("[no peer can respond, request ignored]")
// no peer can respond
return nil
}

Expand Down
29 changes: 29 additions & 0 deletions src/cluster/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,35 @@ func (gc *GatewayCluster) StatPipelineIndicatorValue(group string, timeout time.
return ret, nil
}

func (gc *GatewayCluster) StatPipelineIndicatorsValue(group string, timeout time.Duration,
pipelineName string, indicatorNames []string) (*ResultStatIndicatorsValue, *ClusterError) {

if gc.Stopped() {
return nil, newClusterError("can not retrieve pipeline statistics indicators value due to cluster gone",
IssueMemberGoneError)
}

filter := FilterPipelineIndicatorsValue{
PipelineName: pipelineName,
IndicatorNames: indicatorNames,
}

requestName := fmt.Sprintf("(group(%s)stat_pipleine_indicators_value)", group)

resp, err := gc.issueStat(group, timeout, requestName, &filter)
if err != nil {
return nil, err
}

ret, ok := resp.(*ResultStatIndicatorsValue)
if !ok {
logger.Errorf("[BUG: stat pipeline indicators value returns invalid result, got type %T]", resp)
return nil, newClusterError("stat pipeline indicators value returns invalid result", InternalServerError)
}

return ret, nil
}

func (gc *GatewayCluster) StatPipelineIndicatorDesc(group string, timeout time.Duration,
pipelineName, indicatorName string) (*ResultStatIndicatorDesc, *ClusterError) {

Expand Down
33 changes: 21 additions & 12 deletions src/cluster/gateway/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,23 +159,25 @@ type (
ReqStat struct {
Timeout time.Duration

FilterPipelineIndicatorNames *FilterPipelineIndicatorNames
FilterPipelineIndicatorValue *FilterPipelineIndicatorValue
FilterPipelineIndicatorDesc *FilterPipelineIndicatorDesc
FilterPluginIndicatorNames *FilterPluginIndicatorNames
FilterPluginIndicatorValue *FilterPluginIndicatorValue
FilterPluginIndicatorDesc *FilterPluginIndicatorDesc
FilterTaskIndicatorNames *FilterTaskIndicatorNames
FilterTaskIndicatorValue *FilterTaskIndicatorValue
FilterTaskIndicatorDesc *FilterTaskIndicatorDesc
FilterPipelineIndicatorNames *FilterPipelineIndicatorNames
FilterPipelineIndicatorValue *FilterPipelineIndicatorValue
FilterPipelineIndicatorsValue *FilterPipelineIndicatorsValue
FilterPipelineIndicatorDesc *FilterPipelineIndicatorDesc
FilterPluginIndicatorNames *FilterPluginIndicatorNames
FilterPluginIndicatorValue *FilterPluginIndicatorValue
FilterPluginIndicatorDesc *FilterPluginIndicatorDesc
FilterTaskIndicatorNames *FilterTaskIndicatorNames
FilterTaskIndicatorValue *FilterTaskIndicatorValue
FilterTaskIndicatorDesc *FilterTaskIndicatorDesc
}
// Pack Header: statMessage | statRelayMessage
RespStat struct {
Err *ClusterError

Names []byte // json
Value []byte // json
Desc []byte // json
Names []byte // json
Value []byte // json
Values []byte // json
Desc []byte // json
}
FilterPipelineIndicatorNames struct {
PipelineName string
Expand All @@ -184,6 +186,10 @@ type (
PipelineName string
IndicatorName string
}
FilterPipelineIndicatorsValue struct {
PipelineName string
IndicatorNames []string
}
FilterPipelineIndicatorDesc struct {
PipelineName string
IndicatorName string
Expand Down Expand Up @@ -219,6 +225,9 @@ type (
ResultStatIndicatorValue struct {
Value interface{} `json:"value"`
}
ResultStatIndicatorsValue struct {
Values map[string]interface{} `json:"values"`
}
ResultStatIndicatorDesc struct {
Desc interface{} `json:"desc"`
}
Expand Down
Loading

0 comments on commit a9eb3a3

Please sign in to comment.