
Request Buffer #75

Open · wants to merge 10 commits into master

Conversation


@vermapratyush commented Sep 26, 2017

Adds a buffer on requests before throwing ErrMaxConcurrency.
The Timeout is still adhered to: the timer starts when the request is submitted to hystrix-go, not when execution starts. This essentially implements MaxQueueSize as present in Netflix's Hystrix.

The default value of MaxQueueSize is 50 (5 * DefaultMaxConcurrent), although it can be overridden when initialising the circuit.

In addition to the request buffer, the PR includes a different way of solving #67. It uses channels instead of sync.Once, which makes the Go() function simpler. It also includes some golint fixes.

We have been running this in production at GrabTaxi for a while, and there seem to be no side effects.
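
For context, a minimal sketch of how a caller might opt into the buffer (the QueueSizeRejectionThreshold field exists only on this PR's branch; the command name and values are illustrative):

package main

import (
    "fmt"

    "github.com/afex/hystrix-go/hystrix"
)

func main() {
    // QueueSizeRejectionThreshold is the setting added by this PR; the
    // other fields are existing hystrix-go options.
    hystrix.ConfigureCommand("get_user", hystrix.CommandConfig{
        Timeout:                     1000, // ms; the timer starts at submission, not when execution begins
        MaxConcurrentRequests:       100,
        ErrorPercentThreshold:       25,
        QueueSizeRejectionThreshold: 100, // extra requests buffered before ErrMaxConcurrency is returned
    })

    output := make(chan string, 1)
    errs := hystrix.Go("get_user", func() error {
        // call the downstream service here
        output <- "ok"
        return nil
    }, nil)

    select {
    case out := <-output:
        fmt.Println(out)
    case err := <-errs:
        // err is ErrMaxConcurrency once both the concurrency limit and
        // the request buffer are exhausted
        fmt.Println(err)
    }
}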

  })
  if err != nil {
      return err
  }
- err = sh.writeToRequests(eventBytes)
+ _ = sh.writeToRequests(eventBytes)


Should probably return sh.writeToRequests(eventBytes) instead of ignoring the error.

vermapratyush (Author):


fixed


@tonyghita left a comment


This looks really promising! I hope it gets merged soon.

.gitignore Outdated
@@ -1 +1,3 @@
.vagrant
*.iml


The IDE-specific ignore lines probably belong in your global .gitignore: https://help.github.com/articles/ignoring-files/#create-a-global-gitignore

vermapratyush (Author):


fixed

Timeout: 1000,
MaxConcurrentRequests: 100,
ErrorPercentThreshold: 25,
QueueSizeRejectionThreshold: 100,


How would you pick a good value for QueueSizeRejectionThreshold?

vermapratyush (Author):


The default value of QueueSizeRejectionThreshold is currently equal to MaxConcurrentRequests, which should absorb a request spike of roughly 2x the usual load: with MaxConcurrentRequests = 100 and QueueSizeRejectionThreshold = 100, for example, up to 200 requests (100 executing plus 100 queued) can be accepted before ErrMaxConcurrency is returned.
2x seems to be a decent multiplier, although in some use cases at my workplace I have seen 3x-4x.
Netflix also uses a value equal to MaxConcurrentRequests, although I am open to suggestions for a better default.

@@ -45,6 +45,8 @@ func (m *metricCollectorRegistry) Register(initMetricCollector func(string) Metr
type MetricCollector interface {
// IncrementAttempts increments the number of updates.
IncrementAttempts()
// IncrementQueueSize increments the number of elements in the queue.
IncrementQueueSize()


This would break any external implementations of this interface. We should probably start managing releases, as suggested by #70, before merging this.
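
To make the compatibility concern concrete, here is a small self-contained illustration of why adding a method to an exported interface breaks third-party implementations (stand-in types, not hystrix-go's actual MetricCollector):

package main

// Version 1 of an interface, as external code originally saw it.
type collectorV1 interface {
    IncrementAttempts()
}

// Version 2 adds a method, analogous to adding IncrementQueueSize().
type collectorV2 interface {
    IncrementAttempts()
    IncrementQueueSize()
}

// An external implementation written against version 1.
type externalCollector struct{}

func (externalCollector) IncrementAttempts() {}

// Compiles against version 1, but the equivalent assertion against
// version 2 fails because IncrementQueueSize is not implemented.
var _ collectorV1 = externalCollector{}

// var _ collectorV2 = externalCollector{} // compile error after the change

func main() {}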


@afex (Owner) left a comment


first let me thank you for this patch. it is a welcome change and i appreciate the time you've spent on it. i'm sorry i took so long to tackle this review.

before i can merge it, however, you need to address the regression presented by the addition of the queued event, as well as the default value for the queue size.

@@ -80,12 +86,18 @@ func ConfigureCommand(name string, config CommandConfig) {
errorPercent = config.ErrorPercentThreshold
}

queueSizeRejectionThreshold := DefaultQueueSizeRejectionThreshold
afex (Owner):


your PR comment mentions that the default is the same as MaxConcurrentRequests, but in fact it is statically set to 50 here even if the user provides a different concurrency setting.

i like the idea of having the queue size (if unset) be equal to the concurrency setting, which would change the code here to remove DefaultQueueSizeRejectionThreshold and replace it with max
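
A sketch of the suggested fallback, defaulting the threshold to the concurrency limit when the user leaves it unset (names mirror this PR's ConfigureCommand; not the merged code):

// queueSizeFor returns the effective queue-size rejection threshold:
// the configured value when set, otherwise the command's concurrency
// limit, e.g. queueSizeFor(config.QueueSizeRejectionThreshold, max).
func queueSizeFor(configured, maxConcurrent int) int {
    if configured != 0 {
        return configured
    }
    return maxConcurrent
}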

vermapratyush (Author):


Fixed.

@@ -16,14 +16,18 @@ var (
DefaultSleepWindow = 5000
// DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests
DefaultErrorPercentThreshold = 50
// DefaultQueueSizeRejectionThreshold causes requests to be rejected once the queue size exceeds this limit
DefaultQueueSizeRejectionThreshold = DefaultMaxConcurrent * 5
afex (Owner):


recommend removing this default based on my other comment in this file

@@ -95,6 +95,9 @@ func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCo
collector.IncrementAttempts()
collector.IncrementErrors()
}
if update.Types[0] == "queued" {
collector.IncrementQueueSize()
}
afex (Owner):


as described in my comment in hystrix.go, this won't work to accurately track the event rate. a types list of [ queued, failure, fallback-success ] should apply all of:

collector.IncrementQueueSize()
collector.IncrementFailures()
collector.IncrementAttempts()
collector.IncrementErrors()
collector.IncrementFallbackSuccesses()
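
A sketch (not the PR's final code) of how IncrementMetrics could walk every reported event type instead of inspecting only Types[0]; it assumes the collector exposes the existing methods shown plus the IncrementQueueSize added by this PR:

package hystrix

import (
    metricCollector "github.com/afex/hystrix-go/hystrix/metric_collector"
)

// applyEventTypes applies every event reported for one execution, so a
// leading "queued" entry no longer hides the run result or the fallback
// outcome from the collector.
func applyEventTypes(collector metricCollector.MetricCollector, types []string) {
    for _, eventType := range types {
        switch eventType {
        case "queued":
            collector.IncrementQueueSize() // method added by this PR
        case "success":
            collector.IncrementAttempts()
            collector.IncrementSuccesses()
        case "failure":
            collector.IncrementAttempts()
            collector.IncrementErrors()
            collector.IncrementFailures()
        case "fallback-success":
            collector.IncrementFallbackSuccesses()
        case "fallback-failure":
            collector.IncrementFallbackFailures()
        }
    }
}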

@@ -45,6 +45,8 @@ func (m *metricCollectorRegistry) Register(initMetricCollector func(string) Metr
type MetricCollector interface {
// IncrementAttempts increments the number of updates.
IncrementAttempts()
// IncrementQueueSize increments the number of elements in the queue.
IncrementQueueSize()
afex (Owner):


"queue size/length" does not seem like an accurate name since this is better stated as "rate at which executions were queued", or "number of queued events over a time window"

vermapratyush (Author):


I have changed the name to IncrementQueuedItem. I tried to keep it consistent with the other function names, like IncrementAttempts. Let me know if you can think of a more appropriate name.

returnTicket()
select {
case t := <-circuit.executorPool.WaitingTicket:
cmd.reportEvent("queued")
afex (Owner):


adding a new event type is problematic here. i agree that we should track the rate at which executions are queued, but there is a current assumption being made about the event list for an execution which no longer holds true here.

currently the events []string field of a command is assumed to contain data in a format of:

[ success|failure|rejected|short-circuit|timeout, fallback-success|fallback-failure ]

for example, an events slice containing [ failure, fallback-success ] indicates the execution failed but the fallback did not. changing this to [ queued, failure, fallback-success ] makes sense (execution was queued, then failed, then fell back successfully) but other parts of the code assume the first element in the list indicates the run function's result. this changes that and will break stats reporting as well as closing a circuit after a success.

in order to add this queued event, you'll need to change CircuitBreaker.ReportEvent and metricExchange.IncrementMetrics to account for this.
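
For illustration only (the author ultimately moved to a map-based structure, as noted in the reply below), one way to relax the Types[0] assumption would be to look past any leading "queued" entry when reading the run result:

// runResult returns the first event that describes the run function's
// outcome, skipping the "queued" marker introduced by this PR.
// Hypothetical helper, not part of hystrix-go or the merged change.
func runResult(eventTypes []string) string {
    for _, t := range eventTypes {
        if t != "queued" {
            return t // success | failure | rejected | short-circuit | timeout
        }
    }
    return ""
}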

vermapratyush (Author):


Nice catch, I have incorporated the change by using a map structure to pass around the events.


vermapratyush commented Jan 14, 2018

@afex Thanks for the review. I have made the required changes in the PR.

To address the regression risk for the buffer queue, I have added a note in loadtest/README.md, in addition to the unit test that asserts the queue length. The note mentions that increasing the concurrency in the bench tool should validate the buffer implementation.

Changes:

  1. Renamed the function to IncrementQueuedItem.
  2. Fixed the bug in how events are reported.
  3. Incorporated the changes related to the queue size default.
  4. Added a note on running the regression for the queue length.
