Skip to content

Commit

Permalink
gee-rpc: doc add day5 day6 day7
Browse files Browse the repository at this point in the history
  • Loading branch information
geektutu committed Oct 8, 2020
1 parent ac6ac7b commit 030fa31
Show file tree
Hide file tree
Showing 14 changed files with 1,251 additions and 35 deletions.
3 changes: 2 additions & 1 deletion gee-rpc/day6-load-balance/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func foo(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, ar
if err != nil {
log.Printf("%s %s error: %v", typ, serviceMethod, err)
} else {
log.Printf("%s Foo.Sum success: %d + %d = %d", typ, args.Num1, args.Num2, reply)
log.Printf("%s %s success: %d + %d = %d", typ, serviceMethod, args.Num1, args.Num2, reply)
}
}

Expand All @@ -69,6 +69,7 @@ func call(addr1, addr2 string) {
func broadcast(addr1, addr2 string) {
d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
Expand Down
14 changes: 9 additions & 5 deletions gee-rpc/day6-load-balance/xclient/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package xclient

import (
"errors"
"math"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -49,15 +50,16 @@ func (d *MultiServersDiscovery) Update(servers []string) error {
func (d *MultiServersDiscovery) Get(mode SelectMode) (string, error) {
d.mu.Lock()
defer d.mu.Unlock()
if len(d.servers) == 0 {
n := len(d.servers)
if n == 0 {
return "", errors.New("rpc discovery: no available servers")
}
switch mode {
case RandomSelect:
return d.servers[d.r.Intn(len(d.servers))], nil
return d.servers[d.r.Intn(n)], nil
case RoundRobinSelect:
s := d.servers[d.index]
d.index = (d.index + 1) % len(d.servers)
s := d.servers[d.index%n] // servers could be updated, so mode n to ensure safety
d.index = (d.index + 1) % n
return s, nil
default:
return "", errors.New("rpc discovery: not supported select mode")
Expand All @@ -76,8 +78,10 @@ func (d *MultiServersDiscovery) GetAll() ([]string, error) {

// NewMultiServerDiscovery creates a MultiServersDiscovery instance
func NewMultiServerDiscovery(servers []string) *MultiServersDiscovery {
return &MultiServersDiscovery{
d := &MultiServersDiscovery{
servers: servers,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
d.index = d.r.Intn(math.MaxInt32 - 1)
return d
}
11 changes: 6 additions & 5 deletions gee-rpc/day7-registry/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"context"
"geerpc"
registy "geerpc/registry"
"geerpc/registry"
"geerpc/xclient"
"log"
"net"
Expand All @@ -29,17 +29,17 @@ func (f Foo) Sleep(args Args, reply *int) error {

func startRegistry(wg *sync.WaitGroup) {
l, _ := net.Listen("tcp", ":9999")
registy.HandleHTTP()
registry.HandleHTTP()
wg.Done()
_ = http.Serve(l, nil)
}

func startServer(registry string, wg *sync.WaitGroup) {
func startServer(registryAddr string, wg *sync.WaitGroup) {
var foo Foo
l, _ := net.Listen("tcp", ":0")
server := geerpc.NewServer()
_ = server.Register(&foo)
registy.Heartbeat(registry, "tcp@"+l.Addr().String(), 0)
registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)
wg.Done()
server.Accept(l)
}
Expand All @@ -56,7 +56,7 @@ func foo(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, ar
if err != nil {
log.Printf("%s %s error: %v", typ, serviceMethod, err)
} else {
log.Printf("%s Foo.Sum success: %d + %d = %d", typ, args.Num1, args.Num2, reply)
log.Printf("%s %s success: %d + %d = %d", typ, serviceMethod, args.Num1, args.Num2, reply)
}
}

Expand All @@ -79,6 +79,7 @@ func call(registry string) {
func broadcast(registry string) {
d := xclient.NewGeeRegistryDiscovery(registry, 0)
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer func() { _ = xc.Close() }()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
Expand Down
36 changes: 18 additions & 18 deletions gee-rpc/day7-registry/registry/registry.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package registy
package registry

import (
"log"
Expand All @@ -9,10 +9,10 @@ import (
"time"
)

// Registry is a simple register center, provide following functions.
// GeeRegistry is a simple register center, provide following functions.
// add a server and receive heartbeat to keep it alive.
// returns all alive servers and delete dead servers sync simultaneously.
type Registry struct {
type GeeRegistry struct {
timeout time.Duration
mu sync.Mutex // protect following
servers map[string]*ServerItem
Expand All @@ -23,17 +23,22 @@ type ServerItem struct {
start time.Time
}

const (
defaultPath = "/_geerpc_/registry"
defaultTimeout = time.Minute * 5
)

// New create a registry instance with timeout setting
func New(timeout time.Duration) *Registry {
return &Registry{
func New(timeout time.Duration) *GeeRegistry {
return &GeeRegistry{
servers: make(map[string]*ServerItem),
timeout: timeout,
}
}

var DefaultRegister = New(defaultTimeout)
var DefaultGeeRegister = New(defaultTimeout)

func (r *Registry) putServer(addr string) {
func (r *GeeRegistry) putServer(addr string) {
r.mu.Lock()
defer r.mu.Unlock()
s := r.servers[addr]
Expand All @@ -44,7 +49,7 @@ func (r *Registry) putServer(addr string) {
}
}

func (r *Registry) aliveServers() []string {
func (r *GeeRegistry) aliveServers() []string {
r.mu.Lock()
defer r.mu.Unlock()
var alive []string
Expand All @@ -59,13 +64,8 @@ func (r *Registry) aliveServers() []string {
return alive
}

const (
defaultPath = "/_geerpc_/registry"
defaultTimeout = time.Minute * 5
)

// Runs at /_geerpc_/registry
func (r *Registry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case "GET":
// keep it simple, server is in req.Header
Expand All @@ -83,14 +83,14 @@ func (r *Registry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}

// HandleHTTP registers an HTTP handler for Registry messages on registryPath
func (r *Registry) HandleHTTP(registryPath string) {
// HandleHTTP registers an HTTP handler for GeeRegistry messages on registryPath
func (r *GeeRegistry) HandleHTTP(registryPath string) {
http.Handle(registryPath, r)
log.Println("rpc registry path:", registryPath)
}

func HandleHTTP() {
DefaultRegister.HandleHTTP(defaultPath)
DefaultGeeRegister.HandleHTTP(defaultPath)
}

// Heartbeat send a heartbeat message every once in a while
Expand All @@ -113,7 +113,7 @@ func Heartbeat(registry, addr string, duration time.Duration) {
}

func sendHeartbeat(registry, addr string) error {
log.Println(addr, "send heart beat to registry")
log.Println(addr, "send heart beat to registry", registry)
httpClient := &http.Client{}
req, _ := http.NewRequest("POST", registry, nil)
req.Header.Set("X-Geerpc-Server", addr)
Expand Down
5 changes: 4 additions & 1 deletion gee-rpc/day7-registry/xclient/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package xclient

import (
"errors"
"math"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -77,8 +78,10 @@ func (d *MultiServersDiscovery) GetAll() ([]string, error) {

// NewMultiServerDiscovery creates a MultiServersDiscovery instance
func NewMultiServerDiscovery(servers []string) *MultiServersDiscovery {
return &MultiServersDiscovery{
d := &MultiServersDiscovery{
servers: servers,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
d.index = d.r.Intn(math.MaxInt32 - 1)
return d
}
4 changes: 2 additions & 2 deletions gee-rpc/doc/geerpc-day1.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ github: https://github.com/geektutu/7days-golang

![golang RPC framework](geerpc/geerpc.jpg)

本文是[7天用Go从零实现RPC框架GeeRPC]的第一篇。
本文是[7天用Go从零实现RPC框架GeeRPC](https://geektutu.com/post/geerpc.html)的第一篇。

- 使用 `encoding/gob` 实现消息的编解码(序列化与反序列化)
- 实现一个简易的服务端,仅接受消息,不处理,代码约 200 行
Expand Down Expand Up @@ -54,7 +54,7 @@ type Header struct {
- Error 是错误信息,客户端置为空,服务端如果如果发生错误,将错误信息置于 Error 中。


我们将和消息编解码相关的代码都防到 codec 目录中
我们将和消息编解码相关的代码都防到 codec 子目录中,在此之前,还需要在根目录下使用 `go mod init geerpc` 初始化项目,方便后续子 package 之间的引用

进一步,抽象出对消息体进行编解码的接口 Codec,抽象出接口是为了实现不同的 Codec 实例:

Expand Down
2 changes: 1 addition & 1 deletion gee-rpc/doc/geerpc-day2.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ github: https://github.com/geektutu/7days-golang

![golang RPC framework](geerpc/geerpc.jpg)

本文是[7天用Go从零实现RPC框架GeeRPC]的第二篇。
本文是[7天用Go从零实现RPC框架GeeRPC](https://geektutu.com/post/geerpc.html)的第二篇。

- 实现一个支持异步和并发的高性能客户端,代码约 250 行

Expand Down
2 changes: 1 addition & 1 deletion gee-rpc/doc/geerpc-day3.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ github: https://github.com/geektutu/7days-golang

![golang RPC framework](geerpc/geerpc.jpg)

本文是[7天用Go从零实现RPC框架GeeRPC]的第三篇。
本文是[7天用Go从零实现RPC框架GeeRPC](https://geektutu.com/post/geerpc.html)的第三篇。

- 通过反射实现服务注册功能
- 在服务端实现服务调用,代码约 150 行
Expand Down
2 changes: 1 addition & 1 deletion gee-rpc/doc/geerpc-day4.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ github: https://github.com/geektutu/7days-golang

![golang RPC framework](geerpc/geerpc.jpg)

本文是[7天用Go从零实现RPC框架GeeRPC]的第四篇。
本文是[7天用Go从零实现RPC框架GeeRPC](https://geektutu.com/post/geerpc.html)的第四篇。

- 增加连接超时的处理机制
- 增加服务端处理超时的处理机制,代码约 100 行
Expand Down
Loading

0 comments on commit 030fa31

Please sign in to comment.