Skip to content

Commit

Permalink
Add throttle processor
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Aug 10, 2018
1 parent d30a88d commit 753d4b3
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.

## Unreleased

### 0.23.9 - 2018-08-10

### Added

- New `throttle` processor.

### 0.23.6 - 2018-08-09

### Added
Expand Down
1 change: 1 addition & 0 deletions config/env/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ PROCESSOR_SPLIT_SIZE = 1
PROCESSOR_TEXT_ARG
PROCESSOR_TEXT_OPERATOR = trim_space
PROCESSOR_TEXT_VALUE
PROCESSOR_THROTTLE_PERIOD = 100us
PROCESSOR_UNARCHIVE_FORMAT = binary
```

Expand Down
2 changes: 2 additions & 0 deletions config/env/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ pipeline:
arg: ${PROCESSOR_TEXT_ARG}
operator: ${PROCESSOR_TEXT_OPERATOR:trim_space}
value: ${PROCESSOR_TEXT_VALUE}
throttle:
period: ${PROCESSOR_THROTTLE_PERIOD:100us}
type: ${PROCESSOR_TYPE:noop}
unarchive:
format: ${PROCESSOR_UNARCHIVE_FORMAT:binary}
Expand Down
2 changes: 2 additions & 0 deletions config/everything.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ pipeline:
operator: trim_space
arg: ""
value: ""
throttle:
period: 100us
unarchive:
format: binary
parts: []
Expand Down
59 changes: 59 additions & 0 deletions config/processors/throttle.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"http": {
"address": "0.0.0.0:4195",
"read_timeout_ms": 5000,
"root_path": "/benthos",
"debug_endpoints": false
},
"input": {
"type": "stdin",
"stdin": {
"delimiter": "",
"max_buffer": 1000000,
"multipart": false
}
},
"buffer": {
"type": "none",
"none": {}
},
"pipeline": {
"processors": [
{
"type": "throttle",
"throttle": {
"period": "100us"
}
}
],
"threads": 1
},
"output": {
"type": "stdout",
"stdout": {
"delimiter": ""
}
},
"resources": {
"caches": {},
"conditions": {}
},
"logger": {
"prefix": "benthos",
"level": "INFO",
"add_timestamp": true,
"json_format": true
},
"metrics": {
"type": "http_server",
"prefix": "benthos",
"http_server": {},
"prometheus": {},
"statsd": {
"address": "localhost:4040",
"flush_period": "100ms",
"max_packet_size": 1440,
"network": "udp"
}
}
}
43 changes: 43 additions & 0 deletions config/processors/throttle.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# This file was auto generated by benthos_config_gen.
http:
address: 0.0.0.0:4195
read_timeout_ms: 5000
root_path: /benthos
debug_endpoints: false
input:
type: stdin
stdin:
delimiter: ""
max_buffer: 1e+06
multipart: false
buffer:
type: none
none: {}
pipeline:
processors:
- type: throttle
throttle:
period: 100us
threads: 1
output:
type: stdout
stdout:
delimiter: ""
resources:
caches: {}
conditions: {}
logger:
prefix: benthos
level: INFO
add_timestamp: true
json_format: true
metrics:
type: http_server
prefix: benthos
http_server: {}
prometheus: {}
statsd:
address: localhost:4040
flush_period: 100ms
max_packet_size: 1440
network: udp
18 changes: 17 additions & 1 deletion docs/processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ element will be selected, and so on.
25. [`select_parts`](#select_parts)
26. [`split`](#split)
27. [`text`](#text)
28. [`unarchive`](#unarchive)
28. [`throttle`](#throttle)
29. [`unarchive`](#unarchive)

## `archive`

Expand Down Expand Up @@ -864,6 +865,21 @@ Removes all leading and trailing whitespace from the payload.

Removes all leading and trailing occurrences of characters within the arg field.

## `throttle`

``` yaml
type: throttle
throttle:
period: 100us
```

Throttles the throughput of a pipeline to a maximum of one message batch per
period. This throttle is per processing pipeline, and therefore four threads
each with a throttle would result in four times the rate specified.

The period should be specified as a time duration string. For example, '1s'
would be 1 second, '10ms' would be 10 milliseconds, etc.

## `unarchive`

``` yaml
Expand Down
3 changes: 3 additions & 0 deletions lib/processor/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const (
TypeSelectParts = "select_parts"
TypeSplit = "split"
TypeText = "text"
TypeThrottle = "throttle"
TypeUnarchive = "unarchive"
)

Expand Down Expand Up @@ -116,6 +117,7 @@ type Config struct {
SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
Split SplitConfig `json:"split" yaml:"split"`
Text TextConfig `json:"text" yaml:"text"`
Throttle ThrottleConfig `json:"throttle" yaml:"throttle"`
Unarchive UnarchiveConfig `json:"unarchive" yaml:"unarchive"`
}

Expand Down Expand Up @@ -149,6 +151,7 @@ func NewConfig() Config {
SelectParts: NewSelectPartsConfig(),
Split: NewSplitConfig(),
Text: NewTextConfig(),
Throttle: NewThrottleConfig(),
Unarchive: NewUnarchiveConfig(),
}
}
Expand Down
119 changes: 119 additions & 0 deletions lib/processor/throttle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) 2018 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package processor

import (
"fmt"
"time"

"github.com/Jeffail/benthos/lib/log"
"github.com/Jeffail/benthos/lib/metrics"
"github.com/Jeffail/benthos/lib/types"
)

//------------------------------------------------------------------------------

func init() {
Constructors[TypeThrottle] = TypeSpec{
constructor: NewThrottle,
description: `
Throttles the throughput of a pipeline to a maximum of one message batch per
period. This throttle is per processing pipeline, and therefore four threads
each with a throttle would result in four times the rate specified.
The period should be specified as a time duration string. For example, '1s'
would be 1 second, '10ms' would be 10 milliseconds, etc.`,
}
}

//------------------------------------------------------------------------------

// ThrottleConfig contains configuration fields for the Throttle processor.
type ThrottleConfig struct {
Period string `json:"period" yaml:"period"`
}

// NewThrottleConfig returns a ThrottleConfig with default values.
func NewThrottleConfig() ThrottleConfig {
return ThrottleConfig{
Period: "100us",
}
}

//------------------------------------------------------------------------------

// Throttle is a processor that limits the stream of a pipeline to one message
// batch per period specified.
type Throttle struct {
conf Config
log log.Modular
stats metrics.Type

duration time.Duration
lastBatch time.Time

mCount metrics.StatCounter
mSent metrics.StatCounter
mSentParts metrics.StatCounter
}

// NewThrottle returns a Throttle processor.
func NewThrottle(
conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
t := &Throttle{
conf: conf,
log: log.NewModule(".processor.throttle"),
stats: stats,

mCount: stats.GetCounter("processor.throttle.count"),
mSent: stats.GetCounter("processor.throttle.sent"),
mSentParts: stats.GetCounter("processor.throttle.parts.sent"),
}

var err error
if t.duration, err = time.ParseDuration(conf.Throttle.Period); err != nil {
return nil, fmt.Errorf("failed to parse period: %v", err)
}

return t, nil
}

//------------------------------------------------------------------------------

// ProcessMessage applies the processor to a message, either creating >0
// resulting messages or a response to be sent back to the message source.
func (m *Throttle) ProcessMessage(msg types.Message) ([]types.Message, types.Response) {
m.mCount.Incr(1)

if since := time.Since(m.lastBatch); m.duration > since {
time.Sleep(m.duration - since)
}

m.lastBatch = time.Now()

m.mSent.Incr(1)
m.mSentParts.Incr(int64(msg.Len()))
msgs := [1]types.Message{msg}
return msgs[:], nil
}

//------------------------------------------------------------------------------
Loading

0 comments on commit 753d4b3

Please sign in to comment.