diff --git a/.gitignore b/.gitignore index 53f77fb..984a739 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .draft/ *.jpg -*.png \ No newline at end of file +*.png +*.exe \ No newline at end of file diff --git a/consistenthash/consistenthash.go b/consistenthash/consistenthash.go index 964fb2a..c8199c9 100644 --- a/consistenthash/consistenthash.go +++ b/consistenthash/consistenthash.go @@ -1,7 +1,6 @@ package consistenthash import ( - "fmt" "hash/crc32" "sort" "strconv" @@ -35,7 +34,7 @@ func (m *Map) Add(keys ...string) { for _, key := range keys { for i := 0; i < m.replicas; i++ { hash := int(m.hash([]byte(strconv.Itoa(i) + key))) - fmt.Printf("key: %v, i: %v, hash: %v\n", key, i, hash) + // fmt.Printf("key: %v, i: %v, hash: %v\n", key, i, hash) m.keys = append(m.keys, hash) m.hashMap[hash] = key } @@ -43,7 +42,7 @@ func (m *Map) Add(keys ...string) { sort.Ints(m.keys) } -func(m *Map) Get(key string) string { +func (m *Map) Get(key string) string { if len(m.keys) == 0 { return "" } @@ -56,4 +55,13 @@ func(m *Map) Get(key string) string { }) // 得到真实节点 return m.hashMap[m.keys[idx%len(m.keys)]] -} \ No newline at end of file +} + +// func (m *Map) Remove(key string) { +// for i := 0; i < m.replicas; i++ { +// hash := int(m.hash([]byte(strconv.Itoa(i) + key))) +// idx := sort.SearchInts(m.keys, hash) +// m.keys = append(m.keys[:idx], m.keys[idx+1:]...) +// delete(m.hashMap, hash) +// } +// } diff --git a/geecache.go b/geecache.go index 09058e9..6982ff7 100644 --- a/geecache.go +++ b/geecache.go @@ -28,6 +28,7 @@ type Group struct { name string getter Getter mainCache cache + peers PeerPicker } var ( @@ -59,6 +60,14 @@ func GetGroup(name string) *Group { return g } +// RegisterPeers 将实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中 +func (g *Group) RegisterPeers(peers PeerPicker) { + if g.peers != nil { + panic("RegisterPeerPicker called more than once") + } + g.peers = peers +} + func (g *Group) Get(key string) (ByteView, error) { if key == "" { return ByteView{}, fmt.Errorf("key is required") @@ -73,9 +82,27 @@ func (g *Group) Get(key string) (ByteView, error) { } func (g *Group) load(key string) (value ByteView, err error) { + if g.peers != nil { + if peer, ok := g.peers.PickPeer(key); ok { + if value, err = g.getFromPeer(peer, key); err == nil { + return value, nil + } + log.Println("[GeeCache] Failed to get from peer", err) + } + } + return g.getLocally(key) } +// getFromPeer 使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值 +func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) { + bytes, err := peer.Get(g.name, key) + if err != nil { + return ByteView{}, err + } + return ByteView{b: bytes}, nil +} + func (g *Group) getLocally(key string) (ByteView, error) { bytes, err := g.getter.Get(key) if err != nil { diff --git a/http.go b/http.go index 6c9e1fc..f005484 100644 --- a/http.go +++ b/http.go @@ -2,17 +2,27 @@ package geecache import ( "fmt" + "geecache/consistenthash" + "io/ioutil" "log" "net/http" + "net/url" "strings" + "sync" ) -const defaultBasePath = "/_geecache/" +const ( + defaultBasePath = "/_geecache/" + defaultReplicas = 50 +) // HTTPPool implements PeerPicker for a pool of HTTP peers. type HTTPPool struct { - self string // 记录自己的地址,包括主机名/ IP 和端口 - basePath string // 节点间通讯地址的前缀 + self string // 记录自己的地址,包括主机名/ IP 和端口 + basePath string // 节点间通讯地址的前缀 + mu sync.Mutex // guards peers and httpGetters + peers *consistenthash.Map + httpGetters map[string]*httpGetter } // NewHTTPPool initializes an HTTP pool of peers. @@ -61,3 +71,59 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write(view.ByteSlice()) // log.Println(view.ByteSlice()) } + +// Set updates the pool's list of peers +func (p *HTTPPool) Set(peers ...string) { + p.mu.Lock() + defer p.mu.Unlock() + p.peers = consistenthash.New(defaultReplicas, nil) + p.peers.Add(peers...) + p.httpGetters = make(map[string]*httpGetter, len(peers)) + for _, peer := range peers { + p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath} + } +} + +// PickPeer picks a peer according to key +func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) { + p.mu.Lock() + defer p.mu.Unlock() + if peer := p.peers.Get(key); peer != "" && peer != p.self { + p.Log("Pick peer %s", peer) + return p.httpGetters[peer], true + } + return nil, false +} + +var _ PeerPicker = (*HTTPPool)(nil) + +type httpGetter struct { + baseURL string +} + +func (h *httpGetter) Get(group string, key string) ([]byte, error) { + u := fmt.Sprintf( + "%v%v/%v", + h.baseURL, + url.QueryEscape(group), + url.QueryEscape(key), + ) + res, err := http.Get(u) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("server returned: %v", res.Status) + } + + bytes, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("reading response body: %v", err) + } + + return bytes, nil +} + +var _ PeerGetter = (*httpGetter)(nil) diff --git a/main/main.go b/main/main.go index 4c246ec..ceda768 100644 --- a/main/main.go +++ b/main/main.go @@ -1,6 +1,7 @@ package main import ( + "flag" "fmt" "geecache" "log" @@ -13,19 +14,67 @@ var db = map[string]string{ "Sam": "567", } -func main() { - geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( +func createGroup() *geecache.Group { + return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( func(key string) ([]byte, error) { log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { return []byte(v), nil } return nil, fmt.Errorf("%s not exist", key) - }, - )) + })) +} - addr := "localhost:9999" +// startCacheServer 用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中, +//启动 HTTP 服务(共3个端口,8001/8002/8003),用户不感知 +func startCacheServer(addr string, addrs []string, gee *geecache.Group) { peers := geecache.NewHTTPPool(addr) + peers.Set(addrs...) + gee.RegisterPeers(peers) log.Println("geecache is running at", addr) - log.Fatal(http.ListenAndServe(addr, peers)) + log.Fatal(http.ListenAndServe(addr[7:], peers)) +} + +// startApiServer 用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知 +func startApiServer(apiAddr string, gee *geecache.Group) { + http.Handle("/api", http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + key := r.URL.Query().Get("key") + view, err := gee.Get(key) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/octet-stream") + w.Write(view.ByteSlice()) + })) + + log.Println("api server is running at", apiAddr) + log.Fatal(http.ListenAndServe(apiAddr[7:], nil)) +} + +func main() { + var port int + var api bool + flag.IntVar(&port, "port", 8001, "GeeCache server port") + flag.BoolVar(&api, "api", false, "Start a api server?") + flag.Parse() + + apiAddr := "http://localhost:9999" + addrMap := map[int]string{ + 8001: "http://localhost:8001", + 8002: "http://localhost:8002", + 8003: "http://localhost:8003", + } + + var addrs []string + for _, v := range addrMap { + addrs = append(addrs, v) + } + + gee := createGroup() + if api { + go startApiServer(apiAddr, gee) + } + startCacheServer(addrMap[port], []string(addrs), gee) } diff --git a/peers.go b/peers.go new file mode 100644 index 0000000..d6ceb7b --- /dev/null +++ b/peers.go @@ -0,0 +1,11 @@ +package geecache + +type PeerPicker interface { + // 根据传入的 key 选择相应节点 PeerGetter + PickPeer(key string) (peer PeerGetter, ok bool) +} + +type PeerGetter interface { + // 从对应 group 查找缓存值 + Get(group string, key string) ([]byte, error) +}