Skip to content

Commit

Permalink
✨ feat: Day5 分布式节点
Browse files Browse the repository at this point in the history
  • Loading branch information
p3ddd committed Apr 6, 2022
1 parent 49d5fc2 commit e420e43
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 14 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.draft/

*.jpg
*.png
*.png
*.exe
16 changes: 12 additions & 4 deletions consistenthash/consistenthash.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package consistenthash

import (
"fmt"
"hash/crc32"
"sort"
"strconv"
Expand Down Expand Up @@ -35,15 +34,15 @@ func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
fmt.Printf("key: %v, i: %v, hash: %v\n", key, i, hash)
// fmt.Printf("key: %v, i: %v, hash: %v\n", key, i, hash)
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}

func(m *Map) Get(key string) string {
func (m *Map) Get(key string) string {
if len(m.keys) == 0 {
return ""
}
Expand All @@ -56,4 +55,13 @@ func(m *Map) Get(key string) string {
})
// 得到真实节点
return m.hashMap[m.keys[idx%len(m.keys)]]
}
}

// func (m *Map) Remove(key string) {
// for i := 0; i < m.replicas; i++ {
// hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
// idx := sort.SearchInts(m.keys, hash)
// m.keys = append(m.keys[:idx], m.keys[idx+1:]...)
// delete(m.hashMap, hash)
// }
// }
27 changes: 27 additions & 0 deletions geecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Group struct {
name string
getter Getter
mainCache cache
peers PeerPicker
}

var (
Expand Down Expand Up @@ -59,6 +60,14 @@ func GetGroup(name string) *Group {
return g
}

// RegisterPeers 将实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}

func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
Expand All @@ -73,9 +82,27 @@ func (g *Group) Get(key string) (ByteView, error) {
}

func (g *Group) load(key string) (value ByteView, err error) {
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok {
if value, err = g.getFromPeer(peer, key); err == nil {
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}

return g.getLocally(key)
}

// getFromPeer 使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: bytes}, nil
}

func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
Expand Down
72 changes: 69 additions & 3 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@ package geecache

import (
"fmt"
"geecache/consistenthash"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"sync"
)

const defaultBasePath = "/_geecache/"
const (
defaultBasePath = "/_geecache/"
defaultReplicas = 50
)

// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
self string // 记录自己的地址,包括主机名/ IP 和端口
basePath string // 节点间通讯地址的前缀
self string // 记录自己的地址,包括主机名/ IP 和端口
basePath string // 节点间通讯地址的前缀
mu sync.Mutex // guards peers and httpGetters
peers *consistenthash.Map
httpGetters map[string]*httpGetter
}

// NewHTTPPool initializes an HTTP pool of peers.
Expand Down Expand Up @@ -61,3 +71,59 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write(view.ByteSlice())
// log.Println(view.ByteSlice())
}

// Set updates the pool's list of peers
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(defaultReplicas, nil)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
}
}

// PickPeer picks a peer according to key
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if peer := p.peers.Get(key); peer != "" && peer != p.self {
p.Log("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}

var _ PeerPicker = (*HTTPPool)(nil)

type httpGetter struct {
baseURL string
}

func (h *httpGetter) Get(group string, key string) ([]byte, error) {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(group),
url.QueryEscape(key),
)
res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned: %v", res.Status)
}

bytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}

return bytes, nil
}

var _ PeerGetter = (*httpGetter)(nil)
61 changes: 55 additions & 6 deletions main/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"flag"
"fmt"
"geecache"
"log"
Expand All @@ -13,19 +14,67 @@ var db = map[string]string{
"Sam": "567",
}

func main() {
geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func createGroup() *geecache.Group {
return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
},
))
}))
}

addr := "localhost:9999"
// startCacheServer 用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中,
//启动 HTTP 服务(共3个端口,8001/8002/8003),用户不感知
func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
peers := geecache.NewHTTPPool(addr)
peers.Set(addrs...)
gee.RegisterPeers(peers)
log.Println("geecache is running at", addr)
log.Fatal(http.ListenAndServe(addr, peers))
log.Fatal(http.ListenAndServe(addr[7:], peers))
}

// startApiServer 用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知
func startApiServer(apiAddr string, gee *geecache.Group) {
http.Handle("/api", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
view, err := gee.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.ByteSlice())
}))

log.Println("api server is running at", apiAddr)
log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}

func main() {
var port int
var api bool
flag.IntVar(&port, "port", 8001, "GeeCache server port")
flag.BoolVar(&api, "api", false, "Start a api server?")
flag.Parse()

apiAddr := "http:https://localhost:9999"
addrMap := map[int]string{
8001: "http:https://localhost:8001",
8002: "http:https://localhost:8002",
8003: "http:https://localhost:8003",
}

var addrs []string
for _, v := range addrMap {
addrs = append(addrs, v)
}

gee := createGroup()
if api {
go startApiServer(apiAddr, gee)
}
startCacheServer(addrMap[port], []string(addrs), gee)
}
11 changes: 11 additions & 0 deletions peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package geecache

type PeerPicker interface {
// 根据传入的 key 选择相应节点 PeerGetter
PickPeer(key string) (peer PeerGetter, ok bool)
}

type PeerGetter interface {
// 从对应 group 查找缓存值
Get(group string, key string) ([]byte, error)
}

0 comments on commit e420e43

Please sign in to comment.