Fixes a concurrency error in groupcache. Previously, multiple Get
requests could simultaneously result in a load(). Only requests that
enter singleflight Do concurrently would be deduped, so it was possible
for populateCache to be called multiple times for the same key. That
would result in overcounting nbytes, eventually leading to a livelock
where nbytes > cacheBytes but there were no entries in the cache.

Change-Id: I5b304ce82041c1310c31b662486744e86509cc53
tippjammer authored and bradfitz committed Jan 27, 2016
1 parent 604ed57 commit d995f8a
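
The root cause is easy to see with singleflight in isolation: Do only
dedups callers whose invocations overlap in time, so two back-to-back
calls for the same key run the callback twice. A minimal runnable
sketch (illustrative, not part of this commit), assuming groupcache's
bundled package github.com/golang/groupcache/singleflight:

	package main

	import (
		"fmt"

		"github.com/golang/groupcache/singleflight"
	)

	func main() {
		var g singleflight.Group
		calls := 0
		fn := func() (interface{}, error) {
			calls++ // stands in for populateCache incrementing nbytes
			return "val", nil
		}
		g.Do("key", fn)    // first caller: no flight in progress, fn runs
		g.Do("key", fn)    // serial second caller: again no flight, fn runs again
		fmt.Println(calls) // prints 2 — the duplicate work this commit guards against
	}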
Showing 2 changed files with 115 additions and 1 deletion.
36 changes: 35 additions & 1 deletion groupcache.go
@@ -100,6 +100,7 @@ func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *G
 		getter:     getter,
 		peers:      peers,
 		cacheBytes: cacheBytes,
+		loadGroup:  &singleflight.Group{},
 	}
 	if fn := newGroupHook; fn != nil {
 		fn(g)
@@ -163,12 +164,20 @@ type Group struct {
 	// loadGroup ensures that each key is only fetched once
 	// (either locally or remotely), regardless of the number of
 	// concurrent callers.
-	loadGroup singleflight.Group
+	loadGroup flightGroup
 
 	// Stats are statistics on the group.
 	Stats Stats
 }
 
+// flightGroup is defined as an interface which singleflight.Group
+// satisfies. We define this so that we may test with an alternate
+// implementation.
+type flightGroup interface {
+	// Do executes fn for key, deduping concurrent callers.
+	Do(key string, fn func() (interface{}, error)) (interface{}, error)
+}
+
 // Stats are per-group statistics.
 type Stats struct {
 	Gets           AtomicInt // any Get request, including from peers
@@ -225,6 +234,31 @@ func (g *Group) Get(ctx Context, key string, dest Sink) error {
 func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
 	g.Stats.Loads.Add(1)
 	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
+		// Check the cache again because singleflight can only dedup calls
+		// that overlap concurrently. It's possible for 2 concurrent
+		// requests to miss the cache, resulting in 2 load() calls. An
+		// unfortunate goroutine scheduling would result in this callback
+		// being run twice, serially. If we don't check the cache again,
+		// cache.nbytes would be incremented below even though there will
+		// be only one entry for this key.
+		//
+		// Consider the following serialized event ordering for two
+		// goroutines in which this callback gets called twice for the
+		// same key:
+		// 1: Get("key")
+		// 2: Get("key")
+		// 1: lookupCache("key")
+		// 2: lookupCache("key")
+		// 1: load("key")
+		// 2: load("key")
+		// 1: loadGroup.Do("key", fn)
+		// 1: fn()
+		// 2: loadGroup.Do("key", fn)
+		// 2: fn()
+		if value, cacheHit := g.lookupCache(key); cacheHit {
+			g.Stats.CacheHits.Add(1)
+			return value, nil
+		}
+		g.Stats.LoadsDeduped.Add(1)
 		var value ByteView
 		var err error
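
Narrowing the field from the concrete singleflight.Group to the small
flightGroup interface is what enables the test below: any type with a
matching Do method can stand in for the real implementation. As a
hedged illustration of the pattern (countingFlightGroup is
hypothetical, not part of this commit), a wrapper that counts how
often Do is entered:

	// Hypothetical example, not in this commit.
	package groupcache

	import "sync/atomic"

	// countingFlightGroup wraps a flightGroup and counts how often Do is entered.
	type countingFlightGroup struct {
		calls int64
		orig  flightGroup
	}

	func (c *countingFlightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
		atomic.AddInt64(&c.calls, 1)
		return c.orig.Do(key, fn)
	}

Assigning such a wrapper to g.loadGroup, exactly as TestNoDedup does
with orderedFlightGroup below, lets a test observe or reshape
scheduling without touching the production code path.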
80 changes: 80 additions & 0 deletions groupcache_test.go
@@ -363,5 +363,85 @@ func TestAllocatingByteSliceTarget(t *testing.T) {
 	}
 }
 
+// orderedFlightGroup allows the caller to force the schedule of when
+// orig.Do will be called. This is useful to serialize calls such
+// that singleflight cannot dedup them.
+type orderedFlightGroup struct {
+	mu     sync.Mutex
+	stage1 chan bool
+	stage2 chan bool
+	orig   flightGroup
+}
+
+func (g *orderedFlightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
+	<-g.stage1
+	<-g.stage2
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	return g.orig.Do(key, fn)
+}
+
+// TestNoDedup tests invariants on the cache size when singleflight is
+// unable to dedup calls.
+func TestNoDedup(t *testing.T) {
+	const testkey = "testkey"
+	const testval = "testval"
+	g := newGroup("testgroup", 1024, GetterFunc(func(_ Context, key string, dest Sink) error {
+		return dest.SetString(testval)
+	}), nil)
+
+	orderedGroup := &orderedFlightGroup{
+		stage1: make(chan bool),
+		stage2: make(chan bool),
+		orig:   g.loadGroup,
+	}
+	// Replace loadGroup with our wrapper so we can control when
+	// loadGroup.Do is entered for each concurrent request.
+	g.loadGroup = orderedGroup
+
+	// Issue two identical requests concurrently. Since the cache is
+	// empty, both will miss. Both will enter load(), but we will only
+	// allow one at a time to enter singleflight.Do, so the callback
+	// function will be called twice.
+	resc := make(chan string, 2)
+	for i := 0; i < 2; i++ {
+		go func() {
+			var s string
+			if err := g.Get(dummyCtx, testkey, StringSink(&s)); err != nil {
+				resc <- "ERROR:" + err.Error()
+				return
+			}
+			resc <- s
+		}()
+	}
+
+	// Ensure both goroutines have entered the Do routine. This implies
+	// both concurrent requests have checked the cache, found it empty,
+	// and called load().
+	orderedGroup.stage1 <- true
+	orderedGroup.stage1 <- true
+	orderedGroup.stage2 <- true
+	orderedGroup.stage2 <- true
+
+	for i := 0; i < 2; i++ {
+		if s := <-resc; s != testval {
+			t.Errorf("result is %s want %s", s, testval)
+		}
+	}
+
+	const wantItems = 1
+	if g.mainCache.items() != wantItems {
+		t.Errorf("mainCache has %d items, want %d", g.mainCache.items(), wantItems)
+	}
+
+	// If the singleflight callback doesn't double-check the cache again
+	// upon entry, we would increment nbytes twice but the entry would
+	// only be in the cache once.
+	const wantBytes = int64(len(testkey) + len(testval))
+	if g.mainCache.nbytes != wantBytes {
+		t.Errorf("cache has %d bytes, want %d", g.mainCache.nbytes, wantBytes)
+	}
+}
+
 // TODO(bradfitz): port the Google-internal full integration test into here,
 // using HTTP requests instead of our RPC system.
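
The wantBytes check is the heart of the regression test. Without the
cache recheck, the accounting from the commit message plays out as in
this back-of-the-envelope sketch (sizes taken from TestNoDedup; the
variables are illustrative, not code from the commit):

	package main

	import "fmt"

	func main() {
		entry := int64(len("testkey") + len("testval")) // 14 bytes per cached entry
		nbytes := 2 * entry                             // two fn() runs: the entry size is counted twice
		nbytes -= entry                                 // evicting the single entry subtracts it once
		fmt.Println(nbytes)                             // 14 phantom bytes remain
	}

Repeat this for enough keys and nbytes exceeds cacheBytes while the
cache holds nothing — the livelock described above.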
