[lxc-devel] [lxd/master] Plug go-dqlite roles logic into LXD
freeekanayaka on Github
lxc-bot at linuxcontainers.org
Wed Jul 1 07:51:31 UTC 2020
From 761015a70981723a15d7445f49ba1950f3f480c6 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 30 Jun 2020 09:25:47 +0200
Subject: [PATCH 1/4] lxd/cluster: Only look up raft_nodes for resolving the
address of node 1
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 33 ++++++++++++++-------------------
lxd/cluster/membership.go | 10 +++-------
2 files changed, 17 insertions(+), 26 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 42c2ae3eee..4846f14138 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -403,14 +403,11 @@ func (g *Gateway) DialFunc() client.DialFunc {
// Dial function for establishing raft connections.
func (g *Gateway) raftDial() client.DialFunc {
return func(ctx context.Context, address string) (net.Conn, error) {
- if address == "1" {
- addr, err := g.raftAddress(1)
- if err != nil {
- return nil, err
- }
- address = string(addr)
+ nodeAddress, err := g.nodeAddress(address)
+ if err != nil {
+ return nil, err
}
- conn, err := dqliteNetworkDial(ctx, address, g)
+ conn, err := dqliteNetworkDial(ctx, nodeAddress, g)
if err != nil {
return nil, err
}
@@ -480,7 +477,7 @@ func (g *Gateway) TransferLeadership() error {
if server.ID == g.info.ID || server.Role != db.RaftVoter {
continue
}
- address, err := g.raftAddress(server.ID)
+ address, err := g.nodeAddress(server.Address)
if err != nil {
return err
}
@@ -851,27 +848,25 @@ func (g *Gateway) currentRaftNodes() ([]db.RaftNode, error) {
return nil, err
}
for i, server := range servers {
- address, err := g.raftAddress(server.ID)
+ address, err := g.nodeAddress(server.Address)
if err != nil {
- if err != db.ErrNoSuchObject {
- return nil, errors.Wrap(err, "Failed to fetch raft server address")
- }
- // Use the initial address as fallback. This is an edge
- // case that happens when a new leader is elected and
- // its raft_nodes table is not fully up-to-date yet.
- address = server.Address
+ return nil, errors.Wrap(err, "Failed to fetch raft server address")
}
servers[i].Address = address
}
return servers, nil
}
-// Look up a server address in the raft_nodes table.
-func (g *Gateway) raftAddress(databaseID uint64) (string, error) {
+// Translate a raft address to a node address. They are always the same except
+// for the bootstrap node, which has address "1".
+func (g *Gateway) nodeAddress(raftAddress string) (string, error) {
+ if raftAddress != "1" {
+ return raftAddress, nil
+ }
var address string
err := g.db.Transaction(func(tx *db.NodeTx) error {
var err error
- address, err = tx.GetRaftNodeAddress(int64(databaseID))
+ address, err = tx.GetRaftNodeAddress(1)
return err
})
if err != nil {
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 1d260d4295..cf335c3dd1 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -938,13 +938,9 @@ func List(state *state.State, gateway *Gateway) ([]api.ClusterMember, error) {
}
raftRoles := map[string]client.NodeRole{} // Address to role
for _, node := range raftNodes {
- address := node.Address
- if address == "1" {
- addr, err := gateway.raftAddress(1)
- if err != nil {
- return nil, err
- }
- address = string(addr)
+ address, err := gateway.nodeAddress(node.Address)
+ if err != nil {
+ return nil, err
}
raftRoles[address] = node.Role
}
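
For reference, the lookup path introduced above boils down to a single translation rule: raft addresses pass through unchanged, except for the bootstrap placeholder "1", which has to be resolved through the raft_nodes table. A minimal, self-contained sketch of that rule (lookupBootstrap is a hypothetical stand-in for the GetRaftNodeAddress query run inside the node transaction):

    package main

    import "fmt"

    // resolveNodeAddress mirrors Gateway.nodeAddress: every raft address is
    // already a network address, except the bootstrap placeholder "1", which
    // must be looked up in the raft_nodes table.
    func resolveNodeAddress(raftAddress string, lookupBootstrap func() (string, error)) (string, error) {
            if raftAddress != "1" {
                    return raftAddress, nil
            }
            return lookupBootstrap()
    }

    func main() {
            // Pretend the raft_nodes table says node 1 listens on 10.0.0.1:8443.
            addr, err := resolveNodeAddress("1", func() (string, error) {
                    return "10.0.0.1:8443", nil
            })
            fmt.Println(addr, err) // 10.0.0.1:8443 <nil>
    }
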
From 292ed405c3e5a2c9bda31ee945896826c232057e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 30 Jun 2020 10:48:27 +0200
Subject: [PATCH 2/4] lxd/cluster: Leverage RolesChanges.Handover() to choose
handover target
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/.dir-locals.el | 2 +-
lxd/cluster/membership.go | 78 +++++++++++++++++++++++++++++----------
2 files changed, 60 insertions(+), 20 deletions(-)
diff --git a/lxd/.dir-locals.el b/lxd/.dir-locals.el
index 77b750661f..b72bdc482e 100644
--- a/lxd/.dir-locals.el
+++ b/lxd/.dir-locals.el
@@ -1,7 +1,7 @@
;;; Directory Local Variables
;;; For more information see (info "(emacs) Directory Variables")
((go-mode
- . ((go-test-args . "-tags libsqlite3 -timeout 90s")
+ . ((go-test-args . "-tags libsqlite3 -timeout 120s")
(eval
. (set
(make-local-variable 'flycheck-go-build-tags)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index cf335c3dd1..45ee7df16b 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -7,6 +7,7 @@ import (
"path/filepath"
"time"
+ "github.com/canonical/go-dqlite/app"
"github.com/canonical/go-dqlite/client"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/db/cluster"
@@ -839,40 +840,79 @@ func Leave(state *state.State, gateway *Gateway, name string, force bool) (strin
func Handover(state *state.State, gateway *Gateway, address string) (string, []db.RaftNode, error) {
nodes, err := gateway.currentRaftNodes()
if err != nil {
- return "", nil, errors.Wrap(err, "Failed to get current raft nodes")
+ return "", nil, errors.Wrap(err, "Get current raft nodes")
}
- // If the member which is shutting down is not a voter, there's nothing
- // to do.
- found := false
+ var nodeID uint64
for _, node := range nodes {
- if node.Address != address {
- continue
- }
- if node.Role != db.RaftVoter {
- return "", nil, nil
+ if node.Address == address {
+ nodeID = node.ID
}
- found = true
- break
+
}
- if !found {
+ if nodeID == 0 {
return "", nil, errors.Wrapf(err, "No dqlite node has address %s", address)
}
+ roles, err := newRolesChanges(state, gateway, nodes)
+ if err != nil {
+ return "", nil, err
+ }
+ role, candidates := roles.Handover(nodeID)
+
+ if role != db.RaftVoter {
+ return "", nil, nil
+ }
+
for i, node := range nodes {
- if node.Role == db.RaftVoter || node.Address == address {
- continue
+ if node.Address == candidates[0].Address {
+ nodes[i].Role = role
+ return node.Address, nodes, nil
}
- if !hasConnectivity(gateway.cert, node.Address) {
- continue
- }
- nodes[i].Role = db.RaftVoter
- return node.Address, nodes, nil
}
return "", nil, nil
}
+// Build an app.RolesChanges object fed with the current cluster state.
+func newRolesChanges(state *state.State, gateway *Gateway, nodes []db.RaftNode) (*app.RolesChanges, error) {
+ var maxVoters int
+ var maxStandBy int
+ err := state.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ config, err := ConfigLoad(tx)
+ if err != nil {
+ return errors.Wrap(err, "Load cluster configuration")
+ }
+ maxVoters = int(config.MaxVoters())
+ maxStandBy = int(config.MaxStandBy())
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ cluster := map[client.NodeInfo]*client.NodeMetadata{}
+
+ for _, node := range nodes {
+ if hasConnectivity(gateway.cert, node.Address) {
+ cluster[node] = &client.NodeMetadata{}
+ } else {
+ cluster[node] = nil
+ }
+
+ }
+
+ roles := &app.RolesChanges{
+ Config: app.RolesConfig{
+ Voters: maxVoters,
+ StandBys: maxStandBy,
+ },
+ State: cluster,
+ }
+
+ return roles, nil
+}
+
// Purge removes a node entirely from the cluster database.
func Purge(cluster *db.Cluster, name string) error {
logger.Debugf("Remove node %s from the database", name)
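
To make the new flow easier to review in isolation, here is a small sketch of how the go-dqlite RolesChanges API is driven, using only the surface exercised by the patch (the IDs, addresses, and role constants are made up for illustration, so treat it as a sketch rather than a verified program). Offline members get a nil NodeMetadata, online ones an empty struct, and Handover() returns the role to give up plus the candidates ordered by preference:

    package main

    import (
            "fmt"

            "github.com/canonical/go-dqlite/app"
            "github.com/canonical/go-dqlite/client"
    )

    func main() {
            // Three-member cluster: node 1 (a voter) is shutting down, node 2 is
            // an online voter, node 3 is an online stand-by. A nil metadata value
            // would mark a member that failed the connectivity probe.
            state := map[client.NodeInfo]*client.NodeMetadata{
                    {ID: 1, Address: "10.0.0.1:8443", Role: client.Voter}:   {},
                    {ID: 2, Address: "10.0.0.2:8443", Role: client.Voter}:   {},
                    {ID: 3, Address: "10.0.0.3:8443", Role: client.StandBy}: {},
            }

            roles := &app.RolesChanges{
                    Config: app.RolesConfig{Voters: 3, StandBys: 2},
                    State:  state,
            }

            // Ask which role node 1 should hand over and to whom, as Handover() does.
            role, candidates := roles.Handover(1)
            if len(candidates) > 0 {
                    fmt.Println(role, candidates[0].Address)
            }
    }
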
From baae60bbd34e0f7b2bd48f14ecd692f159fae412 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 30 Jun 2020 10:54:46 +0200
Subject: [PATCH 3/4] lxd/cluster: Skip unnecessary loading of nodes from
database in Rebalance()
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 20 ++++----------------
1 file changed, 4 insertions(+), 16 deletions(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 45ee7df16b..c2b859bde9 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -495,7 +495,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
// Fetch the nodes from the database, to get their last heartbeat
// timestamp and check whether they are offline.
- nodesByAddress := map[string]db.NodeInfo{}
var maxVoters int64
var maxStandBy int64
err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
@@ -505,13 +504,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
}
maxVoters = config.MaxVoters()
maxStandBy = config.MaxStandBy()
- nodes, err := tx.GetNodes()
- if err != nil {
- return errors.Wrap(err, "failed to get cluster nodes")
- }
- for _, node := range nodes {
- nodesByAddress[node.Address] = node
- }
return nil
})
if err != nil {
@@ -524,8 +516,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
standbys := make([]string, 0)
candidates := make([]string, 0)
for i, info := range currentRaftNodes {
- node := nodesByAddress[info.Address]
- if !hasConnectivity(gateway.cert, node.Address) {
+ if !hasConnectivity(gateway.cert, info.Address) {
if info.Role != db.RaftSpare {
client, err := gateway.getClient()
if err != nil {
@@ -534,8 +525,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- logger.Infof(
- "Demote offline node %s (%s) to spare", node.Name, node.Address)
+ logger.Infof("Demote offline node %s to spare", info.Address)
err = client.Assign(ctx, info.ID, db.RaftSpare)
if err != nil {
return "", nil, errors.Wrap(err, "Failed to demote offline node")
@@ -572,10 +562,8 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
// Check if we have a spare node that we can promote to the missing role.
address := ""
for _, candidate := range candidates {
- node := nodesByAddress[candidate]
- logger.Infof(
- "Found spare node %s (%s) to be promoted to %s", node.Name, node.Address, role)
- address = node.Address
+ logger.Infof("Found spare node %s to be promoted to %s", candidate, role)
+ address = candidate
break
}
From d9e7be738732a6ce5e02b585b25eca79df543a54 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 30 Jun 2020 12:51:51 +0200
Subject: [PATCH 4/4] lxd/cluster: Leverage RolesChanges.Adjust() to choose
rebalance target
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/api_cluster.go | 24 +++++++++--
lxd/cluster/connect.go | 4 +-
lxd/cluster/gateway.go | 20 ++++++++-
lxd/cluster/membership.go | 91 ++++++---------------------------------
lxd/cluster/notify.go | 2 +-
test/suites/clustering.sh | 13 +++---
6 files changed, 61 insertions(+), 93 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index fbc61dd979..04bf1c6bb3 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -1199,9 +1199,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
// Check if there's a dqlite node whose role should be changed, and post a
// change role request if so.
func rebalanceMemberRoles(d *Daemon) error {
- logger.Debugf("Rebalance cluster")
-
- // Check if we have a spare node to promote.
+again:
address, nodes, err := cluster.Rebalance(d.State(), d.gateway)
if err != nil {
return err
@@ -1212,13 +1210,31 @@ func rebalanceMemberRoles(d *Daemon) error {
return nil
}
+ // Process demotions of offline nodes immediately.
+ for _, node := range nodes {
+ if node.Address != address || node.Role != db.RaftSpare {
+ continue
+ }
+
+ if cluster.HasConnectivity(d.endpoints.NetworkCert(), address) {
+ break
+ }
+
+ err := d.gateway.DemoteOfflineNode(node.ID)
+ if err != nil {
+ return errors.Wrapf(err, "Demote offline node %s", node.Address)
+ }
+
+ goto again
+ }
+
// Tell the node to promote itself.
err = changeMemberRole(d, address, nodes)
if err != nil {
return err
}
- return nil
+ goto again
}
// Post a change role request to the member with the given address. The nodes
diff --git a/lxd/cluster/connect.go b/lxd/cluster/connect.go
index 92eda9bba4..8fdb8a8390 100644
--- a/lxd/cluster/connect.go
+++ b/lxd/cluster/connect.go
@@ -168,8 +168,8 @@ func SetupTrust(cert, targetAddress, targetCert, targetPassword string) error {
return nil
}
-// Probe network connectivity to the member with the given address.
-func hasConnectivity(cert *shared.CertInfo, address string) bool {
+// HasConnectivity probes the member with the given address for connectivity.
+func HasConnectivity(cert *shared.CertInfo, address string) bool {
config, err := tlsClientConfig(cert)
if err != nil {
return false
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 4846f14138..a4a49f57b3 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -481,7 +481,7 @@ func (g *Gateway) TransferLeadership() error {
if err != nil {
return err
}
- if !hasConnectivity(g.cert, address) {
+ if !HasConnectivity(g.cert, address) {
continue
}
id = server.ID
@@ -498,6 +498,24 @@ func (g *Gateway) TransferLeadership() error {
return client.Transfer(ctx, id)
}
+// DemoteOfflineNode forces the demotion of an offline node to spare.
+func (g *Gateway) DemoteOfflineNode(raftID uint64) error {
+ cli, err := g.getClient()
+ if err != nil {
+ return errors.Wrap(err, "Connect to local dqlite node")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ err = cli.Assign(ctx, raftID, db.RaftSpare)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
// Shutdown this gateway, stopping the gRPC server and possibly the raft factory.
func (g *Gateway) Shutdown() error {
logger.Infof("Stop database gateway")
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index c2b859bde9..2b019c7ed4 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -486,100 +486,35 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
return "", nil, nil
}
- // First get the current raft members, since this method should be
- // called after a node has left.
- currentRaftNodes, err := gateway.currentRaftNodes()
+ nodes, err := gateway.currentRaftNodes()
if err != nil {
- return "", nil, errors.Wrap(err, "failed to get current raft nodes")
+ return "", nil, errors.Wrap(err, "Get current raft nodes")
}
- // Fetch the nodes from the database, to get their last heartbeat
- // timestamp and check whether they are offline.
- var maxVoters int64
- var maxStandBy int64
- err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
- config, err := ConfigLoad(tx)
- if err != nil {
- return errors.Wrap(err, "failed load cluster configuration")
- }
- maxVoters = config.MaxVoters()
- maxStandBy = config.MaxStandBy()
- return nil
- })
+ roles, err := newRolesChanges(state, gateway, nodes)
if err != nil {
return "", nil, err
}
- // Group by role. If a node is offline, we'll try to demote it right
- // away.
- voters := make([]string, 0)
- standbys := make([]string, 0)
- candidates := make([]string, 0)
- for i, info := range currentRaftNodes {
- if !hasConnectivity(gateway.cert, info.Address) {
- if info.Role != db.RaftSpare {
- client, err := gateway.getClient()
- if err != nil {
- return "", nil, errors.Wrap(err, "Failed to connect to local dqlite node")
- }
- defer client.Close()
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- logger.Infof("Demote offline node %s to spare", info.Address)
- err = client.Assign(ctx, info.ID, db.RaftSpare)
- if err != nil {
- return "", nil, errors.Wrap(err, "Failed to demote offline node")
- }
- currentRaftNodes[i].Role = db.RaftSpare
- }
- continue
- }
-
- switch info.Role {
- case db.RaftVoter:
- voters = append(voters, info.Address)
- case db.RaftStandBy:
- standbys = append(standbys, info.Address)
- case db.RaftSpare:
- candidates = append(candidates, info.Address)
- }
- }
-
- var role db.RaftRole
+ role, candidates := roles.Adjust(gateway.info.ID)
- if len(voters) < int(maxVoters) && len(currentRaftNodes) >= 3 {
- role = db.RaftVoter
- // Include stand-by nodes among the ones that can be promoted,
- // preferring them over spare ones.
- candidates = append(standbys, candidates...)
- } else if len(standbys) < int(maxStandBy) {
- role = db.RaftStandBy
- } else {
- // We're already at full capacity or would have a two-member cluster.
- return "", nil, nil
+ if role == -1 {
+ // No node to promote
+ return "", nodes, nil
}
// Check if we have a spare node that we can promote to the missing role.
- address := ""
- for _, candidate := range candidates {
- logger.Infof("Found spare node %s to be promoted to %s", candidate, role)
- address = candidate
- break
- }
+ address := candidates[0].Address
+ logger.Infof("Found node %s whose role needs to be changed to %s", address, role)
- if address == "" {
- // No node to promote
- return "", currentRaftNodes, nil
- }
-
- for i, node := range currentRaftNodes {
+ for i, node := range nodes {
if node.Address == address {
- currentRaftNodes[i].Role = role
+ nodes[i].Role = role
break
}
}
- return address, currentRaftNodes, nil
+ return address, nodes, nil
}
// Assign a new role to the local dqlite node.
@@ -882,7 +817,7 @@ func newRolesChanges(state *state.State, gateway *Gateway, nodes []db.RaftNode)
cluster := map[client.NodeInfo]*client.NodeMetadata{}
for _, node := range nodes {
- if hasConnectivity(gateway.cert, node.Address) {
+ if HasConnectivity(gateway.cert, node.Address) {
cluster[node] = &client.NodeMetadata{}
} else {
cluster[node] = nil
diff --git a/lxd/cluster/notify.go b/lxd/cluster/notify.go
index 603f814093..28f5b44dff 100644
--- a/lxd/cluster/notify.go
+++ b/lxd/cluster/notify.go
@@ -73,7 +73,7 @@ func NewNotifier(state *state.State, cert *shared.CertInfo, policy NotifierPolic
// enough, let's try to connect to the node, just in
// case the heartbeat is lagging behind for some reason
// and the node is actually up.
- if !hasConnectivity(cert, node.Address) {
+ if !HasConnectivity(cert, node.Address) {
switch policy {
case NotifyAll:
return nil, fmt.Errorf("peer node %s is down", node.Address)
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index a5e07476ca..49631e8d8f 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -1673,7 +1673,7 @@ test_clustering_handover() {
LXD_DIR="${LXD_THREE_DIR}" lxc cluster list
- # Respawn the first node, which is now a stand-by, and the second node, which
+ # Respawn the first node, which is now a spare, and the second node, which
# is still a voter.
respawn_lxd_cluster_member "${ns1}" "${LXD_ONE_DIR}"
respawn_lxd_cluster_member "${ns2}" "${LXD_TWO_DIR}"
@@ -1687,14 +1687,13 @@ test_clustering_handover() {
wait "$pid1"
wait "$pid2"
- # Wait some time to allow for a leadership change.
- sleep 10
+ # Bringing back one of them restores the quorum.
+ respawn_lxd_cluster_member "${ns2}" "${LXD_TWO_DIR}"
- # The first node has been promoted back to voter, and since the fourth node is
- # still up, the cluster is online.
LXD_DIR="${LXD_ONE_DIR}" lxc cluster list
LXD_DIR="${LXD_ONE_DIR}" lxd shutdown
+ LXD_DIR="${LXD_TWO_DIR}" lxd shutdown
LXD_DIR="${LXD_FOUR_DIR}" lxd shutdown
sleep 0.5
rm -f "${LXD_ONE_DIR}/unix.socket"
@@ -1757,8 +1756,8 @@ test_clustering_rebalance() {
LXD_DIR="${LXD_ONE_DIR}" lxc config set cluster.offline_threshold 12
kill -9 "$(cat "${LXD_TWO_DIR}/lxd.pid")"
- # Wait for the second node to be considered offline. We need at most 2 full
- # hearbeats.
+ # Wait for the second node to be considered offline and be replaced by the
+ # fourth node.
sleep 25
# The second node is offline and has been demoted.
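
The rebalance side follows the same pattern: Rebalance() now asks RolesChanges.Adjust() for the next single role change, and rebalanceMemberRoles() keeps looping (via the "goto again" above) until Adjust() reports nothing left to do (role == -1). A rough, hypothetical sketch of that driver loop, with nextChange and applyRoleChange standing in for the Rebalance() call and the change-role request posted to the target member:

    // driveRebalance illustrates the shape of the loop in rebalanceMemberRoles():
    // apply one role change at a time and re-evaluate the cluster until no
    // further adjustment is suggested.
    func driveRebalance(nextChange func() (target string, ok bool, err error),
            applyRoleChange func(target string) error) error {
            for {
                    target, ok, err := nextChange()
                    if err != nil {
                            return err
                    }
                    if !ok {
                            // Adjust() found nothing to change; the cluster is balanced.
                            return nil
                    }
                    if err := applyRoleChange(target); err != nil {
                            return err
                    }
            }
    }
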