Skip to content

Commit

Permalink
实现singleflight
Browse files Browse the repository at this point in the history
  • Loading branch information
peanutzhen committed Jul 30, 2021
1 parent 7e8b84c commit f24a9b4
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 8 deletions.
24 changes: 16 additions & 8 deletions peanutcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package peanutcache

import (
"fmt"
"github.com/peanutzhen/peanutcache/singlefilght"
"log"
"sync"
)
Expand Down Expand Up @@ -37,6 +38,7 @@ type Group struct {
cache *cache
retriever Retriever
server Picker
flight *singlefilght.Flight
}

// NewGroup 创建一个新的缓存空间
Expand All @@ -48,6 +50,7 @@ func NewGroup(name string, maxBytes int64, retriever Retriever) *Group {
name: name,
cache: newCache(maxBytes),
retriever: retriever,
flight: &singlefilght.Flight{},
}
mu.Lock()
groups[name] = g
Expand Down Expand Up @@ -84,17 +87,22 @@ func (g *Group) Get(key string) (ByteView, error) {
}

func (g *Group) load(key string) (ByteView, error) {
if g.server != nil {
if fetcher, ok := g.server.Pick(key); ok {
bytes, err := fetcher.Fetch(g.name, key)
if err == nil {
return ByteView{b: cloneBytes(bytes)}, nil
view, err := g.flight.Fly(key, func() (interface{}, error) {
if g.server != nil {
if fetcher, ok := g.server.Pick(key); ok {
bytes, err := fetcher.Fetch(g.name, key);
if err == nil {
return ByteView{b: cloneBytes(bytes)}, nil
}
log.Printf("fail to get *%s* from peer, %s.\n", key, err.Error())
}
log.Printf("fail to get *%s* from peer, %s.\n", key, err.Error())
return ByteView{}, err
}
return g.getLocally(key)
})
if err == nil {
return view.(ByteView), err
}
return g.getLocally(key)
return ByteView{}, err
}

// getLocally 本地向Retriever取回数据并填充缓存
Expand Down
51 changes: 51 additions & 0 deletions singlefilght/singlefilght.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2021 Peanutzhen. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.

package singlefilght

import "sync"

// singlefilght 为peanutcache提供缓存击穿的保护
// 当cache并发访问peer获取缓存时 如果peer未缓存该值
// 则会向db发送大量的请求获取 造成db的压力骤增
// 因此 将所有由key产生的请求抽象成flight
// 这个flight只会起飞一次(single) 这样就可以缓解击穿的可能性
// flight载有我们要的缓存数据 称为packet

type packet struct {
wg sync.WaitGroup
val interface{}
err error
}

type Flight struct {
mu sync.Mutex
flight map[string]*packet
}

// Fly 负责key航班的飞行 fn是获取packet的方法
func (f *Flight) Fly(key string, fn func() (interface{}, error)) (interface{}, error) {
f.mu.Lock()
if f.flight == nil {
f.flight = make(map[string]*packet)
}
if p, ok := f.flight[key]; ok {
f.mu.Unlock()
p.wg.Wait()
return p.val, p.err
}
p := new(packet)
p.wg.Add(1)
f.flight[key] = p
f.mu.Unlock()

p.val, p.err = fn()
p.wg.Done()

f.mu.Lock()
delete(f.flight, key) // 航班已完成
f.mu.Unlock()

return p.val, p.err
}

0 comments on commit f24a9b4

Please sign in to comment.