Skip to content

Commit

Permalink
Cluster etcd config (#647)
Browse files Browse the repository at this point in the history
* cluster config only support etcd

* fully delete previous cluster config in option

* update test

* add more test

* add more test
  • Loading branch information
suchen-sci authored Jun 7, 2022
1 parent e409b6e commit b503f42
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 438 deletions.
13 changes: 12 additions & 1 deletion example/primary-single/conf/config.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
name: primary-single
cluster-name: cluster-test
cluster-role: primary
cluster:
listen-peer-urls:
- https://localhost:2380
listen-client-urls:
- https://localhost:2379
advertise-client-urls:
- https://localhost:2379
initial-advertise-peer-urls:
- https://localhost:2380
initial-cluster:
- primary-single: https://localhost:2380
api-addr: 127.0.0.1:12381
data-dir: ./data
wal-dir: ""
cpu-profile-file:
memory-profile-file:
log-dir: ./log
debug: false
debug: false
95 changes: 2 additions & 93 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,6 @@ func (c *cluster) getReady() error {
return nil
}

if !c.opt.UseInitialCluster() && !c.opt.ForceNewCluster && c.members != nil && c.members.knownMembersLen() > 1 {
client, _ := c.getClient()
if client != nil {
err := c.addSelfToCluster()
if err != nil {
logger.Errorf("add self to cluster failed: %v", err)
}
}
}

done, timeout, err := c.startServer()
if err != nil {
return fmt.Errorf("start server failed: %v", err)
Expand Down Expand Up @@ -283,81 +273,6 @@ func (c *cluster) getReady() error {
return nil
}

func (c *cluster) addSelfToCluster() error {
client, err := c.getClient()
if err != nil {
return err
}

respList, err := func() (*clientv3.MemberListResponse, error) {
ctx, cancel := c.requestContext()
defer cancel()
return client.MemberList(ctx)
}()
if err != nil {
return err
}

self := c.members.self()

found := false
for _, member := range respList.Members {
// Reference: https://github.com/etcd-io/etcd/blob/b7bf33bf5d1cbb1092b542fc4f3cdc911ccc3eaa/etcdctl/ctlv3/command/printer.go#L164-L167
if len(member.Name) == 0 {
_, err := func() (*clientv3.MemberRemoveResponse, error) {
ctx, cancel := c.requestContext()
defer cancel()
return client.MemberRemove(ctx, member.ID)
}()
if err != nil {
err = fmt.Errorf("remove unhealthy etcd member %x failed: %v",
member.ID, err)
panic(err)
} else {
logger.Warnf("remove unhealthy etcd member %x for adding self to cluster",
member.ID)
}
}

if self.Name == member.Name && self.ID == member.ID {
found = true
break
} else if self.Name == member.Name && self.ID != member.ID {
err := fmt.Errorf("conflict id with same name %s: local(%x) != existed(%x). "+
"purge this node, clean data directory, and rejoin it back",
self.Name, self.ID, member.ID)
logger.Errorf("%v", err)
panic(err)
} else if self.ID == member.ID && self.Name != member.Name {
err := fmt.Errorf("conflict name with same id %x: local(%s) != existed(%s). "+
"purge this node, clean data directory, and rejoin it back",
self.ID, self.Name, member.Name)
logger.Errorf("%v", err)
panic(err)
}
}

if !found {
err := c.checkClusterName()
if err != nil {
return err
}

respAdd, err := func() (*clientv3.MemberAddResponse, error) {
ctx, cancel := c.requestContext()
defer cancel()
return client.MemberAdd(ctx, c.opt.ClusterInitialAdvertisePeerURLs)
}()
if err != nil {
return fmt.Errorf("add member failed: %v", err)
}
logger.Infof("add %s to member list", self.Name)
c.members.updateClusterMembers(respAdd.Members)
}

return nil
}

// checkClusterName checks if the local configured cluster name
// matches the existed cluster name in etcd.
// This function returns error if it can't check,
Expand Down Expand Up @@ -675,14 +590,8 @@ func (c *cluster) startServer() (done, timeout chan struct{}, err error) {
close(done)
return done, timeout, nil
}
var (
etcdConfig *embed.Config
)
if c.opt.UseInitialCluster() {
etcdConfig, err = CreateStaticClusterEtcdConfig(c.opt)
} else {
etcdConfig, err = CreateEtcdConfig(c.opt, c.members)
}

etcdConfig, err := CreateStaticClusterEtcdConfig(c.opt)
if err != nil {
return nil, nil, err
}
Expand Down
95 changes: 15 additions & 80 deletions pkg/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,63 +34,6 @@ import (
"github.com/stretchr/testify/assert"
)

func mockClusters(count int) []*cluster {
opts, _, _ := mockMembers(count)

clusters := make([]*cluster, count)

bootCluster, err := New(opts[0])
if err != nil {
panic(fmt.Errorf("new cluster failed: %v", err))
}
clusters[0] = bootCluster.(*cluster)

time.Sleep(HeartbeatInterval)

for i := 1; i < count; i++ {
opts[i].ClusterJoinURLs = opts[0].ClusterListenPeerURLs

cls, err := New(opts[i])

if err != nil {
totalRetryTime := time.After(60 * time.Second)
Loop:
for {
if err == nil {
break
}
select {
case <-totalRetryTime:
break Loop

case <-time.After(HeartbeatInterval):
cls, err = New(opts[i])
}
}

}
if err != nil {
panic(fmt.Errorf("new cluster failed: %v", err))
}

c := cls.(*cluster)

for {
_, err := c.getClient()
time.Sleep(HeartbeatInterval)
if err != nil {
continue
} else {
break
}
}

clusters[i] = c
}

return clusters
}

func mockStaticCluster(count int) []*cluster {
opts, _, _ := mockStaticClusterMembers(count)

Expand Down Expand Up @@ -140,7 +83,7 @@ func closeClusters(clusters []*cluster) {
func createSecondaryNode(clusterName string, primaryListenPeerURLs []string) *cluster {
ports, err := freeport.GetFreePorts(1)
check(err)
name := fmt.Sprintf("secondary-member-x")
name := "secondary-member-x"
opt := option.New()
opt.Name = name
opt.ClusterName = clusterName
Expand All @@ -160,20 +103,12 @@ func createSecondaryNode(clusterName string, primaryListenPeerURLs []string) *cl
}

func TestCluster(t *testing.T) {
t.Run("start cluster dynamically", func(t *testing.T) {
clusters := mockClusters(3)
defer closeClusters(clusters)
// for testing longRequestContext()
clusters[0].longRequestContext()
})
t.Run("start static sized cluster", func(t *testing.T) {
clusterNodes := mockStaticCluster(3)
primaryName := clusterNodes[0].opt.ClusterName
primaryAddress := clusterNodes[0].opt.Cluster.InitialAdvertisePeerURLs
secondaryNode := createSecondaryNode(primaryName, primaryAddress)
defer closeClusters(clusterNodes)
defer closeClusters([]*cluster{secondaryNode})
})
clusterNodes := mockStaticCluster(3)
primaryName := clusterNodes[0].opt.ClusterName
primaryAddress := clusterNodes[0].opt.Cluster.InitialAdvertisePeerURLs
secondaryNode := createSecondaryNode(primaryName, primaryAddress)
defer closeClusters(clusterNodes)
defer closeClusters([]*cluster{secondaryNode})
}

func TestLease(t *testing.T) {
Expand Down Expand Up @@ -202,26 +137,26 @@ func TestClusterStart(t *testing.T) {
c := cls.(*cluster)

_, _, err = c.StartServer()

if err != nil {
t.Errorf("start server failed, %v", err)
}
}

func TestClusterPurgeMember(t *testing.T) {
assert := assert.New(t)
opts, _, _ := mockMembers(2)

cls, err := New(opts[0])
go func() {
_, err := New(opts[1])
assert.Nil(err)
}()

if err != nil {
t.Errorf("init failed: %v", err)
}
cls, err := New(opts[0])
assert.Nil(err)

c := cls.(*cluster)
err = c.PurgeMember("no-member")
if err == nil {
t.Errorf("purge a none exit member, should be failed")
}
assert.NotNil(err)
}

func TestClusterSyncer(t *testing.T) {
Expand Down
73 changes: 0 additions & 73 deletions pkg/cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package cluster

import (
"fmt"
"net/url"
"path/filepath"

Expand Down Expand Up @@ -110,75 +109,3 @@ func CreateStaticClusterEtcdConfig(opt *option.Options) (*embed.Config, error) {

return ec, nil
}

// CreateEtcdConfig creates an embedded etcd config that starts the cluster by adding one member at a time. Deprecated: Use CreateStaticClusterEtcdConfig instead.
func CreateEtcdConfig(opt *option.Options, members *members) (*embed.Config, error) {
ec := embed.NewConfig()

var (
clientURLs []url.URL
peerURLs []url.URL
clientAdURLs []url.URL
peerAdURLs []url.URL
)
clientURLs, err := option.ParseURLs(opt.ClusterListenClientURLs)
if err != nil {
return nil, err
}
peerURLs, err = option.ParseURLs(opt.ClusterListenPeerURLs)
if err != nil {
return nil, err
}
clientAdURLs, err = option.ParseURLs(opt.ClusterAdvertiseClientURLs)
if err != nil {
return nil, err
}
peerAdURLs, err = option.ParseURLs(opt.ClusterInitialAdvertisePeerURLs)
if err != nil {
return nil, err
}

ec.Name = opt.Name

ec.Dir = opt.AbsDataDir
ec.WalDir = opt.AbsWALDir
ec.InitialClusterToken = opt.ClusterName
ec.EnableV2 = false
ec.LCUrls = clientURLs
ec.ACUrls = clientAdURLs
ec.LPUrls = peerURLs
ec.APUrls = peerAdURLs
ec.AutoCompactionMode = autoCompactionMode
ec.AutoCompactionRetention = autoCompactionRetention
ec.QuotaBackendBytes = quotaBackendBytes
ec.MaxTxnOps = maxTxnOps
ec.MaxRequestBytes = maxRequestBytes
ec.SnapshotCount = snapshotCount
ec.Logger = "zap"
ec.LogOutputs = []string{common.NormalizeZapLogPath(filepath.Join(opt.AbsLogDir, logFilename))}

ec.ClusterState = embed.ClusterStateFlagExisting
if opt.ForceNewCluster {
ec.ClusterState = embed.ClusterStateFlagNew
ec.ForceNewCluster = true
self := members.self()
ec.InitialCluster = fmt.Sprintf("%s=%s", self.Name, self.PeerURL)
} else {
if len(opt.ClusterJoinURLs) == 0 {
if members.clusterMembersLen() == 1 &&
common.IsDirEmpty(opt.AbsDataDir) {
ec.ClusterState = embed.ClusterStateFlagNew
}
} else if members.clusterMembersLen() == 1 {
return nil, fmt.Errorf("join mode with only one cluster member: %v",
*members.ClusterMembers)
}
ec.InitialCluster = members.initCluster()
}

logger.Infof("etcd config: advertise-client-urls: %+v advertise-peer-urls: %+v init-cluster: %s cluster-state: %s force-new-cluster: %v",
ec.ACUrls, ec.APUrls,
ec.InitialCluster, ec.ClusterState, ec.ForceNewCluster)

return ec, nil
}
23 changes: 0 additions & 23 deletions pkg/cluster/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,6 @@ import (
func TestCreateEtcdConfigFailures(t *testing.T) {
testData := make([]*option.Options, 0)
testData = append(testData, mockTestOpt())
testData[len(testData)-1].ClusterListenClientURLs = []string{"::::::"}
testData = append(testData, mockTestOpt())
testData[len(testData)-1].ClusterListenPeerURLs = []string{"::::::"}
testData = append(testData, mockTestOpt())
testData[len(testData)-1].ClusterAdvertiseClientURLs = []string{"::::::"}
testData = append(testData, mockTestOpt())
testData[len(testData)-1].ClusterInitialAdvertisePeerURLs = []string{"::::::"}

for i, opt := range testData {
t.Run(fmt.Sprintf("CreateEtcdConfig: options invalid url i=%d", i), func(t *testing.T) {
membersInstance, _ := newMembers(opt)
_, err := CreateEtcdConfig(opt, membersInstance)
if err == nil {
t.Error("There should be an error")
}
if !strings.Contains(err.Error(), "missing protocol scheme") {
t.Error("Error should contain missing protocol scheme")
}
})
}

testData = make([]*option.Options, 0)
testData = append(testData, mockTestOpt())
testData[len(testData)-1].Cluster.ListenClientURLs = []string{"::::::"}
testData = append(testData, mockTestOpt())
testData[len(testData)-1].Cluster.ListenPeerURLs = []string{"::::::"}
Expand Down
3 changes: 1 addition & 2 deletions pkg/cluster/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ const (
type (
// Layout represents storage tree layout.
Layout struct {
memberName string
statusMemberKey string
memberName string
}
)

Expand Down
Loading

0 comments on commit b503f42

Please sign in to comment.