[lxc-devel] [lxd/master] Support stand-by cluster members
freeekanayaka on Github
lxc-bot at linuxcontainers.org
Tue Jan 7 16:51:44 UTC 2020
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20200107/5dbcb0bc/attachment-0001.bin>
-------------- next part --------------
From 378ef995bac92be26e5b1b0fe34f414957ff0f8f Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 08:46:21 +0000
Subject: [PATCH 01/15] Fix typo
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/api_cluster.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index bf96d39774..dab233e518 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -800,7 +800,7 @@ func clusterInitMember(d, client lxd.InstanceServer, memberConfig []api.ClusterM
}
// Perform a request to the /internal/cluster/accept endpoint to check if a new
-// mode can be accepted into the cluster and obtain joining information such as
+// node can be accepted into the cluster and obtain joining information such as
// the cluster private certificate.
func clusterAcceptMember(
client lxd.InstanceServer,
From bfd2d4060566f00a82cbe1d307d57274a67885b4 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 09:37:47 +0000
Subject: [PATCH 02/15] Don't add accepted members to the local raft_nodes
table
The local raft_nodes table will be populated by heartbeats. Also, use the ID from
the nodes table in the cluster database, so that Raft ID and Member ID match.
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 16 ++++------------
1 file changed, 4 insertions(+), 12 deletions(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index f2bd48eb72..46c8661f3c 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -165,6 +165,8 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
return nil, fmt.Errorf("node address must not be empty")
}
+ var id int64
+
// Insert the new node into the nodes table.
err := state.Cluster.Transaction(func(tx *db.ClusterTx) error {
// Check that the node can be accepted with these parameters.
@@ -181,7 +183,7 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
}
// Add the new node
- id, err := tx.NodeAddWithArch(name, address, arch)
+ id, err = tx.NodeAddWithArch(name, address, arch)
if err != nil {
return errors.Wrap(err, "Failed to insert new node into the database")
}
@@ -211,17 +213,7 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
return nil, errors.Wrap(err, "Fetch cluster members count")
}
if count != 2 && len(nodes) < membershipMaxRaftNodes {
- err = state.Node.Transaction(func(tx *db.NodeTx) error {
- id, err := tx.RaftNodeAdd(address)
- if err != nil {
- return err
- }
- nodes = append(nodes, db.RaftNode{ID: id, Address: address})
- return nil
- })
- if err != nil {
- return nil, errors.Wrap(err, "Failed to insert new node into raft_nodes")
- }
+ nodes = append(nodes, db.RaftNode{ID: id, Address: address})
}
return nodes, nil
From 3548d27cd87fef8de1c013a8347420aead451f10 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 09:40:18 +0000
Subject: [PATCH 03/15] Fix TestAccept test: the second member does *not* get
the db role
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership_test.go | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index 5462f7fc3e..37c28f8dde 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -210,8 +210,9 @@ func TestAccept_UnmetPreconditions(t *testing.T) {
}
}
-// When a node gets accepted, it gets included in the raft nodes.
-func TestAccept(t *testing.T) {
+// When there is only one cluster member and a new member gets accepted, the
+// new member doesn't immediately get the database role.
+func TestAccept_ClusterHasOneMember(t *testing.T) {
state, cleanup := state.NewTestState(t)
defer cleanup()
@@ -221,16 +222,14 @@ func TestAccept(t *testing.T) {
f := &membershipFixtures{t: t, state: state}
f.RaftNode("1.2.3.4:666")
- f.ClusterNode("1.2.3.4:666")
+ f.ClusterFirstNode("1.2.3.4:666")
nodes, err := cluster.Accept(
state, gateway, "buzz", "5.6.7.8:666", cluster.SchemaVersion, len(version.APIExtensions))
assert.NoError(t, err)
- assert.Len(t, nodes, 2)
+ require.Len(t, nodes, 1)
assert.Equal(t, int64(1), nodes[0].ID)
- assert.Equal(t, int64(2), nodes[1].ID)
assert.Equal(t, "1.2.3.4:666", nodes[0].Address)
- assert.Equal(t, "5.6.7.8:666", nodes[1].Address)
}
func TestJoin(t *testing.T) {
@@ -492,3 +491,11 @@ func (h *membershipFixtures) ClusterNode(address string) {
})
require.NoError(h.t, err)
}
+
+// Set the address of the first node.
+func (h *membershipFixtures) ClusterFirstNode(address string) {
+ err := h.state.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ return tx.NodeUpdate(1, "none", address)
+ })
+ require.NoError(h.t, err)
+}
From 930cc134828380fbb3aae877c1233834dd14a0ff Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 12:17:36 +0000
Subject: [PATCH 04/15] Gateway.currentRaftNodes: return only voter nodes
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index f16705ec43..38747ffc6f 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -705,19 +705,22 @@ func (g *Gateway) currentRaftNodes() ([]db.RaftNode, error) {
if !isLeader {
return nil, errNotLeader
}
- client, err := g.getClient()
+ cli, err := g.getClient()
if err != nil {
return nil, err
}
- defer client.Close()
+ defer cli.Close()
- servers, err := client.Cluster(context.Background())
+ servers, err := cli.Cluster(context.Background())
if err != nil {
return nil, err
}
provider := raftAddressProvider{db: g.db}
nodes := make([]db.RaftNode, len(servers))
for i, server := range servers {
+ if server.Role != client.Voter {
+ continue
+ }
address, err := provider.ServerAddr(int(server.ID))
if err != nil {
if err != db.ErrNoSuchObject {
From 29eb4f3830e632fe16b28a291c7b0ed80dabefe1 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 12:22:51 +0000
Subject: [PATCH 05/15] Rename currentRaftNodes to currentRaftVoters
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 2 +-
lxd/cluster/gateway_export_test.go | 2 +-
lxd/cluster/heartbeat.go | 2 +-
lxd/cluster/membership.go | 4 ++--
4 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 38747ffc6f..574e05cacb 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -690,7 +690,7 @@ var errNotLeader = fmt.Errorf("Not leader")
// Return information about the LXD nodes that a currently part of the raft
// cluster, as configured in the raft log. It returns an error if this node is
// not the leader.
-func (g *Gateway) currentRaftNodes() ([]db.RaftNode, error) {
+func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
g.lock.RLock()
defer g.lock.RUnlock()
diff --git a/lxd/cluster/gateway_export_test.go b/lxd/cluster/gateway_export_test.go
index 2ee96d5bc1..88887f774d 100644
--- a/lxd/cluster/gateway_export_test.go
+++ b/lxd/cluster/gateway_export_test.go
@@ -17,5 +17,5 @@ func (g *Gateway) Cert() *shared.CertInfo {
// RaftNodes returns the nodes currently part of the raft cluster.
func (g *Gateway) RaftNodes() ([]db.RaftNode, error) {
- return g.currentRaftNodes()
+ return g.currentRaftVoters()
}
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index 6a8c1c0cd0..273a9c2555 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -191,7 +191,7 @@ func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) {
return
}
- raftNodes, err := g.currentRaftNodes()
+ raftNodes, err := g.currentRaftVoters()
if err == errNotLeader {
return
}
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 46c8661f3c..88d2137f31 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -204,7 +204,7 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
// Possibly insert the new node into the raft_nodes table (if we have
// less than 3 database nodes).
- nodes, err := gateway.currentRaftNodes()
+ nodes, err := gateway.currentRaftVoters()
if err != nil {
return nil, errors.Wrap(err, "Failed to get raft nodes from the log")
}
@@ -481,7 +481,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, error) {
// First get the current raft members, since this method should be
// called after a node has left.
- currentRaftNodes, err := gateway.currentRaftNodes()
+ currentRaftNodes, err := gateway.currentRaftVoters()
if err != nil {
return "", nil, errors.Wrap(err, "failed to get current raft nodes")
}
From 51f33a1200b2b88eacdbe0686ba745eefb59bfd6 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 12:36:57 +0000
Subject: [PATCH 06/15] Extract currentDqliteNodes from currentRaftVoters
currentDqliteNodes returns all dqlite nodes, not only voting ones.
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/gateway.go | 36 ++++++++++++++++++++++++++----------
1 file changed, 26 insertions(+), 10 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 574e05cacb..88f2526d91 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -687,10 +687,9 @@ func (g *Gateway) isLeader() (bool, error) {
// Internal error signalling that a node not the leader.
var errNotLeader = fmt.Errorf("Not leader")
-// Return information about the LXD nodes that a currently part of the raft
-// cluster, as configured in the raft log. It returns an error if this node is
-// not the leader.
-func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
+// Return all current dqlite nodes, each having either the Voter, StandBy or
+// Spare role. It returns an error if this node is not the leader.
+func (g *Gateway) currentDqliteNodes() ([]client.NodeInfo, error) {
g.lock.RLock()
defer g.lock.RUnlock()
@@ -705,6 +704,7 @@ func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
if !isLeader {
return nil, errNotLeader
}
+
cli, err := g.getClient()
if err != nil {
return nil, err
@@ -716,11 +716,7 @@ func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
return nil, err
}
provider := raftAddressProvider{db: g.db}
- nodes := make([]db.RaftNode, len(servers))
for i, server := range servers {
- if server.Role != client.Voter {
- continue
- }
address, err := provider.ServerAddr(int(server.ID))
if err != nil {
if err != db.ErrNoSuchObject {
@@ -731,8 +727,28 @@ func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
// its raft_nodes table is not fully up-to-date yet.
address = server.Address
}
- nodes[i].ID = int64(server.ID)
- nodes[i].Address = string(address)
+ servers[i].Address = string(address)
+ }
+ return servers, nil
+}
+
+// Return information about the LXD nodes that are currently voting members of
+// the raft cluster, as configured in the raft log. It returns an error if this
+// node is not the leader.
+func (g *Gateway) currentRaftVoters() ([]db.RaftNode, error) {
+ servers, err := g.currentDqliteNodes()
+ if err != nil {
+ return nil, err
+ }
+ nodes := make([]db.RaftNode, 0)
+ for _, server := range servers {
+ if server.Role != client.Voter {
+ continue
+ }
+ node := db.RaftNode{}
+ node.ID = int64(server.ID)
+ node.Address = string(server.Address)
+ nodes = append(nodes, node)
}
return nodes, nil
}
From cdf295ada8b030d526e15f1825dc519ba95999f8 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 12:38:26 +0000
Subject: [PATCH 07/15] Add Role field to the internalRaftNode struct
It will be used by the accepting node to communicate to the joining node what
role it should have.
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/api_cluster.go | 1 +
1 file changed, 1 insertion(+)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index dab233e518..10e762e030 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -1082,6 +1082,7 @@ type internalClusterPostAcceptResponse struct {
type internalRaftNode struct {
ID int64 `json:"id" yaml:"id"`
Address string `json:"address" yaml:"address"`
+ Role int `json:"role" yaml:"role"`
}
// Used to update the cluster after a database node has been removed, and
From 64dea5bb937111e96f11b574012412e27b0469d5 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:28:48 +0000
Subject: [PATCH 08/15] Gateway.Accept: return a full list of nodes, not only
voters
This way we can account for the different roles that can be assigned to joining nodes.
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 27 ++++++++++++++++-----------
1 file changed, 16 insertions(+), 11 deletions(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 88d2137f31..30a41ff4ae 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -5,7 +5,6 @@ import (
"fmt"
"os"
"path/filepath"
- "strconv"
"time"
"github.com/canonical/go-dqlite/client"
@@ -156,7 +155,7 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error {
//
// Return an updated list raft database nodes (possibly including the newly
// accepted node).
-func Accept(state *state.State, gateway *Gateway, name, address string, schema, api int) ([]db.RaftNode, error) {
+func Accept(state *state.State, gateway *Gateway, name, address string, schema, api int) ([]client.NodeInfo, error) {
// Check parameters
if name == "" {
return nil, fmt.Errorf("node name must not be empty")
@@ -202,20 +201,26 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
return nil, err
}
- // Possibly insert the new node into the raft_nodes table (if we have
- // less than 3 database nodes).
- nodes, err := gateway.currentRaftVoters()
+ // Get the current nodes and figure out what database role the new
+ // node should have.
+ nodes, err := gateway.currentDqliteNodes()
if err != nil {
return nil, errors.Wrap(err, "Failed to get raft nodes from the log")
}
- count, err := Count(state)
- if err != nil {
- return nil, errors.Wrap(err, "Fetch cluster members count")
+ voters := 0
+ for _, node := range nodes {
+ if node.Role == client.Voter {
+ voters++
+ }
}
- if count != 2 && len(nodes) < membershipMaxRaftNodes {
- nodes = append(nodes, db.RaftNode{ID: id, Address: address})
+ node := client.NodeInfo{ID: uint64(id), Address: address}
+ count := len(nodes) + 1 // Total nodes count
+ if count != 2 && voters < membershipMaxRaftNodes {
+ node.Role = client.Voter
+ } else {
+ node.Role = client.Spare
}
-
+ nodes = append(nodes, node)
return nodes, nil
}
From b9dd3ba3eb8ca4cfeb96a9bd1c2b3cafbac2ab30 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:30:14 +0000
Subject: [PATCH 09/15] Gateway.Join: always add the joining node to the raft
configuration
The initial role is defined by the list returned by Gateway.Accept().
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 77 +++++++++++++++++++++------------------
1 file changed, 42 insertions(+), 35 deletions(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 30a41ff4ae..cd708d84fa 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -231,12 +231,20 @@ func Accept(state *state.State, gateway *Gateway, name, address string, schema,
//
// The cert parameter must contain the keypair/CA material of the cluster being
// joined.
-func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name string, raftNodes []db.RaftNode) error {
+func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name string, nodes []client.NodeInfo) error {
// Check parameters
if name == "" {
return fmt.Errorf("node name must not be empty")
}
+ // Figure out which are the voters.
+ voters := make([]db.RaftNode, 0)
+ for _, node := range nodes {
+ if node.Role == client.Voter {
+ voters = append(voters, db.RaftNode{ID: int64(node.ID), Address: node.Address})
+ }
+ }
+
var address string
err := state.Node.Transaction(func(tx *db.NodeTx) error {
// Fetch current network address and raft nodes
@@ -253,7 +261,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
}
// Set the raft nodes list to the one that was returned by Accept().
- err = tx.RaftNodesReplace(raftNodes)
+ err = tx.RaftNodesReplace(voters)
if err != nil {
return errors.Wrap(err, "failed to set raft nodes")
}
@@ -327,40 +335,39 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
return errors.Wrap(err, "failed to re-initialize gRPC SQL gateway")
}
- // If we are listed among the database nodes, join the raft cluster.
- id := ""
- var target *db.RaftNode
- for _, node := range raftNodes {
+ // Figure out our role.
+ var info *client.NodeInfo
+ var target *client.NodeInfo
+ for i, node := range nodes {
if node.Address == address {
- id = strconv.Itoa(int(node.ID))
+ info = &nodes[i]
} else {
- target = &node
+ target = &nodes[i]
}
}
- if id != "" {
- if target == nil {
- panic("no other node found")
- }
- logger.Info(
- "Joining dqlite raft cluster",
- log15.Ctx{"id": id, "address": address, "target": target.Address})
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- client, err := client.FindLeader(
- ctx, gateway.NodeStore(),
- client.WithDialFunc(gateway.raftDial()),
- client.WithLogFunc(DqliteLog),
- )
- if err != nil {
- return errors.Wrap(err, "Failed to connect to cluster leader")
- }
- defer client.Close()
- err = client.Add(ctx, gateway.raft.info)
- if err != nil {
- return errors.Wrap(err, "Failed to join cluster")
- }
- } else {
- logger.Info("Joining cluster as non-database node")
+ if info == nil {
+ panic("joining node found")
+ }
+ if target == nil {
+ panic("no other node found")
+ }
+ logger.Info(
+ "Joining dqlite cluster",
+ log15.Ctx{"id": info.ID, "address": address, "target": target.Address, "role": info.Role})
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ cli, err := client.FindLeader(
+ ctx, gateway.NodeStore(),
+ client.WithDialFunc(gateway.raftDial()),
+ client.WithLogFunc(DqliteLog),
+ )
+ if err != nil {
+ return errors.Wrap(err, "Failed to connect to cluster leader")
+ }
+ defer cli.Close()
+ err = cli.Add(ctx, *info)
+ if err != nil {
+ return errors.Wrap(err, "Failed to join cluster")
}
// Make sure we can actually connect to the cluster database through
@@ -450,7 +457,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
}
// Update our role list if needed.
- if id != "" {
+ if info.Role == client.Voter {
err = tx.NodeAddRole(node.ID, db.ClusterRoleDatabase)
if err != nil {
return errors.Wrapf(err, "Failed to add database role for the node")
@@ -459,10 +466,10 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
// Generate partial heartbeat request containing just a raft node list.
hbState := &APIHeartbeat{}
- hbState.Update(false, raftNodes, []db.NodeInfo{}, offlineThreshold)
+ hbState.Update(false, voters, []db.NodeInfo{}, offlineThreshold)
// Attempt to send a heartbeat to all other raft nodes to notify them of new node.
- for _, raftNode := range raftNodes {
+ for _, raftNode := range voters {
if raftNode.ID == node.ID {
continue
}
From f9c5eb8c5a6df16366ae9047e99e2b7c181222db Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:34:19 +0000
Subject: [PATCH 10/15] Gateway.Rebalance: look at the full list of nodes, not
only voters
Later on we'll also promote/demote to stand-by or spare.
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 41 +++++++++++++++++----------------------
1 file changed, 18 insertions(+), 23 deletions(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index cd708d84fa..3c9485acf7 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -490,23 +490,26 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
//
// If there's such spare node, return its address as well as the new list of
// raft nodes.
-func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, error) {
+func Rebalance(state *state.State, gateway *Gateway) (string, []client.NodeInfo, error) {
// First get the current raft members, since this method should be
// called after a node has left.
- currentRaftNodes, err := gateway.currentRaftVoters()
+ currentNodes, err := gateway.currentDqliteNodes()
if err != nil {
return "", nil, errors.Wrap(err, "failed to get current raft nodes")
}
- if len(currentRaftNodes) >= membershipMaxRaftNodes || len(currentRaftNodes) == 1 {
+
+ // Find the addresses of the voters
+ voters := []string{}
+ for _, node := range currentNodes {
+ if node.Role == client.Voter {
+ voters = append(voters, node.Address)
+ }
+ }
+ if len(voters) >= membershipMaxRaftNodes || len(currentNodes) == 2 {
// We're already at full capacity or would have a two-member cluster.
return "", nil, nil
}
- currentRaftAddresses := make([]string, len(currentRaftNodes))
- for i, node := range currentRaftNodes {
- currentRaftAddresses[i] = node.Address
- }
-
// Check if we have a spare node that we can turn into a database one.
address := ""
err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
@@ -520,7 +523,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
}
// Find a node that is not part of the raft cluster yet.
for _, node := range nodes {
- if shared.StringInSlice(node.Address, currentRaftAddresses) {
+ if shared.StringInSlice(node.Address, voters) {
continue // This is already a database node
}
if node.IsOffline(config.OfflineThreshold()) {
@@ -540,25 +543,17 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
if address == "" {
// No node to promote
- return "", currentRaftNodes, nil
+ return "", currentNodes, nil
}
- // Figure out the next ID in the raft_nodes table
- var updatedRaftNodes []db.RaftNode
- err = gateway.db.Transaction(func(tx *db.NodeTx) error {
- id, err := tx.RaftNodeAdd(address)
- if err != nil {
- return errors.Wrap(err, "Failed to add new raft node")
+ // Update the node role.
+ for i := range currentNodes {
+ if currentNodes[i].Address == address {
+ currentNodes[i].Role = client.Voter
}
-
- updatedRaftNodes = append(currentRaftNodes, db.RaftNode{ID: id, Address: address})
- return nil
- })
- if err != nil {
- return "", nil, err
}
- return address, updatedRaftNodes, nil
+ return address, currentNodes, nil
}
// Promote makes a LXD node which is not a database node, become part of the
From ede83b840707d9bb890021478fe94c3b7c008638 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:35:21 +0000
Subject: [PATCH 11/15] Gateway.Promote: use client.Assign() instead of
client.Add()
The node being promoted is already in the raft configuration, only its role
needs to be changed.
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 24 +++++++++++++++---------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 3c9485acf7..eb1a134d75 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -558,7 +558,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []client.NodeInfo,
// Promote makes a LXD node which is not a database node, become part of the
// raft cluster.
-func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
+func Promote(state *state.State, gateway *Gateway, nodes []client.NodeInfo) error {
logger.Info("Promote node to database node")
// Sanity check that this is not already a database node
@@ -587,11 +587,11 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
// Figure out our raft node ID, and an existing target raft node that
// we'll contact to add ourselves as member.
- id := int64(-1)
+ var info *client.NodeInfo
target := ""
- for _, node := range nodes {
+ for i, node := range nodes {
if node.Address == address {
- id = node.ID
+ info = &nodes[i]
} else {
target = node.Address
}
@@ -599,15 +599,21 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
// Sanity check that our address was actually included in the given
// list of raft nodes.
- if id == -1 {
+ if info == nil {
return fmt.Errorf("this node is not included in the given list of database nodes")
}
// Replace our local list of raft nodes with the given one (which
// includes ourselves). This will make the gateway start a raft node
// when restarted.
+ voters := []db.RaftNode{}
+ for _, node := range nodes {
+ if node.Role == client.Voter {
+ voters = append(voters, db.RaftNode{ID: int64(node.ID), Address: node.Address})
+ }
+ }
err = state.Node.Transaction(func(tx *db.NodeTx) error {
- err = tx.RaftNodesReplace(nodes)
+ err = tx.RaftNodesReplace(voters)
if err != nil {
return errors.Wrap(err, "failed to set raft nodes")
}
@@ -642,7 +648,7 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
logger.Info(
"Joining dqlite raft cluster",
- log15.Ctx{"id": id, "address": address, "target": target})
+ log15.Ctx{"id": info.ID, "address": info.Address, "target": target, "role": info.Role})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -653,9 +659,9 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
}
defer client.Close()
- err = client.Add(ctx, gateway.raft.info)
+ err = client.Assign(ctx, info.ID, info.Role)
if err != nil {
- return errors.Wrap(err, "Failed to join cluster")
+ return errors.Wrap(err, "Failed to promote node")
}
// Unlock regular access to our cluster database and add the database role.
From 7900dde15c48cff555b23d0b07f12146f22c6991 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:36:37 +0000
Subject: [PATCH 12/15] Drop legacy test
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership_test.go | 78 ----------------------------------
1 file changed, 78 deletions(-)
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index 37c28f8dde..555681cd72 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -356,84 +356,6 @@ func TestJoin(t *testing.T) {
assert.Len(t, members, 1)
}
-func FLAKY_TestPromote(t *testing.T) {
- // Setup a target node running as leader of a cluster.
- targetCert := shared.TestingKeyPair()
- targetMux := http.NewServeMux()
- targetServer := newServer(targetCert, targetMux)
- defer targetServer.Close()
-
- targetState, cleanup := state.NewTestState(t)
- defer cleanup()
-
- targetGateway := newGateway(t, targetState.Node, targetCert)
- defer targetGateway.Shutdown()
-
- for path, handler := range targetGateway.HandlerFuncs(nil) {
- targetMux.HandleFunc(path, handler)
- }
-
- targetAddress := targetServer.Listener.Addr().String()
- var err error
- require.NoError(t, targetState.Cluster.Close())
- store := targetGateway.NodeStore()
- dialFunc := targetGateway.DialFunc()
- targetState.Cluster, err = db.OpenCluster(
- "db.bin", store, targetAddress, "/unused/db/dir", 5*time.Second, nil,
- driver.WithDialFunc(dialFunc))
- require.NoError(t, err)
- targetF := &membershipFixtures{t: t, state: targetState}
- targetF.ClusterAddress(targetAddress)
-
- err = cluster.Bootstrap(targetState, targetGateway, "buzz")
- require.NoError(t, err)
-
- // Setup a node to be promoted.
- mux := http.NewServeMux()
- server := newServer(targetCert, mux) // Use the same cert, as we're already part of the cluster
- defer server.Close()
-
- state, cleanup := state.NewTestState(t)
- defer cleanup()
-
- address := server.Listener.Addr().String()
- targetF.ClusterNode(address) // Add the non database node to the cluster database
- f := &membershipFixtures{t: t, state: state}
- f.ClusterAddress(address)
- f.RaftNode(targetAddress) // Insert the leader in our local list of database nodes
-
- gateway := newGateway(t, state.Node, targetCert)
- defer gateway.Shutdown()
-
- for path, handler := range gateway.HandlerFuncs(nil) {
- mux.HandleFunc(path, handler)
- }
-
- // Promote the node.
- targetF.RaftNode(address) // Add the address of the node to be promoted in the leader's db
- raftNodes := targetF.RaftNodes()
- err = cluster.Promote(state, gateway, raftNodes)
- require.NoError(t, err)
-
- // The leader now returns an updated list of raft nodes.
- raftNodes, err = targetGateway.RaftNodes()
- require.NoError(t, err)
- assert.Len(t, raftNodes, 2)
- assert.Equal(t, int64(1), raftNodes[0].ID)
- assert.Equal(t, targetAddress, raftNodes[0].Address)
- assert.Equal(t, int64(2), raftNodes[1].ID)
- assert.Equal(t, address, raftNodes[1].Address)
-
- // The List function returns all nodes in the cluster.
- nodes, err := cluster.List(state)
- require.NoError(t, err)
- assert.Len(t, nodes, 2)
- assert.Equal(t, "Online", nodes[0].Status)
- assert.Equal(t, "Online", nodes[1].Status)
- assert.True(t, nodes[0].Database)
- assert.True(t, nodes[1].Database)
-}
-
// Helper for setting fixtures for Bootstrap tests.
type membershipFixtures struct {
t *testing.T
From 91f54f26e57736e8a429c749213bc8b31ffb35c5 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:36:44 +0000
Subject: [PATCH 13/15] Change cluster unit tests to cope with non-voters
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership_test.go | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index 555681cd72..05a16e272e 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -8,6 +8,7 @@ import (
"testing"
"time"
+ client "github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/driver"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
@@ -227,9 +228,13 @@ func TestAccept_ClusterHasOneMember(t *testing.T) {
nodes, err := cluster.Accept(
state, gateway, "buzz", "5.6.7.8:666", cluster.SchemaVersion, len(version.APIExtensions))
assert.NoError(t, err)
- require.Len(t, nodes, 1)
- assert.Equal(t, int64(1), nodes[0].ID)
+ require.Len(t, nodes, 2)
+ assert.Equal(t, uint64(1), nodes[0].ID)
assert.Equal(t, "1.2.3.4:666", nodes[0].Address)
+ assert.Equal(t, client.Voter, nodes[0].Role)
+ assert.Equal(t, uint64(2), nodes[1].ID)
+ assert.Equal(t, "5.6.7.8:666", nodes[1].Address)
+ assert.Equal(t, client.Spare, nodes[1].Role)
}
func TestJoin(t *testing.T) {
@@ -314,11 +319,11 @@ func TestJoin(t *testing.T) {
// The leader now returns an updated list of raft nodes.
// The new node is not included to ensure distributed consensus.
- raftNodes, err = targetGateway.RaftNodes()
+ newNodes, err := targetGateway.RaftNodes()
require.NoError(t, err)
- assert.Len(t, raftNodes, 1)
- assert.Equal(t, int64(1), raftNodes[0].ID)
- assert.Equal(t, targetAddress, raftNodes[0].Address)
+ assert.Len(t, newNodes, 1)
+ assert.Equal(t, int64(1), newNodes[0].ID)
+ assert.Equal(t, targetAddress, newNodes[0].Address)
// The List function returns all nodes in the cluster.
nodes, err := cluster.List(state)
From 389bd78fc1f2c10a12460c0c68953ed467f99f39 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 14:37:35 +0000
Subject: [PATCH 14/15] internal cluster APIs: exchange role information when
joining
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/api_cluster.go | 38 +++++++++++++++++++++++++-------------
1 file changed, 25 insertions(+), 13 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 10e762e030..2ea7359473 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -12,6 +12,7 @@ import (
"strings"
"time"
+ "github.com/canonical/go-dqlite/client"
dqlitedriver "github.com/canonical/go-dqlite/driver"
"github.com/gorilla/mux"
"github.com/pkg/errors"
@@ -371,7 +372,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
}
// Connect to the target cluster node.
- client, err := lxd.ConnectLXD(fmt.Sprintf("https://%s", req.ClusterAddress), args)
+ cli, err := lxd.ConnectLXD(fmt.Sprintf("https://%s", req.ClusterAddress), args)
if err != nil {
return err
}
@@ -387,7 +388,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
return errors.Wrap(err, "Failed to connect to local LXD")
}
- err = clusterInitMember(d, client, req.MemberConfig)
+ err = clusterInitMember(d, cli, req.MemberConfig)
if err != nil {
return errors.Wrap(err, "Failed to initialize member")
}
@@ -425,7 +426,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
// Now request for this node to be added to the list of cluster nodes.
info, err := clusterAcceptMember(
- client, req.ServerName, address, cluster.SchemaVersion,
+ cli, req.ServerName, address, cluster.SchemaVersion,
version.APIExtensionsCount(), pools, networks)
if err != nil {
return errors.Wrap(err, "Failed request to add member")
@@ -444,10 +445,11 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
// Update local setup and possibly join the raft dqlite
// cluster.
- nodes := make([]db.RaftNode, len(info.RaftNodes))
+ nodes := make([]client.NodeInfo, len(info.RaftNodes))
for i, node := range info.RaftNodes {
- nodes[i].ID = node.ID
+ nodes[i].ID = uint64(node.ID)
nodes[i].Address = node.Address
+ nodes[i].Role = client.NodeRole(node.Role)
}
// The default timeout when non-clustered is one minute, let's
@@ -596,7 +598,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
for f, ps := range imageProjectInfo {
go func(fingerprint string, projects []string) {
- err := imageImport(client, fingerprint, projects)
+ err := imageImport(cli, fingerprint, projects)
if err != nil {
logger.Errorf("Failed to import an image %s from %s: %v", fingerprint, leader, err)
}
@@ -607,12 +609,12 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
// Add the cluster flag from the agent
version.UserAgentFeatures([]string{"cluster"})
- client, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), true)
+ cli, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), true)
if err != nil {
return err
}
- err = clusterRebalance(client)
+ err = clusterRebalance(cli)
if err != nil {
return err
}
@@ -1056,8 +1058,9 @@ func internalClusterPostAccept(d *Daemon, r *http.Request) response.Response {
PrivateKey: d.endpoints.NetworkPrivateKey(),
}
for i, node := range nodes {
- accepted.RaftNodes[i].ID = node.ID
+ accepted.RaftNodes[i].ID = int64(node.ID)
accepted.RaftNodes[i].Address = node.Address
+ accepted.RaftNodes[i].Role = int(node.Role)
}
return response.SyncResponse(true, accepted)
}
@@ -1133,8 +1136,15 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
return response.InternalError(err)
}
+ voters := []db.RaftNode{}
+ for _, node := range nodes {
+ if node.Role == client.Voter {
+ voters = append(voters, db.RaftNode{ID: int64(node.ID), Address: node.Address})
+ }
+ }
+
hbState := &cluster.APIHeartbeat{}
- hbState.Update(false, nodes, []db.NodeInfo{}, offlineThreshold)
+ hbState.Update(false, voters, []db.NodeInfo{}, offlineThreshold)
cert, err := util.LoadCert(d.os.VarDir)
if err != nil {
@@ -1156,8 +1166,9 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
post := &internalClusterPostPromoteRequest{}
for _, node := range nodes {
post.RaftNodes = append(post.RaftNodes, internalRaftNode{
- ID: node.ID,
+ ID: int64(node.ID),
Address: node.Address,
+ Role: int(node.Role),
})
}
@@ -1190,10 +1201,11 @@ func internalClusterPostPromote(d *Daemon, r *http.Request) response.Response {
return response.BadRequest(fmt.Errorf("No raft members provided"))
}
- nodes := make([]db.RaftNode, len(req.RaftNodes))
+ nodes := make([]client.NodeInfo, len(req.RaftNodes))
for i, node := range req.RaftNodes {
- nodes[i].ID = node.ID
+ nodes[i].ID = uint64(node.ID)
nodes[i].Address = node.Address
+ nodes[i].Role = client.NodeRole(node.Role)
}
err = cluster.Promote(d.State(), d.gateway, nodes)
if err != nil {
From 4b0822acae9dd16329758739047110c58223170d Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 7 Jan 2020 16:47:53 +0000
Subject: [PATCH 15/15] Fix notification of new voters
The heartbeat that was sent by joining nodes was empty. In addition, we now send
the heartbeat notification upon promotion of an existing node as well.
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 40 +++++++++++++++++++++++++--------------
1 file changed, 26 insertions(+), 14 deletions(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index eb1a134d75..998e8538a3 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -464,17 +464,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
}
}
- // Generate partial heartbeat request containing just a raft node list.
- hbState := &APIHeartbeat{}
- hbState.Update(false, voters, []db.NodeInfo{}, offlineThreshold)
-
- // Attempt to send a heartbeat to all other raft nodes to notify them of new node.
- for _, raftNode := range voters {
- if raftNode.ID == node.ID {
- continue
- }
- go HeartbeatNode(context.Background(), raftNode.Address, cert, hbState)
- }
+ notifyNewVoter(voters, node.ID, cert)
return nil
})
@@ -485,6 +475,24 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
return nil
}
+// Attempt to send a heartbeat to all other voters to notify them of the new voter.
+func notifyNewVoter(voters []db.RaftNode, id int64, cert *shared.CertInfo) {
+ // Generate partial heartbeat request containing just a raft node list.
+ hbState := &APIHeartbeat{}
+ nodes := make([]db.NodeInfo, len(voters))
+ for i, voter := range voters {
+ nodes[i].ID = voter.ID
+ nodes[i].Address = voter.Address
+ }
+ hbState.Update(false, voters, nodes, 0)
+ for _, voter := range voters {
+ if voter.ID == id {
+ continue
+ }
+ go HeartbeatNode(context.Background(), voter.Address, cert, hbState)
+ }
+}
+
// Rebalance the raft cluster, trying to see if we have a spare online node
// that we can promote to database node if we are below membershipMaxRaftNodes.
//
@@ -653,13 +661,13 @@ func Promote(state *state.State, gateway *Gateway, nodes []client.NodeInfo) erro
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- client, err := client.FindLeader(ctx, gateway.NodeStore(), client.WithDialFunc(gateway.raftDial()))
+ cli, err := client.FindLeader(ctx, gateway.NodeStore(), client.WithDialFunc(gateway.raftDial()))
if err != nil {
return errors.Wrap(err, "Failed to connect to cluster leader")
}
- defer client.Close()
+ defer cli.Close()
- err = client.Assign(ctx, info.ID, info.Role)
+ err = cli.Assign(ctx, info.ID, info.Role)
if err != nil {
return errors.Wrap(err, "Failed to promote node")
}
@@ -676,6 +684,10 @@ func Promote(state *state.State, gateway *Gateway, nodes []client.NodeInfo) erro
return errors.Wrap(err, "cluster database initialization failed")
}
+ if info.Role == client.Voter {
+ notifyNewVoter(voters, int64(info.ID), gateway.cert)
+ }
+
return nil
}
More information about the lxc-devel mailing list