[lxc-devel] [lxd/master] Plug go-dqlite roles logic into LXD

freeekanayaka on Github lxc-bot at linuxcontainers.org
Wed Jul 1 07:51:31 UTC 2020


From 761015a70981723a15d7445f49ba1950f3f480c6 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 30 Jun 2020 09:25:47 +0200
Subject: [PATCH 1/4] lxd/cluster: Only look up raft_nodes for resolving the
 address of node 1

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/gateway.go    | 33 ++++++++++++++-------------------
 lxd/cluster/membership.go | 10 +++-------
 2 files changed, 17 insertions(+), 26 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 42c2ae3eee..4846f14138 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -403,14 +403,11 @@ func (g *Gateway) DialFunc() client.DialFunc {
 // Dial function for establishing raft connections.
 func (g *Gateway) raftDial() client.DialFunc {
 	return func(ctx context.Context, address string) (net.Conn, error) {
-		if address == "1" {
-			addr, err := g.raftAddress(1)
-			if err != nil {
-				return nil, err
-			}
-			address = string(addr)
+		nodeAddress, err := g.nodeAddress(address)
+		if err != nil {
+			return nil, err
 		}
-		conn, err := dqliteNetworkDial(ctx, address, g)
+		conn, err := dqliteNetworkDial(ctx, nodeAddress, g)
 		if err != nil {
 			return nil, err
 		}
@@ -480,7 +477,7 @@ func (g *Gateway) TransferLeadership() error {
 		if server.ID == g.info.ID || server.Role != db.RaftVoter {
 			continue
 		}
-		address, err := g.raftAddress(server.ID)
+		address, err := g.nodeAddress(server.Address)
 		if err != nil {
 			return err
 		}
@@ -851,27 +848,25 @@ func (g *Gateway) currentRaftNodes() ([]db.RaftNode, error) {
 		return nil, err
 	}
 	for i, server := range servers {
-		address, err := g.raftAddress(server.ID)
+		address, err := g.nodeAddress(server.Address)
 		if err != nil {
-			if err != db.ErrNoSuchObject {
-				return nil, errors.Wrap(err, "Failed to fetch raft server address")
-			}
-			// Use the initial address as fallback. This is an edge
-			// case that happens when a new leader is elected and
-			// its raft_nodes table is not fully up-to-date yet.
-			address = server.Address
+			return nil, errors.Wrap(err, "Failed to fetch raft server address")
 		}
 		servers[i].Address = address
 	}
 	return servers, nil
 }
 
-// Look up a server address in the raft_nodes table.
-func (g *Gateway) raftAddress(databaseID uint64) (string, error) {
+// Translate a raft address to a node address. They are always the same except
+// for the bootstrap node, which has address "1".
+func (g *Gateway) nodeAddress(raftAddress string) (string, error) {
+	if raftAddress != "1" {
+		return raftAddress, nil
+	}
 	var address string
 	err := g.db.Transaction(func(tx *db.NodeTx) error {
 		var err error
-		address, err = tx.GetRaftNodeAddress(int64(databaseID))
+		address, err = tx.GetRaftNodeAddress(1)
 		return err
 	})
 	if err != nil {
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 1d260d4295..cf335c3dd1 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -938,13 +938,9 @@ func List(state *state.State, gateway *Gateway) ([]api.ClusterMember, error) {
 	}
 	raftRoles := map[string]client.NodeRole{} // Address to role
 	for _, node := range raftNodes {
-		address := node.Address
-		if address == "1" {
-			addr, err := gateway.raftAddress(1)
-			if err != nil {
-				return nil, err
-			}
-			address = string(addr)
+		address, err := gateway.nodeAddress(node.Address)
+		if err != nil {
+			return nil, err
 		}
 		raftRoles[address] = node.Role
 	}
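
The rule this first patch centralizes is small enough to state on its own: raft addresses and node addresses coincide, except for the bootstrap node, whose raft address is the literal string "1" and must be resolved through the raft_nodes table. A minimal runnable sketch of that rule follows; resolveNodeAddress and lookup are hypothetical stand-ins for Gateway.nodeAddress and tx.GetRaftNodeAddress:

package main

import "fmt"

// resolveNodeAddress maps a raft address to a node address. They are the
// same, except for the bootstrap node, whose raft address is "1" and whose
// real network address must come from a lookup (the raft_nodes table in LXD).
func resolveNodeAddress(raftAddress string, lookup func(id int64) (string, error)) (string, error) {
	if raftAddress != "1" {
		return raftAddress, nil
	}
	return lookup(1)
}

func main() {
	// Stub lookup standing in for the raft_nodes table query.
	lookup := func(id int64) (string, error) { return "10.0.0.1:8443", nil }

	for _, addr := range []string{"10.0.0.2:8443", "1"} {
		resolved, _ := resolveNodeAddress(addr, lookup)
		fmt.Printf("%s -> %s\n", addr, resolved)
	}
}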

From 292ed405c3e5a2c9bda31ee945896826c232057e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 30 Jun 2020 10:48:27 +0200
Subject: [PATCH 2/4] lxd/cluster: Leverage RolesChanges.Handover() to choose
 handover target

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/.dir-locals.el        |  2 +-
 lxd/cluster/membership.go | 78 +++++++++++++++++++++++++++++----------
 2 files changed, 60 insertions(+), 20 deletions(-)

diff --git a/lxd/.dir-locals.el b/lxd/.dir-locals.el
index 77b750661f..b72bdc482e 100644
--- a/lxd/.dir-locals.el
+++ b/lxd/.dir-locals.el
@@ -1,7 +1,7 @@
 ;;; Directory Local Variables
 ;;; For more information see (info "(emacs) Directory Variables")
 ((go-mode
-  . ((go-test-args . "-tags libsqlite3 -timeout 90s")
+  . ((go-test-args . "-tags libsqlite3 -timeout 120s")
      (eval
       . (set
 	 (make-local-variable 'flycheck-go-build-tags)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index cf335c3dd1..45ee7df16b 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"time"
 
+	"github.com/canonical/go-dqlite/app"
 	"github.com/canonical/go-dqlite/client"
 	"github.com/lxc/lxd/lxd/db"
 	"github.com/lxc/lxd/lxd/db/cluster"
@@ -839,40 +840,79 @@ func Leave(state *state.State, gateway *Gateway, name string, force bool) (strin
 func Handover(state *state.State, gateway *Gateway, address string) (string, []db.RaftNode, error) {
 	nodes, err := gateway.currentRaftNodes()
 	if err != nil {
-		return "", nil, errors.Wrap(err, "Failed to get current raft nodes")
+		return "", nil, errors.Wrap(err, "Get current raft nodes")
 	}
 
-	// If the member which is shutting down is not a voter, there's nothing
-	// to do.
-	found := false
+	var nodeID uint64
 	for _, node := range nodes {
-		if node.Address != address {
-			continue
-		}
-		if node.Role != db.RaftVoter {
-			return "", nil, nil
+		if node.Address == address {
+			nodeID = node.ID
 		}
-		found = true
-		break
+
 	}
-	if !found {
+	if nodeID == 0 {
 		return "", nil, errors.Wrapf(err, "No dqlite node has address %s", address)
 	}
 
+	roles, err := newRolesChanges(state, gateway, nodes)
+	if err != nil {
+		return "", nil, err
+	}
+	role, candidates := roles.Handover(nodeID)
+
+	if role != db.RaftVoter {
+		return "", nil, nil
+	}
+
 	for i, node := range nodes {
-		if node.Role == db.RaftVoter || node.Address == address {
-			continue
+		if node.Address == candidates[0].Address {
+			nodes[i].Role = role
+			return node.Address, nodes, nil
 		}
-		if !hasConnectivity(gateway.cert, node.Address) {
-			continue
-		}
-		nodes[i].Role = db.RaftVoter
-		return node.Address, nodes, nil
 	}
 
 	return "", nil, nil
 }
 
+// Build an app.RolesChanges object fed with the current cluster state.
+func newRolesChanges(state *state.State, gateway *Gateway, nodes []db.RaftNode) (*app.RolesChanges, error) {
+	var maxVoters int
+	var maxStandBy int
+	err := state.Cluster.Transaction(func(tx *db.ClusterTx) error {
+		config, err := ConfigLoad(tx)
+		if err != nil {
+			return errors.Wrap(err, "Load cluster configuration")
+		}
+		maxVoters = int(config.MaxVoters())
+		maxStandBy = int(config.MaxStandBy())
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	cluster := map[client.NodeInfo]*client.NodeMetadata{}
+
+	for _, node := range nodes {
+		if hasConnectivity(gateway.cert, node.Address) {
+			cluster[node] = &client.NodeMetadata{}
+		} else {
+			cluster[node] = nil
+		}
+
+	}
+
+	roles := &app.RolesChanges{
+		Config: app.RolesConfig{
+			Voters:   maxVoters,
+			StandBys: maxStandBy,
+		},
+		State: cluster,
+	}
+
+	return roles, nil
+}
+
 // Purge removes a node entirely from the cluster database.
 func Purge(cluster *db.Cluster, name string) error {
 	logger.Debugf("Remove node %s from the database", name)
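
This patch delegates the handover decision to go-dqlite's roles logic. The shape of that API, as used by the newRolesChanges helper above, is: build an app.RolesChanges with the configured voter/stand-by counts and a map describing each node's current role and reachability (nil metadata marks an unreachable node), then ask it for a handover plan. A self-contained sketch with hypothetical static data:

package main

import (
	"fmt"

	"github.com/canonical/go-dqlite/app"
	"github.com/canonical/go-dqlite/client"
)

func main() {
	// Hypothetical three-node cluster: two voters and one spare.
	n1 := client.NodeInfo{ID: 1, Address: "10.0.0.1:8443", Role: client.Voter}
	n2 := client.NodeInfo{ID: 2, Address: "10.0.0.2:8443", Role: client.Voter}
	n3 := client.NodeInfo{ID: 3, Address: "10.0.0.3:8443", Role: client.Spare}

	roles := &app.RolesChanges{
		Config: app.RolesConfig{Voters: 3, StandBys: 2},
		// Non-nil metadata means the node is reachable; nil means offline.
		State: map[client.NodeInfo]*client.NodeMetadata{n1: {}, n2: {}, n3: {}},
	}

	// Ask which role node 1 should hand over before shutting down, and
	// which nodes are candidates to take it over.
	role, candidates := roles.Handover(1)
	fmt.Println(role, candidates)
}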

From baae60bbd34e0f7b2bd48f14ecd692f159fae412 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 30 Jun 2020 10:54:46 +0200
Subject: [PATCH 3/4] lxd/cluster: Skip unnecessary loading of nodes from
 database in Rebalance()

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/cluster/membership.go | 20 ++++----------------
 1 file changed, 4 insertions(+), 16 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 45ee7df16b..c2b859bde9 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -495,7 +495,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 
 	// Fetch the nodes from the database, to get their last heartbeat
 	// timestamp and check whether they are offline.
-	nodesByAddress := map[string]db.NodeInfo{}
 	var maxVoters int64
 	var maxStandBy int64
 	err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
@@ -505,13 +504,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 		}
 		maxVoters = config.MaxVoters()
 		maxStandBy = config.MaxStandBy()
-		nodes, err := tx.GetNodes()
-		if err != nil {
-			return errors.Wrap(err, "failed to get cluster nodes")
-		}
-		for _, node := range nodes {
-			nodesByAddress[node.Address] = node
-		}
 		return nil
 	})
 	if err != nil {
@@ -524,8 +516,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 	standbys := make([]string, 0)
 	candidates := make([]string, 0)
 	for i, info := range currentRaftNodes {
-		node := nodesByAddress[info.Address]
-		if !hasConnectivity(gateway.cert, node.Address) {
+		if !hasConnectivity(gateway.cert, info.Address) {
 			if info.Role != db.RaftSpare {
 				client, err := gateway.getClient()
 				if err != nil {
@@ -534,8 +525,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 				defer client.Close()
 				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 				defer cancel()
-				logger.Infof(
-					"Demote offline node %s (%s) to spare", node.Name, node.Address)
+				logger.Infof("Demote offline node %s to spare", info.Address)
 				err = client.Assign(ctx, info.ID, db.RaftSpare)
 				if err != nil {
 					return "", nil, errors.Wrap(err, "Failed to demote offline node")
@@ -572,10 +562,8 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 	// Check if we have a spare node that we can promote to the missing role.
 	address := ""
 	for _, candidate := range candidates {
-		node := nodesByAddress[candidate]
-		logger.Infof(
-			"Found spare node %s (%s) to be promoted to %s", node.Name, node.Address, role)
-		address = node.Address
+		logger.Infof("Found spare node %s to be promoted to %s", candidate, role)
+		address = candidate
 		break
 	}
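
The lookup removed here was only needed to fetch node names for log messages: after patch 1, currentRaftNodes() already returns resolved network addresses, so connectivity can be probed against info.Address directly. For illustration, a stripped-down probe along the lines of hasConnectivity; this is a hypothetical simplification, since the real helper completes a TLS handshake using the cluster certificate rather than a bare TCP dial:

package main

import (
	"fmt"
	"net"
	"time"
)

// probe reports whether a cluster member answers on its address. This is a
// bare TCP check; LXD's hasConnectivity additionally performs a TLS
// handshake with the cluster certificate.
func probe(address string) bool {
	conn, err := net.DialTimeout("tcp", address, time.Second)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

func main() {
	fmt.Println(probe("10.0.0.1:8443"))
}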
 

From d9e7be738732a6ce5e02b585b25eca79df543a54 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 30 Jun 2020 12:51:51 +0200
Subject: [PATCH 4/4] lxd/cluster: Leverage RolesChanges.Adjust() to choose
 rebalance target

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/api_cluster.go        | 24 +++++++++--
 lxd/cluster/connect.go    |  4 +-
 lxd/cluster/gateway.go    | 20 ++++++++-
 lxd/cluster/membership.go | 91 ++++++---------------------------------
 lxd/cluster/notify.go     |  2 +-
 test/suites/clustering.sh | 13 +++---
 6 files changed, 61 insertions(+), 93 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index fbc61dd979..04bf1c6bb3 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -1199,9 +1199,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
 // Check if there's a dqlite node whose role should be changed, and post a
 // change role request if so.
 func rebalanceMemberRoles(d *Daemon) error {
-	logger.Debugf("Rebalance cluster")
-
-	// Check if we have a spare node to promote.
+again:
 	address, nodes, err := cluster.Rebalance(d.State(), d.gateway)
 	if err != nil {
 		return err
@@ -1212,13 +1210,31 @@ func rebalanceMemberRoles(d *Daemon) error {
 		return nil
 	}
 
+	// Process demotions of offline nodes immediately.
+	for _, node := range nodes {
+		if node.Address != address || node.Role != db.RaftSpare {
+			continue
+		}
+
+		if cluster.HasConnectivity(d.endpoints.NetworkCert(), address) {
+			break
+		}
+
+		err := d.gateway.DemoteOfflineNode(node.ID)
+		if err != nil {
+			return errors.Wrapf(err, "Demote offline node %s", node.Address)
+		}
+
+		goto again
+	}
+
 	// Tell the node to promote itself.
 	err = changeMemberRole(d, address, nodes)
 	if err != nil {
 		return err
 	}
 
-	return nil
+	goto again
 }
 
 // Post a change role request to the member with the given address. The nodes
diff --git a/lxd/cluster/connect.go b/lxd/cluster/connect.go
index 92eda9bba4..8fdb8a8390 100644
--- a/lxd/cluster/connect.go
+++ b/lxd/cluster/connect.go
@@ -168,8 +168,8 @@ func SetupTrust(cert, targetAddress, targetCert, targetPassword string) error {
 	return nil
 }
 
-// Probe network connectivity to the member with the given address.
-func hasConnectivity(cert *shared.CertInfo, address string) bool {
+// HasConnectivity probes the member with the given address for connectivity.
+func HasConnectivity(cert *shared.CertInfo, address string) bool {
 	config, err := tlsClientConfig(cert)
 	if err != nil {
 		return false
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 4846f14138..a4a49f57b3 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -481,7 +481,7 @@ func (g *Gateway) TransferLeadership() error {
 		if err != nil {
 			return err
 		}
-		if !hasConnectivity(g.cert, address) {
+		if !HasConnectivity(g.cert, address) {
 			continue
 		}
 		id = server.ID
@@ -498,6 +498,24 @@ func (g *Gateway) TransferLeadership() error {
 	return client.Transfer(ctx, id)
 }
 
+// DemoteOfflineNode forcefully demotes an offline node.
+func (g *Gateway) DemoteOfflineNode(raftID uint64) error {
+	cli, err := g.getClient()
+	if err != nil {
+		return errors.Wrap(err, "Connect to local dqlite node")
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	err = cli.Assign(ctx, raftID, db.RaftSpare)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // Shutdown this gateway, stopping the gRPC server and possibly the raft factory.
 func (g *Gateway) Shutdown() error {
 	logger.Infof("Stop database gateway")
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index c2b859bde9..2b019c7ed4 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -486,100 +486,35 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 		return "", nil, nil
 	}
 
-	// First get the current raft members, since this method should be
-	// called after a node has left.
-	currentRaftNodes, err := gateway.currentRaftNodes()
+	nodes, err := gateway.currentRaftNodes()
 	if err != nil {
-		return "", nil, errors.Wrap(err, "failed to get current raft nodes")
+		return "", nil, errors.Wrap(err, "Get current raft nodes")
 	}
 
-	// Fetch the nodes from the database, to get their last heartbeat
-	// timestamp and check whether they are offline.
-	var maxVoters int64
-	var maxStandBy int64
-	err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
-		config, err := ConfigLoad(tx)
-		if err != nil {
-			return errors.Wrap(err, "failed load cluster configuration")
-		}
-		maxVoters = config.MaxVoters()
-		maxStandBy = config.MaxStandBy()
-		return nil
-	})
+	roles, err := newRolesChanges(state, gateway, nodes)
 	if err != nil {
 		return "", nil, err
 	}
 
-	// Group by role. If a node is offline, we'll try to demote it right
-	// away.
-	voters := make([]string, 0)
-	standbys := make([]string, 0)
-	candidates := make([]string, 0)
-	for i, info := range currentRaftNodes {
-		if !hasConnectivity(gateway.cert, info.Address) {
-			if info.Role != db.RaftSpare {
-				client, err := gateway.getClient()
-				if err != nil {
-					return "", nil, errors.Wrap(err, "Failed to connect to local dqlite node")
-				}
-				defer client.Close()
-				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-				defer cancel()
-				logger.Infof("Demote offline node %s to spare", info.Address)
-				err = client.Assign(ctx, info.ID, db.RaftSpare)
-				if err != nil {
-					return "", nil, errors.Wrap(err, "Failed to demote offline node")
-				}
-				currentRaftNodes[i].Role = db.RaftSpare
-			}
-			continue
-		}
-
-		switch info.Role {
-		case db.RaftVoter:
-			voters = append(voters, info.Address)
-		case db.RaftStandBy:
-			standbys = append(standbys, info.Address)
-		case db.RaftSpare:
-			candidates = append(candidates, info.Address)
-		}
-	}
-
-	var role db.RaftRole
+	role, candidates := roles.Adjust(gateway.info.ID)
 
-	if len(voters) < int(maxVoters) && len(currentRaftNodes) >= 3 {
-		role = db.RaftVoter
-		// Include stand-by nodes among the ones that can be promoted,
-		// preferring them over spare ones.
-		candidates = append(standbys, candidates...)
-	} else if len(standbys) < int(maxStandBy) {
-		role = db.RaftStandBy
-	} else {
-		// We're already at full capacity or would have a two-member cluster.
-		return "", nil, nil
+	if role == -1 {
+		// No role change is needed.
+		return "", nodes, nil
 	}
 
 	// Check if we have a spare node that we can promote to the missing role.
-	address := ""
-	for _, candidate := range candidates {
-		logger.Infof("Found spare node %s to be promoted to %s", candidate, role)
-		address = candidate
-		break
-	}
+	address := candidates[0].Address
+	logger.Infof("Found node %s whose role needs to be changed to %s", address, role)
 
-	if address == "" {
-		// No node to promote
-		return "", currentRaftNodes, nil
-	}
-
-	for i, node := range currentRaftNodes {
+	for i, node := range nodes {
 		if node.Address == address {
-			currentRaftNodes[i].Role = role
+			nodes[i].Role = role
 			break
 		}
 	}
 
-	return address, currentRaftNodes, nil
+	return address, nodes, nil
 }
 
 // Assign a new role to the local dqlite node.
@@ -882,7 +817,7 @@ func newRolesChanges(state *state.State, gateway *Gateway, nodes []db.RaftNode)
 	cluster := map[client.NodeInfo]*client.NodeMetadata{}
 
 	for _, node := range nodes {
-		if hasConnectivity(gateway.cert, node.Address) {
+		if HasConnectivity(gateway.cert, node.Address) {
 			cluster[node] = &client.NodeMetadata{}
 		} else {
 			cluster[node] = nil
diff --git a/lxd/cluster/notify.go b/lxd/cluster/notify.go
index 603f814093..28f5b44dff 100644
--- a/lxd/cluster/notify.go
+++ b/lxd/cluster/notify.go
@@ -73,7 +73,7 @@ func NewNotifier(state *state.State, cert *shared.CertInfo, policy NotifierPolic
 			// enough, let's try to connect to the node, just in
 			// case the heartbeat is lagging behind for some reason
 			// and the node is actually up.
-			if !hasConnectivity(cert, node.Address) {
+			if !HasConnectivity(cert, node.Address) {
 				switch policy {
 				case NotifyAll:
 					return nil, fmt.Errorf("peer node %s is down", node.Address)
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index a5e07476ca..49631e8d8f 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -1673,7 +1673,7 @@ test_clustering_handover() {
 
   LXD_DIR="${LXD_THREE_DIR}" lxc cluster list
 
-  # Respawn the first node, which is now a stand-by, and the second node, which
+  # Respawn the first node, which is now a spare, and the second node, which
   # is still a voter.
   respawn_lxd_cluster_member "${ns1}" "${LXD_ONE_DIR}"
   respawn_lxd_cluster_member "${ns2}" "${LXD_TWO_DIR}"
@@ -1687,14 +1687,13 @@ test_clustering_handover() {
   wait "$pid1"
   wait "$pid2"
 
-  # Wait some time to allow for a leadership change.
-  sleep 10
+  # Bringing back one of them restores the quorum.
+  respawn_lxd_cluster_member "${ns2}" "${LXD_TWO_DIR}"
 
-  # The first node has been promoted back to voter, and since the fourth node is
-  # still up, the cluster is online.
   LXD_DIR="${LXD_ONE_DIR}" lxc cluster list
 
   LXD_DIR="${LXD_ONE_DIR}" lxd shutdown
+  LXD_DIR="${LXD_TWO_DIR}" lxd shutdown
   LXD_DIR="${LXD_FOUR_DIR}" lxd shutdown
   sleep 0.5
   rm -f "${LXD_ONE_DIR}/unix.socket"
@@ -1757,8 +1756,8 @@ test_clustering_rebalance() {
   LXD_DIR="${LXD_ONE_DIR}" lxc config set cluster.offline_threshold 12
   kill -9 "$(cat "${LXD_TWO_DIR}/lxd.pid")"
 
-  # Wait for the second node to be considered offline. We need at most 2 full
-  # hearbeats.
+  # Wait for the second node to be considered offline and be replaced by the
+  # fourth node.
   sleep 25
 
   # The second node is offline and has been demoted.
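
With this last patch Rebalance() becomes symmetric with Handover(): it builds the same app.RolesChanges state and asks Adjust() for the next role change, with -1 meaning the cluster is already balanced; rebalanceMemberRoles() then applies one change at a time, demoting unreachable spares itself, and loops until Rebalance() reports nothing left to do. A sketch of the Adjust() call with hypothetical static data:

package main

import (
	"fmt"

	"github.com/canonical/go-dqlite/app"
	"github.com/canonical/go-dqlite/client"
)

func main() {
	// Hypothetical cluster with one voter and two online spares.
	n1 := client.NodeInfo{ID: 1, Address: "10.0.0.1:8443", Role: client.Voter}
	n2 := client.NodeInfo{ID: 2, Address: "10.0.0.2:8443", Role: client.Spare}
	n3 := client.NodeInfo{ID: 3, Address: "10.0.0.3:8443", Role: client.Spare}

	roles := &app.RolesChanges{
		Config: app.RolesConfig{Voters: 3, StandBys: 2},
		State:  map[client.NodeInfo]*client.NodeMetadata{n1: {}, n2: {}, n3: {}},
	}

	// Adjust returns the next single role change needed, from the point of
	// view of the given (leader) node, or -1 if roles are already balanced.
	role, candidates := roles.Adjust(1)
	if role == -1 || len(candidates) == 0 {
		fmt.Println("cluster roles already balanced")
		return
	}
	fmt.Printf("change %s to %v\n", candidates[0].Address, role)
}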

