Skip to content

Commit

Permalink
[option] Adds options to allow operator tunes cluster mode based on n…
Browse files Browse the repository at this point in the history
…etwork and usage

- gossip_interval value
  specify the interval between sending messages that need to be gossiped that haven't been able to piggyback on probing messages in millisecond (default 200)
- gossip_packet_size value
  specify the size of a gossip packet in byte (will be for UDP packet), it depends on your network's MTU (default 4000)
  • Loading branch information
zhiyanliu authored and Jack47 committed Dec 26, 2017
1 parent 457601e commit 64a2c86
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 5 deletions.
4 changes: 2 additions & 2 deletions src/cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Config struct {

GossipNodes, IndirectCheckNodes uint

UDPBufferSize int
UDPBufferSize uint16

MessageSendTimeout, FailedMemberReconnectTimeout, MemberLeftRecordTimeout time.Duration
RecentMemberOperationTimeout time.Duration
Expand Down Expand Up @@ -126,7 +126,7 @@ func createMemberListConfig(conf *Config, eventDelegate memberlist.EventDelegate
DisableTcpPings: false,
DNSConfigPath: "/etc/resolv.conf",
HandoffQueueDepth: 1024,
UDPBufferSize: udpBufferSize,
UDPBufferSize: int(udpBufferSize),
RetransmitMult: gossipRetransmitMult,
SuspicionMult: memberSuspicionMult,
SuspicionMaxTimeoutMult: memberSuspicionMaxTimeoutMult,
Expand Down
7 changes: 5 additions & 2 deletions src/cluster/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func NewGatewayCluster(conf Config, mod *model.Model) (*GatewayCluster, error) {
basisConf.NodeTags[modeTagKey] = conf.ClusterMemberMode.String()
basisConf.BindAddress = option.ClusterHost
basisConf.AdvertiseAddress = option.ClusterHost
basisConf.UDPBufferSize = option.PacketBufferBytes
basisConf.GossipInterval = option.GossipInterval

if common.StrInSlice(basisConf.AdvertiseAddress, []string{"127.0.0.1", "localhost", "0.0.0.0"}) {
return nil, fmt.Errorf("invalid advertise address %s, it should be reachable from peer",
Expand Down Expand Up @@ -139,11 +141,12 @@ func NewGatewayCluster(conf Config, mod *model.Model) (*GatewayCluster, error) {
go gc.dispatch()

if len(conf.Peers) > 0 {
logger.Infof("[start to join peer member(s): %v]", conf.Peers)
logger.Infof("[start to join peer member(s) (total=%d): %s]",
len(conf.Peers), strings.Join(conf.Peers, ", "))

connected, err := basis.Join(conf.Peers)
if err != nil {
logger.Errorf("[join peer member(s) failed, running in standalone mode: %v]", err)
logger.Errorf("[join peer member(s) failed: %v]", err)
} else {
logger.Infof("[peer member(s) joined, connected to %d member(s) totally]", connected)
}
Expand Down
46 changes: 46 additions & 0 deletions src/common/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,52 @@ func (i *Uint32RangeValue) String() string {

////

type Uint16RangeValue struct {
v *uint16
min, max uint16
}

func NewUint16RangeValue(val uint16, p *uint16, min, max uint16) *Uint16RangeValue {
if p == nil {
p = new(uint16)
}
*p = val

return &Uint16RangeValue{
v: p,
min: min,
max: max,
}
}

func (i *Uint16RangeValue) Set(s string) error {
v, err := strconv.ParseUint(s, 0, 16)
if err != nil {
return err
}

v1 := uint16(v)

if v1 < i.min || v1 > i.max {
return fmt.Errorf("value out of range [%d, %d]", i.min, i.max)
}

*i.v = v1
return nil
}

func (i *Uint16RangeValue) Get() interface{} { return *i.v }

func (i *Uint16RangeValue) String() string {
if i.v == nil {
return strconv.FormatUint(0, 10) // zero value
} else {
return strconv.FormatUint(uint64(*i.v), 10)
}
}

////

type StringRegexValue struct {
s *string
re *regexp.Regexp
Expand Down
13 changes: 12 additions & 1 deletion src/option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
OPLogPullInterval time.Duration
OPLogPullTimeout time.Duration
ClusterDefaultOpTimeout time.Duration
PacketBufferBytes uint16
GossipInterval time.Duration

RestHost string
Stage string
Expand Down Expand Up @@ -65,9 +67,16 @@ func init() {
clusterDefaultOpTimeout := new(uint16)
flag.Var(common.NewUint16Value(120, clusterDefaultOpTimeout), "cluster_default_op_timeout",
"specify default timeout of cluster operation in second")
packetBufferBytes := new(uint16)
flag.Var(common.NewUint16RangeValue(4000, packetBufferBytes, 1400, 65500), "gossip_packet_size",
"specify the size of a gossip packet in byte (will be for UDP packet), it depends on your network's MTU")
gossipInterval := new(uint16)
flag.Var(common.NewUint16RangeValue(200, gossipInterval, 50, 60000), "gossip_interval",
"specify the interval between sending messages that need to be gossiped that "+
"haven't been able to piggyback on probing messages in millisecond")

restHost := flag.String("rest_host", "localhost", "specify rest listen host")
stage := flag.String("stage", "debug", "sepcify runtime stage (debug, test, prod)")
stage := flag.String("stage", "debug", "specify runtime stage (debug, test, prod)")
configHome := flag.String("config", common.CONFIG_HOME_DIR, "specify config home path")
logHome := flag.String("log", common.LOG_HOME_DIR, "specify log home path")
cpuProfileFile := flag.String("cpuprofile", "", "specify cpu profile output file, "+
Expand Down Expand Up @@ -106,6 +115,8 @@ func init() {
OPLogPullInterval = time.Duration(*opLogPullInterval) * time.Second
OPLogPullTimeout = time.Duration(*opLogPullTimeout) * time.Second
ClusterDefaultOpTimeout = time.Duration(*clusterDefaultOpTimeout) * time.Second
PacketBufferBytes = *packetBufferBytes
GossipInterval = time.Duration(*gossipInterval) * time.Millisecond
Peers = make([]string, 0)
for _, peer := range strings.Split(*peers, ",") {
peer = strings.TrimSpace(peer)
Expand Down

0 comments on commit 64a2c86

Please sign in to comment.