[lxc-devel] [lxd/master] Support stand-by cluster members

freeekanayaka on Github lxc-bot at linuxcontainers.org
Tue Jan 7 16:51:44 UTC 2020


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20200107/5dbcb0bc/attachment-0001.bin>
-------------- next part --------------
From 378ef995bac92be26e5b1b0fe34f414957ff0f8f Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 08:46:21 +0000
Subject: [PATCH 01/15] Fix typo

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/api_cluster.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index bf96d39774..dab233e518 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -800,7 +800,7 @@ func clusterInitMember(d, client lxd.InstanceServer, memberConfig []api.ClusterM
 }
 
 // Perform a request to the /internal/cluster/accept endpoint to check if a new
-// mode can be accepted into the cluster and obtain joining information such as
+// node can be accepted into the cluster and obtain joining information such as
 // the cluster private certificate.
 func clusterAcceptMember(
 	client lxd.InstanceServer,

From bfd2d4060566f00a82cbe1d307d57274a67885b4 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 09:37:47 +0000
Subject: [PATCH 02/15] Don't add accepted members to the local raft_nodes
 table

The raft_nodes table will be populated by heartbeats instead. Also, use the ID
from the nodes table in the cluster database, so that the Raft ID and the
member ID match.

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership.go | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index f2bd48eb72..46c8661f3c 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -165,6 +165,8 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
 		return nil, fmt.Errorf("node address must not be empty")
 	}
 
+	var id int64
+
 	// Insert the new node into the nodes table.
 	err := state.Cluster.Transaction(func(tx *db.ClusterTx) error {
 		// Check that the node can be accepted with these parameters.
@@ -181,7 +183,7 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
 		}
 
 		// Add the new node
-		id, err := tx.NodeAddWithArch(name, address, arch)
+		id, err = tx.NodeAddWithArch(name, address, arch)
 		if err != nil {
 			return errors.Wrap(err, "Failed to insert new node into the database")
 		}
@@ -211,17 +213,7 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
 		return nil, errors.Wrap(err, "Fetch cluster members count")
 	}
 	if count != 2 && len(nodes) < membershipMaxRaftNodes {
-		err = state.Node.Transaction(func(tx *db.NodeTx) error {
-			id, err := tx.RaftNodeAdd(address)
-			if err != nil {
-				return err
-			}
-			nodes = append(nodes, db.RaftNode{ID: id, Address: address})
-			return nil
-		})
-		if err != nil {
-			return nil, errors.Wrap(err, "Failed to insert new node into raft_nodes")
-		}
+		nodes = append(nodes, db.RaftNode{ID: id, Address: address})
 	}
 
 	return nodes, nil

From 3548d27cd87fef8de1c013a8347420aead451f10 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 09:40:18 +0000
Subject: [PATCH 03/15] Fix TestAccept test: the second member does *not* get
 the db role

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership_test.go | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index 5462f7fc3e..37c28f8dde 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -210,8 +210,9 @@ func TestAccept_UnmetPreconditions(t *testing.T) {
 	}
 }
 
-// When a node gets accepted, it gets included in the raft nodes.
-func TestAccept(t *testing.T) {
+// When there is only one cluster member and a new member gets accepted, the
+// new member doesn't immediately get the database role.
+func TestAccept_ClusterHasOneMember(t *testing.T) {
 	state, cleanup := state.NewTestState(t)
 	defer cleanup()
 
@@ -221,16 +222,14 @@ func TestAccept(t *testing.T) {
 
 	f := &membershipFixtures{t: t, state: state}
 	f.RaftNode("1.2.3.4:666")
-	f.ClusterNode("1.2.3.4:666")
+	f.ClusterFirstNode("1.2.3.4:666")
 
 	nodes, err := cluster.Accept(
 		state, gateway, "buzz", "5.6.7.8:666", cluster.SchemaVersion, len(version.APIExtensions))
 	assert.NoError(t, err)
-	assert.Len(t, nodes, 2)
+	require.Len(t, nodes, 1)
 	assert.Equal(t, int64(1), nodes[0].ID)
-	assert.Equal(t, int64(2), nodes[1].ID)
 	assert.Equal(t, "1.2.3.4:666", nodes[0].Address)
-	assert.Equal(t, "5.6.7.8:666", nodes[1].Address)
 }
 
 func TestJoin(t *testing.T) {
@@ -492,3 +491,11 @@ func (h *membershipFixtures) ClusterNode(address string) {
 	})
 	require.NoError(h.t, err)
 }
+
+// Set the address of the first node.
+func (h *membershipFixtures) ClusterFirstNode(address string) {
+	err := h.state.Cluster.Transaction(func(tx *db.ClusterTx) error {
+		return tx.NodeUpdate(1, "none", address)
+	})
+	require.NoError(h.t, err)
+}

From 930cc134828380fbb3aae877c1233834dd14a0ff Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 12:17:36 +0000
Subject: [PATCH 04/15] Gateway.currentRaftNodes: return only voter nodes

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/gateway.go | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index f16705ec43..38747ffc6f 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -705,19 +705,22 @@ func (g *Gateway) currentRaftNodes() ([]db.RaftNode, error) {
 	if !isLeader {
 		return nil, errNotLeader
 	}
-	client, err := g.getClient()
+	cli, err := g.getClient()
 	if err != nil {
 		return nil, err
 	}
-	defer client.Close()
+	defer cli.Close()
 
-	servers, err := client.Cluster(context.Background())
+	servers, err := cli.Cluster(context.Background())
 	if err != nil {
 		return nil, err
 	}
 	provider := raftAddressProvider{db: g.db}
 	nodes := make([]db.RaftNode, len(servers))
 	for i, server := range servers {
+		if server.Role != client.Voter {
+			continue
+		}
 		address, err := provider.ServerAddr(int(server.ID))
 		if err != nil {
 			if err != db.ErrNoSuchObject {

From 29eb4f3830e632fe16b28a291c7b0ed80dabefe1 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 12:22:51 +0000
Subject: [PATCH 05/15] Rename currentRaftNodes to currentRaftVoters

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/gateway.go             | 2 +-
 lxd/cluster/gateway_export_test.go | 2 +-
 lxd/cluster/heartbeat.go           | 2 +-
 lxd/cluster/membership.go          | 4 ++--
 4 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 38747ffc6f..574e05cacb 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -690,7 +690,7 @@ var errNotLeader = fmt.Errorf("Not leader")
 // Return information about the LXD nodes that a currently part of the raft
 // cluster, as configured in the raft log. It returns an error if this node is
 // not the leader.
-func (g *Gateway) currentRaftNodes() ([]db.RaftNode, error) {
+func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
 	g.lock.RLock()
 	defer g.lock.RUnlock()
 
diff --git a/lxd/cluster/gateway_export_test.go b/lxd/cluster/gateway_export_test.go
index 2ee96d5bc1..88887f774d 100644
--- a/lxd/cluster/gateway_export_test.go
+++ b/lxd/cluster/gateway_export_test.go
@@ -17,5 +17,5 @@ func (g *Gateway) Cert() *shared.CertInfo {
 
 // RaftNodes returns the nodes currently part of the raft cluster.
 func (g *Gateway) RaftNodes() ([]db.RaftNode, error) {
-	return g.currentRaftNodes()
+	return g.currentRaftVoters()
 }
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index 6a8c1c0cd0..273a9c2555 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -191,7 +191,7 @@ func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) {
 		return
 	}
 
-	raftNodes, err := g.currentRaftNodes()
+	raftNodes, err := g.currentRaftVoters()
 	if err == errNotLeader {
 		return
 	}
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 46c8661f3c..88d2137f31 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -204,7 +204,7 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
 
 	// Possibly insert the new node into the raft_nodes table (if we have
 	// less than 3 database nodes).
-	nodes, err := gateway.currentRaftNodes()
+	nodes, err := gateway.currentRaftVoters()
 	if err != nil {
 		return nil, errors.Wrap(err, "Failed to get raft nodes from the log")
 	}
@@ -481,7 +481,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, error) {
 	// First get the current raft members, since this method should be
 	// called after a node has left.
-	currentRaftNodes, err := gateway.currentRaftNodes()
+	currentRaftNodes, err := gateway.currentRaftVoters()
 	if err != nil {
 		return "", nil, errors.Wrap(err, "failed to get current raft nodes")
 	}

From 51f33a1200b2b88eacdbe0686ba745eefb59bfd6 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 12:36:57 +0000
Subject: [PATCH 06/15] Extract currentDqliteNodes from currentRaftVoters

currentDqliteNodes returns all dqlite nodes, not only voting ones.

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/gateway.go | 36 ++++++++++++++++++++++++++----------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 574e05cacb..88f2526d91 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -687,10 +687,9 @@ func (g *Gateway) isLeader() (bool, error) {
 // Internal error signalling that a node not the leader.
 var errNotLeader = fmt.Errorf("Not leader")
 
-// Return information about the LXD nodes that a currently part of the raft
-// cluster, as configured in the raft log. It returns an error if this node is
-// not the leader.
-func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
+// Return all current dqlite nodes. Each node has either the Voter, StandBy or
+// Spare role.
+func (g *Gateway) currentDqliteNodes() ([]client.NodeInfo, error) {
 	g.lock.RLock()
 	defer g.lock.RUnlock()
 
@@ -705,6 +704,7 @@ func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
 	if !isLeader {
 		return nil, errNotLeader
 	}
+
 	cli, err := g.getClient()
 	if err != nil {
 		return nil, err
@@ -716,11 +716,7 @@ func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
 		return nil, err
 	}
 	provider := raftAddressProvider{db: g.db}
-	nodes := make([]db.RaftNode, len(servers))
 	for i, server := range servers {
-		if server.Role != client.Voter {
-			continue
-		}
 		address, err := provider.ServerAddr(int(server.ID))
 		if err != nil {
 			if err != db.ErrNoSuchObject {
@@ -731,8 +727,28 @@ func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
 			// its raft_nodes table is not fully up-to-date yet.
 			address = server.Address
 		}
-		nodes[i].ID = int64(server.ID)
-		nodes[i].Address = string(address)
+		servers[i].Address = string(address)
+	}
+	return servers, nil
+}
+
+// Return information about the LXD nodes that a currently part of the raft
+// cluster, as configured in the raft log. It returns an error if this node is
+// not the leader.
+func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
+	servers, err := g.currentDqliteNodes()
+	if err != nil {
+		return nil, err
+	}
+	nodes := make([]db.RaftNode, 0)
+	for _, server := range servers {
+		if server.Role != client.Voter {
+			continue
+		}
+		node := db.RaftNode{}
+		node.ID = int64(server.ID)
+		node.Address = string(server.Address)
+		nodes = append(nodes, node)
 	}
 	return nodes, nil
 }

From cdf295ada8b030d526e15f1825dc519ba95999f8 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 12:38:26 +0000
Subject: [PATCH 07/15] Add Role field to the internalRaftNode struct

The accepting node will use it to tell the joining node which role it should
have.

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/api_cluster.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index dab233e518..10e762e030 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -1082,6 +1082,7 @@ type internalClusterPostAcceptResponse struct {
 type internalRaftNode struct {
 	ID      int64  `json:"id" yaml:"id"`
 	Address string `json:"address" yaml:"address"`
+	Role    int    `json:"role" yaml:"role"`
 }
 
 // Used to update the cluster after a database node has been removed, and

From 64dea5bb937111e96f11b574012412e27b0469d5 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:28:48 +0000
Subject: [PATCH 08/15] Gateway.Accept: return a full list of nodes, not only
 voters

This way we can account for the different roles that can be assigned to
joining nodes.

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership.go | 27 ++++++++++++++++-----------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 88d2137f31..30a41ff4ae 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"strconv"
 	"time"
 
 	"github.com/canonical/go-dqlite/client"
@@ -156,7 +155,7 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error {
 //
 // Return an updated list raft database nodes (possibly including the newly
 // accepted node).
-func Accept(state *state.State, gateway *Gateway, name, address string, schema, api int) ([]db.RaftNode, error) {
+func Accept(state *state.State, gateway *Gateway, name, address string, schema, api int) ([]client.NodeInfo, error) {
 	// Check parameters
 	if name == "" {
 		return nil, fmt.Errorf("node name must not be empty")
@@ -202,20 +201,26 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
 		return nil, err
 	}
 
-	// Possibly insert the new node into the raft_nodes table (if we have
-	// less than 3 database nodes).
-	nodes, err := gateway.currentRaftVoters()
+	// Get the current nodes and figure what database role the new node
+	// should have.
+	nodes, err := gateway.currentDqliteNodes()
 	if err != nil {
 		return nil, errors.Wrap(err, "Failed to get raft nodes from the log")
 	}
-	count, err := Count(state)
-	if err != nil {
-		return nil, errors.Wrap(err, "Fetch cluster members count")
+	voters := 0
+	for _, node := range nodes {
+		if node.Role == client.Voter {
+			voters++
+		}
 	}
-	if count != 2 && len(nodes) < membershipMaxRaftNodes {
-		nodes = append(nodes, db.RaftNode{ID: id, Address: address})
+	node := client.NodeInfo{ID: uint64(id), Address: address}
+	count := len(nodes) + 1 // Total nodes count
+	if count != 2 && voters < membershipMaxRaftNodes {
+		node.Role = client.Voter
+	} else {
+		node.Role = client.Spare
 	}
-
+	nodes = append(nodes, node)
 	return nodes, nil
 }
 

From b9dd3ba3eb8ca4cfeb96a9bd1c2b3cafbac2ab30 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:30:14 +0000
Subject: [PATCH 09/15] Gateway.Join: always add the joining node to the raft
 configuration

The initial role is defined by the list returned by Gateway.Accept().

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership.go | 77 +++++++++++++++++++++------------------
 1 file changed, 42 insertions(+), 35 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 30a41ff4ae..cd708d84fa 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -231,12 +231,20 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
 //
 // The cert parameter must contain the keypair/CA material of the cluster being
 // joined.
-func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name string, raftNodes []db.RaftNode) error {
+func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name string, nodes []client.NodeInfo) error {
 	// Check parameters
 	if name == "" {
 		return fmt.Errorf("node name must not be empty")
 	}
 
+	// Figure out which are the voters.
+	voters := make([]db.RaftNode, 0)
+	for _, node := range nodes {
+		if node.Role == client.Voter {
+			voters = append(voters, db.RaftNode{ID: int64(node.ID), Address: node.Address})
+		}
+	}
+
 	var address string
 	err := state.Node.Transaction(func(tx *db.NodeTx) error {
 		// Fetch current network address and raft nodes
@@ -253,7 +261,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 		}
 
 		// Set the raft nodes list to the one that was returned by Accept().
-		err = tx.RaftNodesReplace(raftNodes)
+		err = tx.RaftNodesReplace(voters)
 		if err != nil {
 			return errors.Wrap(err, "failed to set raft nodes")
 		}
@@ -327,40 +335,39 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 		return errors.Wrap(err, "failed to re-initialize gRPC SQL gateway")
 	}
 
-	// If we are listed among the database nodes, join the raft cluster.
-	id := ""
-	var target *db.RaftNode
-	for _, node := range raftNodes {
+	// Figure out our role.
+	var info *client.NodeInfo
+	var target *client.NodeInfo
+	for i, node := range nodes {
 		if node.Address == address {
-			id = strconv.Itoa(int(node.ID))
+			info = &nodes[i]
 		} else {
-			target = &node
+			target = &nodes[i]
 		}
 	}
-	if id != "" {
-		if target == nil {
-			panic("no other node found")
-		}
-		logger.Info(
-			"Joining dqlite raft cluster",
-			log15.Ctx{"id": id, "address": address, "target": target.Address})
-		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-		defer cancel()
-		client, err := client.FindLeader(
-			ctx, gateway.NodeStore(),
-			client.WithDialFunc(gateway.raftDial()),
-			client.WithLogFunc(DqliteLog),
-		)
-		if err != nil {
-			return errors.Wrap(err, "Failed to connect to cluster leader")
-		}
-		defer client.Close()
-		err = client.Add(ctx, gateway.raft.info)
-		if err != nil {
-			return errors.Wrap(err, "Failed to join cluster")
-		}
-	} else {
-		logger.Info("Joining cluster as non-database node")
+	if info == nil {
+		panic("joining node found")
+	}
+	if target == nil {
+		panic("no other node found")
+	}
+	logger.Info(
+		"Joining dqlite cluster",
+		log15.Ctx{"id": info.ID, "address": address, "target": target.Address, "role": info.Role})
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	cli, err := client.FindLeader(
+		ctx, gateway.NodeStore(),
+		client.WithDialFunc(gateway.raftDial()),
+		client.WithLogFunc(DqliteLog),
+	)
+	if err != nil {
+		return errors.Wrap(err, "Failed to connect to cluster leader")
+	}
+	defer cli.Close()
+	err = cli.Add(ctx, *info)
+	if err != nil {
+		return errors.Wrap(err, "Failed to join cluster")
 	}
 
 	// Make sure we can actually connect to the cluster database through
@@ -450,7 +457,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 		}
 
 		// Update our role list if needed.
-		if id != "" {
+		if info.Role == client.Voter {
 			err = tx.NodeAddRole(node.ID, db.ClusterRoleDatabase)
 			if err != nil {
 				return errors.Wrapf(err, "Failed to add database role for the node")
@@ -459,10 +466,10 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 
 		// Generate partial heartbeat request containing just a raft node list.
 		hbState := &APIHeartbeat{}
-		hbState.Update(false, raftNodes, []db.NodeInfo{}, offlineThreshold)
+		hbState.Update(false, voters, []db.NodeInfo{}, offlineThreshold)
 
 		// Attempt to send a heartbeat to all other raft nodes to notify them of new node.
-		for _, raftNode := range raftNodes {
+		for _, raftNode := range voters {
 			if raftNode.ID == node.ID {
 				continue
 			}

From f9c5eb8c5a6df16366ae9047e99e2b7c181222db Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:34:19 +0000
Subject: [PATCH 10/15] Gateway.Rebalance: look at the full list of nodes, not
 only voters

Later on we'll also promote/demote nodes to the stand-by or spare roles.

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership.go | 41 +++++++++++++++++----------------------
 1 file changed, 18 insertions(+), 23 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index cd708d84fa..3c9485acf7 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -490,23 +490,26 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 //
 // If there's such spare node, return its address as well as the new list of
 // raft nodes.
-func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, error) {
+func Rebalance(state *state.State, gateway *Gateway) (string, []client.NodeInfo, error) {
 	// First get the current raft members, since this method should be
 	// called after a node has left.
-	currentRaftNodes, err := gateway.currentRaftVoters()
+	currentNodes, err := gateway.currentDqliteNodes()
 	if err != nil {
 		return "", nil, errors.Wrap(err, "failed to get current raft nodes")
 	}
-	if len(currentRaftNodes) >= membershipMaxRaftNodes || len(currentRaftNodes) == 1 {
+
+	// Find the addresses of the voters
+	voters := []string{}
+	for _, node := range currentNodes {
+		if node.Role == client.Voter {
+			voters = append(voters, node.Address)
+		}
+	}
+	if len(voters) >= membershipMaxRaftNodes || len(currentNodes) == 2 {
 		// We're already at full capacity or would have a two-member cluster.
 		return "", nil, nil
 	}
 
-	currentRaftAddresses := make([]string, len(currentRaftNodes))
-	for i, node := range currentRaftNodes {
-		currentRaftAddresses[i] = node.Address
-	}
-
 	// Check if we have a spare node that we can turn into a database one.
 	address := ""
 	err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
@@ -520,7 +523,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 		}
 		// Find a node that is not part of the raft cluster yet.
 		for _, node := range nodes {
-			if shared.StringInSlice(node.Address, currentRaftAddresses) {
+			if shared.StringInSlice(node.Address, voters) {
 				continue // This is already a database node
 			}
 			if node.IsOffline(config.OfflineThreshold()) {
@@ -540,25 +543,17 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 
 	if address == "" {
 		// No node to promote
-		return "", currentRaftNodes, nil
+		return "", currentNodes, nil
 	}
 
-	// Figure out the next ID in the raft_nodes table
-	var updatedRaftNodes []db.RaftNode
-	err = gateway.db.Transaction(func(tx *db.NodeTx) error {
-		id, err := tx.RaftNodeAdd(address)
-		if err != nil {
-			return errors.Wrap(err, "Failed to add new raft node")
+	// Update the node role.
+	for i := range currentNodes {
+		if currentNodes[i].Address == address {
+			currentNodes[i].Role = client.Voter
 		}
-
-		updatedRaftNodes = append(currentRaftNodes, db.RaftNode{ID: id, Address: address})
-		return nil
-	})
-	if err != nil {
-		return "", nil, err
 	}
 
-	return address, updatedRaftNodes, nil
+	return address, currentNodes, nil
 }
 
 // Promote makes a LXD node which is not a database node, become part of the

From ede83b840707d9bb890021478fe94c3b7c008638 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:35:21 +0000
Subject: [PATCH 11/15] Gateway.Promote: use client.Assign() instead of
 client.Add()

The node being promoted is already in the raft configuration; only its role
needs to be changed.

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership.go | 24 +++++++++++++++---------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 3c9485acf7..eb1a134d75 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -558,7 +558,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []client.NodeInfo,
 
 // Promote makes a LXD node which is not a database node, become part of the
 // raft cluster.
-func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
+func Promote(state *state.State, gateway *Gateway, nodes []client.NodeInfo) error {
 	logger.Info("Promote node to database node")
 
 	// Sanity check that this is not already a database node
@@ -587,11 +587,11 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
 
 	// Figure out our raft node ID, and an existing target raft node that
 	// we'll contact to add ourselves as member.
-	id := int64(-1)
+	var info *client.NodeInfo
 	target := ""
-	for _, node := range nodes {
+	for i, node := range nodes {
 		if node.Address == address {
-			id = node.ID
+			info = &nodes[i]
 		} else {
 			target = node.Address
 		}
@@ -599,15 +599,21 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
 
 	// Sanity check that our address was actually included in the given
 	// list of raft nodes.
-	if id == -1 {
+	if info == nil {
 		return fmt.Errorf("this node is not included in the given list of database nodes")
 	}
 
 	// Replace our local list of raft nodes with the given one (which
 	// includes ourselves). This will make the gateway start a raft node
 	// when restarted.
+	voters := []db.RaftNode{}
+	for _, node := range nodes {
+		if node.Role == client.Voter {
+			voters = append(voters, db.RaftNode{ID: int64(node.ID), Address: node.Address})
+		}
+	}
 	err = state.Node.Transaction(func(tx *db.NodeTx) error {
-		err = tx.RaftNodesReplace(nodes)
+		err = tx.RaftNodesReplace(voters)
 		if err != nil {
 			return errors.Wrap(err, "failed to set raft nodes")
 		}
@@ -642,7 +648,7 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
 
 	logger.Info(
 		"Joining dqlite raft cluster",
-		log15.Ctx{"id": id, "address": address, "target": target})
+		log15.Ctx{"id": info.ID, "address": info.Address, "target": target, "role": info.Role})
 
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
@@ -653,9 +659,9 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
 	}
 	defer client.Close()
 
-	err = client.Add(ctx, gateway.raft.info)
+	err = client.Assign(ctx, info.ID, info.Role)
 	if err != nil {
-		return errors.Wrap(err, "Failed to join cluster")
+		return errors.Wrap(err, "Failed to promote node")
 	}
 
 	// Unlock regular access to our cluster database and add the database role.

From 7900dde15c48cff555b23d0b07f12146f22c6991 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:36:37 +0000
Subject: [PATCH 12/15] Drop legacy test

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership_test.go | 78 ----------------------------------
 1 file changed, 78 deletions(-)

diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index 37c28f8dde..555681cd72 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -356,84 +356,6 @@ func TestJoin(t *testing.T) {
 	assert.Len(t, members, 1)
 }
 
-func FLAKY_TestPromote(t *testing.T) {
-	// Setup a target node running as leader of a cluster.
-	targetCert := shared.TestingKeyPair()
-	targetMux := http.NewServeMux()
-	targetServer := newServer(targetCert, targetMux)
-	defer targetServer.Close()
-
-	targetState, cleanup := state.NewTestState(t)
-	defer cleanup()
-
-	targetGateway := newGateway(t, targetState.Node, targetCert)
-	defer targetGateway.Shutdown()
-
-	for path, handler := range targetGateway.HandlerFuncs(nil) {
-		targetMux.HandleFunc(path, handler)
-	}
-
-	targetAddress := targetServer.Listener.Addr().String()
-	var err error
-	require.NoError(t, targetState.Cluster.Close())
-	store := targetGateway.NodeStore()
-	dialFunc := targetGateway.DialFunc()
-	targetState.Cluster, err = db.OpenCluster(
-		"db.bin", store, targetAddress, "/unused/db/dir", 5*time.Second, nil,
-		driver.WithDialFunc(dialFunc))
-	require.NoError(t, err)
-	targetF := &membershipFixtures{t: t, state: targetState}
-	targetF.ClusterAddress(targetAddress)
-
-	err = cluster.Bootstrap(targetState, targetGateway, "buzz")
-	require.NoError(t, err)
-
-	// Setup a node to be promoted.
-	mux := http.NewServeMux()
-	server := newServer(targetCert, mux) // Use the same cert, as we're already part of the cluster
-	defer server.Close()
-
-	state, cleanup := state.NewTestState(t)
-	defer cleanup()
-
-	address := server.Listener.Addr().String()
-	targetF.ClusterNode(address) // Add the non database node to the cluster database
-	f := &membershipFixtures{t: t, state: state}
-	f.ClusterAddress(address)
-	f.RaftNode(targetAddress) // Insert the leader in our local list of database nodes
-
-	gateway := newGateway(t, state.Node, targetCert)
-	defer gateway.Shutdown()
-
-	for path, handler := range gateway.HandlerFuncs(nil) {
-		mux.HandleFunc(path, handler)
-	}
-
-	// Promote the node.
-	targetF.RaftNode(address) // Add the address of the node to be promoted in the leader's db
-	raftNodes := targetF.RaftNodes()
-	err = cluster.Promote(state, gateway, raftNodes)
-	require.NoError(t, err)
-
-	// The leader now returns an updated list of raft nodes.
-	raftNodes, err = targetGateway.RaftNodes()
-	require.NoError(t, err)
-	assert.Len(t, raftNodes, 2)
-	assert.Equal(t, int64(1), raftNodes[0].ID)
-	assert.Equal(t, targetAddress, raftNodes[0].Address)
-	assert.Equal(t, int64(2), raftNodes[1].ID)
-	assert.Equal(t, address, raftNodes[1].Address)
-
-	// The List function returns all nodes in the cluster.
-	nodes, err := cluster.List(state)
-	require.NoError(t, err)
-	assert.Len(t, nodes, 2)
-	assert.Equal(t, "Online", nodes[0].Status)
-	assert.Equal(t, "Online", nodes[1].Status)
-	assert.True(t, nodes[0].Database)
-	assert.True(t, nodes[1].Database)
-}
-
 // Helper for setting fixtures for Bootstrap tests.
 type membershipFixtures struct {
 	t     *testing.T

From 91f54f26e57736e8a429c749213bc8b31ffb35c5 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:36:44 +0000
Subject: [PATCH 13/15] Change cluster unit tests to cope with non-voters

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership_test.go | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index 555681cd72..05a16e272e 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	client "github.com/canonical/go-dqlite/client"
 	"github.com/canonical/go-dqlite/driver"
 	"github.com/lxc/lxd/lxd/cluster"
 	"github.com/lxc/lxd/lxd/db"
@@ -227,9 +228,13 @@ func TestAccept_ClusterHasOneMember(t *testing.T) {
 	nodes, err := cluster.Accept(
 		state, gateway, "buzz", "5.6.7.8:666", cluster.SchemaVersion, len(version.APIExtensions))
 	assert.NoError(t, err)
-	require.Len(t, nodes, 1)
-	assert.Equal(t, int64(1), nodes[0].ID)
+	require.Len(t, nodes, 2)
+	assert.Equal(t, uint64(1), nodes[0].ID)
 	assert.Equal(t, "1.2.3.4:666", nodes[0].Address)
+	assert.Equal(t, client.Voter, nodes[0].Role)
+	assert.Equal(t, uint64(2), nodes[1].ID)
+	assert.Equal(t, "5.6.7.8:666", nodes[1].Address)
+	assert.Equal(t, client.Spare, nodes[1].Role)
 }
 
 func TestJoin(t *testing.T) {
@@ -314,11 +319,11 @@ func TestJoin(t *testing.T) {
 
 	// The leader now returns an updated list of raft nodes.
 	// The new node is not included to ensure distributed consensus.
-	raftNodes, err = targetGateway.RaftNodes()
+	newNodes, err := targetGateway.RaftNodes()
 	require.NoError(t, err)
-	assert.Len(t, raftNodes, 1)
-	assert.Equal(t, int64(1), raftNodes[0].ID)
-	assert.Equal(t, targetAddress, raftNodes[0].Address)
+	assert.Len(t, newNodes, 1)
+	assert.Equal(t, int64(1), newNodes[0].ID)
+	assert.Equal(t, targetAddress, newNodes[0].Address)
 
 	// The List function returns all nodes in the cluster.
 	nodes, err := cluster.List(state)

From 389bd78fc1f2c10a12460c0c68953ed467f99f39 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:37:35 +0000
Subject: [PATCH 14/15] internal cluster APIs: exchange role information when
 joining

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/api_cluster.go | 38 +++++++++++++++++++++++++-------------
 1 file changed, 25 insertions(+), 13 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 10e762e030..2ea7359473 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -12,6 +12,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/canonical/go-dqlite/client"
 	dqlitedriver "github.com/canonical/go-dqlite/driver"
 	"github.com/gorilla/mux"
 	"github.com/pkg/errors"
@@ -371,7 +372,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
 		}
 
 		// Connect to the target cluster node.
-		client, err := lxd.ConnectLXD(fmt.Sprintf("https://%s", req.ClusterAddress), args)
+		cli, err := lxd.ConnectLXD(fmt.Sprintf("https://%s", req.ClusterAddress), args)
 		if err != nil {
 			return err
 		}
@@ -387,7 +388,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
 				return errors.Wrap(err, "Failed to connect to local LXD")
 			}
 
-			err = clusterInitMember(d, client, req.MemberConfig)
+			err = clusterInitMember(d, cli, req.MemberConfig)
 			if err != nil {
 				return errors.Wrap(err, "Failed to initialize member")
 			}
@@ -425,7 +426,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
 
 		// Now request for this node to be added to the list of cluster nodes.
 		info, err := clusterAcceptMember(
-			client, req.ServerName, address, cluster.SchemaVersion,
+			cli, req.ServerName, address, cluster.SchemaVersion,
 			version.APIExtensionsCount(), pools, networks)
 		if err != nil {
 			return errors.Wrap(err, "Failed request to add member")
@@ -444,10 +445,11 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
 
 		// Update local setup and possibly join the raft dqlite
 		// cluster.
-		nodes := make([]db.RaftNode, len(info.RaftNodes))
+		nodes := make([]client.NodeInfo, len(info.RaftNodes))
 		for i, node := range info.RaftNodes {
-			nodes[i].ID = node.ID
+			nodes[i].ID = uint64(node.ID)
 			nodes[i].Address = node.Address
+			nodes[i].Role = client.NodeRole(node.Role)
 		}
 
 		// The default timeout when non-clustered is one minute, let's
@@ -596,7 +598,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
 
 			for f, ps := range imageProjectInfo {
 				go func(fingerprint string, projects []string) {
-					err := imageImport(client, fingerprint, projects)
+					err := imageImport(cli, fingerprint, projects)
 					if err != nil {
 						logger.Errorf("Failed to import an image %s from %s: %v", fingerprint, leader, err)
 					}
@@ -607,12 +609,12 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
 		// Add the cluster flag from the agent
 		version.UserAgentFeatures([]string{"cluster"})
 
-		client, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), true)
+		cli, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), true)
 		if err != nil {
 			return err
 		}
 
-		err = clusterRebalance(client)
+		err = clusterRebalance(cli)
 		if err != nil {
 			return err
 		}
@@ -1056,8 +1058,9 @@ func internalClusterPostAccept(d *Daemon, r *http.Request) response.Response {
 		PrivateKey: d.endpoints.NetworkPrivateKey(),
 	}
 	for i, node := range nodes {
-		accepted.RaftNodes[i].ID = node.ID
+		accepted.RaftNodes[i].ID = int64(node.ID)
 		accepted.RaftNodes[i].Address = node.Address
+		accepted.RaftNodes[i].Role = int(node.Role)
 	}
 	return response.SyncResponse(true, accepted)
 }
@@ -1133,8 +1136,15 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
 			return response.InternalError(err)
 		}
 
+		voters := []db.RaftNode{}
+		for _, node := range nodes {
+			if node.Role == client.Voter {
+				voters = append(voters, db.RaftNode{ID: int64(node.ID), Address: node.Address})
+			}
+		}
+
 		hbState := &cluster.APIHeartbeat{}
-		hbState.Update(false, nodes, []db.NodeInfo{}, offlineThreshold)
+		hbState.Update(false, voters, []db.NodeInfo{}, offlineThreshold)
 
 		cert, err := util.LoadCert(d.os.VarDir)
 		if err != nil {
@@ -1156,8 +1166,9 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
 	post := &internalClusterPostPromoteRequest{}
 	for _, node := range nodes {
 		post.RaftNodes = append(post.RaftNodes, internalRaftNode{
-			ID:      node.ID,
+			ID:      int64(node.ID),
 			Address: node.Address,
+			Role:    int(node.Role),
 		})
 	}
 
@@ -1190,10 +1201,11 @@ func internalClusterPostPromote(d *Daemon, r *http.Request) response.Response {
 		return response.BadRequest(fmt.Errorf("No raft members provided"))
 	}
 
-	nodes := make([]db.RaftNode, len(req.RaftNodes))
+	nodes := make([]client.NodeInfo, len(req.RaftNodes))
 	for i, node := range req.RaftNodes {
-		nodes[i].ID = node.ID
+		nodes[i].ID = uint64(node.ID)
 		nodes[i].Address = node.Address
+		nodes[i].Role = client.NodeRole(node.Role)
 	}
 	err = cluster.Promote(d.State(), d.gateway, nodes)
 	if err != nil {

From 4b0822acae9dd16329758739047110c58223170d Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 16:47:53 +0000
Subject: [PATCH 15/15] Fix notification of new voters

The heartbeat sent by joining nodes was empty. Also send the heartbeat
notification when an existing node is promoted to voter.

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership.go | 40 +++++++++++++++++++++++++--------------
 1 file changed, 26 insertions(+), 14 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index eb1a134d75..998e8538a3 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -464,17 +464,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 			}
 		}
 
-		// Generate partial heartbeat request containing just a raft node list.
-		hbState := &APIHeartbeat{}
-		hbState.Update(false, voters, []db.NodeInfo{}, offlineThreshold)
-
-		// Attempt to send a heartbeat to all other raft nodes to notify them of new node.
-		for _, raftNode := range voters {
-			if raftNode.ID == node.ID {
-				continue
-			}
-			go HeartbeatNode(context.Background(), raftNode.Address, cert, hbState)
-		}
+		notifyNewVoter(voters, node.ID, cert)
 
 		return nil
 	})
@@ -485,6 +475,24 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 	return nil
 }
 
+// notifyNewVoter asynchronously sends a partial heartbeat, listing only the given voters, to every voter except the one whose ID is id (the new voter itself).
+func notifyNewVoter(voters []db.RaftNode, id int64, cert *shared.CertInfo) {
+	// Generate partial heartbeat request containing just the voters, both as raft nodes and as member entries (ID and address only).
+	hbState := &APIHeartbeat{}
+	nodes := make([]db.NodeInfo, len(voters))
+	for i, voter := range voters {
+		nodes[i].ID = voter.ID
+		nodes[i].Address = voter.Address
+	}
+	hbState.Update(false, voters, nodes, 0) // NOTE(review): offline threshold is 0 and NodeInfo heartbeats are zero-valued; confirm how Update derives member status from these.
+	for _, voter := range voters {
+		if voter.ID == id {
+			continue
+		}
+		go HeartbeatNode(context.Background(), voter.Address, cert, hbState) // Fire-and-forget: nothing waits on or inspects the outcome of these goroutines.
+	}
+}
+
 // Rebalance the raft cluster, trying to see if we have a spare online node
 // that we can promote to database node if we are below membershipMaxRaftNodes.
 //
@@ -653,13 +661,13 @@ func Promote(state *state.State, gateway *Gateway, nodes []client.NodeInfo) erro
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
-	client, err := client.FindLeader(ctx, gateway.NodeStore(), client.WithDialFunc(gateway.raftDial()))
+	cli, err := client.FindLeader(ctx, gateway.NodeStore(), client.WithDialFunc(gateway.raftDial()))
 	if err != nil {
 		return errors.Wrap(err, "Failed to connect to cluster leader")
 	}
-	defer client.Close()
+	defer cli.Close()
 
-	err = client.Assign(ctx, info.ID, info.Role)
+	err = cli.Assign(ctx, info.ID, info.Role)
 	if err != nil {
 		return errors.Wrap(err, "Failed to promote node")
 	}
@@ -676,6 +684,10 @@ func Promote(state *state.State, gateway *Gateway, nodes []client.NodeInfo) erro
 		return errors.Wrap(err, "cluster database initialization failed")
 	}
 
+	if info.Role == client.Voter {
+		notifyNewVoter(voters, int64(info.ID), gateway.cert)
+	}
+
 	return nil
 }
 


More information about the lxc-devel mailing list