[lxc-devel] [lxd/master] Promote database node upon delete
freeekanayaka on Github
lxc-bot at linuxcontainers.org
Mon Apr 16 13:13:24 UTC 2018
From 5483c49a90c94e1d262737327729ba1de4ed3b98 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Mon, 16 Apr 2018 10:44:46 +0000
Subject: [PATCH 1/4] Add new cluster.Promote function to turn a non-database
node into a database one
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/api_cluster.go | 2 +-
lxd/cluster/gateway.go | 5 ++
lxd/cluster/membership.go | 159 ++++++++++++++++++++++++++++++++++-------
lxd/cluster/membership_test.go | 89 +++++++++++++++++++++++
lxd/db/cluster/open.go | 2 +-
5 files changed, 230 insertions(+), 27 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 2c1327f6f..8fb21efe9 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -441,7 +441,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
// First check that the node is clear from containers and images and
// make it leave the database cluster, if it's part of it.
name := mux.Vars(r)["name"]
- address, err := cluster.Leave(d.State(), d.gateway, name, force == 1)
+ address, _, err := cluster.Leave(d.State(), d.gateway, name, force == 1)
if err != nil {
return SmartError(err)
}
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index e0ffce294..07546c170 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -217,6 +217,11 @@ func (g *Gateway) WaitUpgradeNotification() {
<-g.upgradeCh
}
+// IsDatabaseNode returns true if this gateway also acts as a raft database node.
+func (g *Gateway) IsDatabaseNode() bool {
+ return g.raft != nil
+}
+
// Dialer returns a gRPC dial function that can be used to connect to one of
// the dqlite nodes via gRPC.
func (g *Gateway) Dialer() grpcsql.Dialer {
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 33ae69122..b21c87d06 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -7,8 +7,8 @@ import (
"strconv"
"time"
- "github.com/CanonicalLtd/raft-http"
- "github.com/CanonicalLtd/raft-membership"
+ rafthttp "github.com/CanonicalLtd/raft-http"
+ raftmembership "github.com/CanonicalLtd/raft-membership"
"github.com/hashicorp/raft"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/db/cluster"
@@ -432,6 +432,111 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
return nil
}
+// Promote makes a LXD node which is not a database node become part of the
+// raft cluster.
+func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
+ logger.Info("Promote node to database node")
+
+ // Sanity check that this is not already a database node
+ if gateway.IsDatabaseNode() {
+ return fmt.Errorf("this node is already a database node")
+ }
+
+ // Figure out our own address.
+ address := ""
+ err := state.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ var err error
+ address, err = tx.NodeAddress()
+ if err != nil {
+ return errors.Wrap(err, "failed to fetch the address of this node")
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+
+ // Sanity check that we actually have an address.
+ if address == "" {
+ return fmt.Errorf("node is not exposed on the network")
+ }
+
+ // Figure out our raft node ID, and an existing target raft node that
+ // we'll contact to add ourselves as a member.
+ id := ""
+ target := ""
+ for _, node := range nodes {
+ if node.Address == address {
+ id = strconv.Itoa(int(node.ID))
+ } else {
+ target = node.Address
+ }
+ }
+
+ // Sanity check that our address was actually included in the given
+ // list of raft nodes.
+ if id == "" {
+ return fmt.Errorf("this node is not included in the given list of database nodes")
+ }
+
+ // Replace our local list of raft nodes with the given one (which
+ // includes ourselves). This will make the gateway start a raft node
+ // when restarted.
+ err = state.Node.Transaction(func(tx *db.NodeTx) error {
+ err = tx.RaftNodesReplace(nodes)
+ if err != nil {
+ return errors.Wrap(err, "failed to set raft nodes")
+ }
+
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+
+ // Lock regular access to the cluster database since we don't want any
+ // other database code to run while we're reconfiguring raft.
+ err = state.Cluster.EnterExclusive()
+ if err != nil {
+ return errors.Wrap(err, "failed to acquire cluster database lock")
+ }
+
+ // Wipe all existing raft data, for good measure (perhaps they were
+ // somehow leftover).
+ err = os.RemoveAll(filepath.Join(state.OS.VarDir, "raft"))
+ if err != nil {
+ return errors.Wrap(err, "failed to remove existing raft data")
+ }
+
+ // Re-initialize the gateway. This will create a new raft factory and
+ // dqlite driver instance, which will be exposed over gRPC by the
+ // gateway handlers.
+ err = gateway.init()
+ if err != nil {
+ return errors.Wrap(err, "failed to re-initialize gRPC SQL gateway")
+ }
+
+ logger.Info(
+ "Joining dqlite raft cluster",
+ log15.Ctx{"id": id, "address": address, "target": target})
+ changer := gateway.raft.MembershipChanger()
+ err = changer.Join(raft.ServerID(id), raft.ServerAddress(target), 5*time.Second)
+ if err != nil {
+ return err
+ }
+
+ // Unlock regular access to our cluster database, and make sure our
+ // gateway still works correctly.
+ err = state.Cluster.ExitExclusive(func(tx *db.ClusterTx) error {
+ _, err := tx.Nodes()
+ return err
+ })
+ if err != nil {
+ return errors.Wrap(err, "cluster database initialization failed")
+ }
+ return nil
+}
+
// Leave a cluster.
//
// If the force flag is true, the node will leave even if it still has
@@ -469,19 +574,17 @@ func Leave(state *state.State, gateway *Gateway, name string, force bool) (strin
}
// If the node is a database node, leave the raft cluster too.
- id := ""
- target := ""
+ var raftNodes []db.RaftNode // Current raft nodes
+ raftNodeRemoveIndex := -1 // Index of the raft node to remove, if any.
err = state.Node.Transaction(func(tx *db.NodeTx) error {
- nodes, err := tx.RaftNodes()
+ var err error
+ raftNodes, err = tx.RaftNodes()
if err != nil {
- return err
+ return errors.Wrap(err, "failed to get current database nodes")
}
- for i, node := range nodes {
+ for i, node := range raftNodes {
if node.Address == address {
- id = strconv.Itoa(int(node.ID))
- // Save the address of another database node,
- // we'll use it to leave the raft cluster.
- target = nodes[(i+1)%len(nodes)].Address
+ raftNodeRemoveIndex = i
break
}
}
@@ -491,22 +594,28 @@ func Leave(state *state.State, gateway *Gateway, name string, force bool) (strin
return "", err
}
- if target != "" {
- logger.Info(
- "Remove node from dqlite raft cluster",
- log15.Ctx{"id": id, "address": address, "target": target})
- dial, err := raftDial(gateway.cert)
- if err != nil {
- return "", err
- }
- err = rafthttp.ChangeMembership(
- raftmembership.LeaveRequest, raftEndpoint, dial,
- raft.ServerID(id), address, target, 5*time.Second)
- if err != nil {
- return "", err
- }
+ if raftNodeRemoveIndex == -1 {
+ // The node was not part of the raft cluster, nothing left to
+ // do.
+ return address, nil
}
+ id := strconv.Itoa(int(raftNodes[raftNodeRemoveIndex].ID))
+ // Get the address of another database node.
+ target := raftNodes[(raftNodeRemoveIndex+1)%len(raftNodes)].Address
+ logger.Info(
+ "Remove node from dqlite raft cluster",
+ log15.Ctx{"id": id, "address": address, "target": target})
+ dial, err := raftDial(gateway.cert)
+ if err != nil {
+ return "", err
+ }
+ err = rafthttp.ChangeMembership(
+ raftmembership.LeaveRequest, raftEndpoint, dial,
+ raft.ServerID(id), address, target, 5*time.Second)
+ if err != nil {
+ return "", err
+ }
return address, nil
}
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index b9907cfdc..5c260841c 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -348,6 +348,83 @@ func TestJoin(t *testing.T) {
assert.Equal(t, 1, count)
}
+func TestPromote(t *testing.T) {
+ // Setup a target node running as leader of a cluster.
+ targetCert := shared.TestingKeyPair()
+ targetMux := http.NewServeMux()
+ targetServer := newServer(targetCert, targetMux)
+ defer targetServer.Close()
+
+ targetState, cleanup := state.NewTestState(t)
+ defer cleanup()
+
+ targetGateway := newGateway(t, targetState.Node, targetCert)
+ defer targetGateway.Shutdown()
+
+ for path, handler := range targetGateway.HandlerFuncs() {
+ targetMux.HandleFunc(path, handler)
+ }
+
+ targetAddress := targetServer.Listener.Addr().String()
+ var err error
+ require.NoError(t, targetState.Cluster.Close())
+ targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress)
+ require.NoError(t, err)
+ targetF := &membershipFixtures{t: t, state: targetState}
+ targetF.NetworkAddress(targetAddress)
+
+ err = cluster.Bootstrap(targetState, targetGateway, "buzz")
+ require.NoError(t, err)
+
+ // Setup a node to be promoted.
+ mux := http.NewServeMux()
+ server := newServer(targetCert, mux) // Use the same cert, as we're already part of the cluster
+ defer server.Close()
+
+ state, cleanup := state.NewTestState(t)
+ defer cleanup()
+
+ address := server.Listener.Addr().String()
+ targetF.ClusterNode(address) // Add the non database node to the cluster database
+ f := &membershipFixtures{t: t, state: state}
+ f.NetworkAddress(address)
+ f.RaftNode(targetAddress) // Insert the leader in our local list of database nodes
+
+ gateway := newGateway(t, state.Node, targetCert)
+ defer gateway.Shutdown()
+
+ for path, handler := range gateway.HandlerFuncs() {
+ mux.HandleFunc(path, handler)
+ }
+
+ state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address)
+ require.NoError(t, err)
+
+ // Promote the node.
+ targetF.RaftNode(address) // Add the address of the node to be promoted in the leader's db
+ raftNodes := targetF.RaftNodes()
+ err = cluster.Promote(state, gateway, raftNodes)
+ require.NoError(t, err)
+
+ // The leader now returns an updated list of raft nodes.
+ raftNodes, err = targetGateway.RaftNodes()
+ require.NoError(t, err)
+ assert.Len(t, raftNodes, 2)
+ assert.Equal(t, int64(1), raftNodes[0].ID)
+ assert.Equal(t, targetAddress, raftNodes[0].Address)
+ assert.Equal(t, int64(2), raftNodes[1].ID)
+ assert.Equal(t, address, raftNodes[1].Address)
+
+ // The List function returns all nodes in the cluster.
+ nodes, err := cluster.List(state)
+ require.NoError(t, err)
+ assert.Len(t, nodes, 2)
+ assert.Equal(t, "Online", nodes[0].Status)
+ assert.Equal(t, "Online", nodes[1].Status)
+ assert.True(t, nodes[0].Database)
+ assert.True(t, nodes[1].Database)
+}
+
// Helper for setting fixtures for Bootstrap tests.
type membershipFixtures struct {
t *testing.T
@@ -374,6 +451,18 @@ func (h *membershipFixtures) RaftNode(address string) {
require.NoError(h.t, err)
}
+// Get the current list of the raft nodes in the raft_nodes table.
+func (h *membershipFixtures) RaftNodes() []db.RaftNode {
+ var nodes []db.RaftNode
+ err := h.state.Node.Transaction(func(tx *db.NodeTx) error {
+ var err error
+ nodes, err = tx.RaftNodes()
+ return err
+ })
+ require.NoError(h.t, err)
+ return nodes
+}
+
// Add the given address to the nodes table of the cluster database.
func (h *membershipFixtures) ClusterNode(address string) {
err := h.state.Cluster.Transaction(func(tx *db.ClusterTx) error {
diff --git a/lxd/db/cluster/open.go b/lxd/db/cluster/open.go
index 0ca5d50af..498e5d923 100644
--- a/lxd/db/cluster/open.go
+++ b/lxd/db/cluster/open.go
@@ -74,7 +74,7 @@ func EnsureSchema(db *sql.DB, address string) (bool, error) {
// Update the schema and api_extension columns of ourselves.
err = updateNodeVersion(tx, address, apiExtensions)
if err != nil {
- return err
+ return errors.Wrap(err, "failed to update node version info")
}
err = checkClusterIsUpgradable(tx, [2]int{len(updates), apiExtensions})
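To make the join step in Promote above easier to follow, here is a minimal standalone sketch of how the raft ID and the target address are derived from the given list of database nodes. RaftNode below is a pared-down stand-in for lxd/db.RaftNode and pickIDAndTarget is a hypothetical helper for illustration, not code from the patch.

package main

import (
	"fmt"
	"strconv"
)

// RaftNode is a pared-down stand-in for lxd/db.RaftNode, holding only
// the two fields the selection logic needs.
type RaftNode struct {
	ID      int64
	Address string
}

// pickIDAndTarget mirrors the loop at the top of Promote: our own entry
// in the list yields the raft ID (stringified, as raft.ServerID expects),
// while any other entry can serve as the target member to send the join
// request to.
func pickIDAndTarget(nodes []RaftNode, ownAddress string) (string, string, error) {
	id := ""
	target := ""
	for _, node := range nodes {
		if node.Address == ownAddress {
			id = strconv.Itoa(int(node.ID))
		} else {
			target = node.Address
		}
	}
	if id == "" {
		return "", "", fmt.Errorf("this node is not included in the given list of database nodes")
	}
	return id, target, nil
}

func main() {
	nodes := []RaftNode{
		{ID: 1, Address: "10.1.1.101:8443"},
		{ID: 2, Address: "10.1.1.104:8443"},
	}
	id, target, err := pickIDAndTarget(nodes, "10.1.1.104:8443")
	if err != nil {
		panic(err)
	}
	fmt.Println(id, target) // Prints: 2 10.1.1.101:8443
}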
From 4ea558b71cab3f2d0201aed61688fa83a94ac240 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Mon, 16 Apr 2018 12:34:50 +0000
Subject: [PATCH 2/4] Add new cluster.Rebalance function to check if we need to
add database nodes
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/cluster/membership.go | 79 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 79 insertions(+)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index b21c87d06..8bea809ad 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -432,6 +432,85 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
return nil
}
+// Rebalance the raft cluster, trying to see if we have a spare online node
+// that we can promote to database node if we are below membershipMaxRaftNodes.
+//
+// If there's such a spare node, return its address as well as the new list of
+// raft nodes.
+func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, error) {
+ // First get the current raft members, since this method should be
+ // called after a node has left.
+ currentRaftNodes, err := gateway.currentRaftNodes()
+ if err != nil {
+ return "", nil, errors.Wrap(err, "failed to get current raft nodes")
+ }
+ if len(currentRaftNodes) >= membershipMaxRaftNodes {
+ // We're already at full capacity.
+ return "", nil, nil
+ }
+
+ currentRaftAddresses := make([]string, len(currentRaftNodes))
+ for i, node := range currentRaftNodes {
+ currentRaftAddresses[i] = node.Address
+ }
+
+ // Check if we have a spare node that we can turn into a database one.
+ address := ""
+ err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ config, err := ConfigLoad(tx)
+ if err != nil {
+ return errors.Wrap(err, "failed load cluster configuration")
+ }
+ nodes, err := tx.Nodes()
+ if err != nil {
+ return errors.Wrap(err, "failed to get cluster nodes")
+ }
+ // Find a node that is not part of the raft cluster yet.
+ for _, node := range nodes {
+ if shared.StringInSlice(node.Address, currentRaftAddresses) {
+ continue // This is already a database node
+ }
+ if node.IsOffline(config.OfflineThreshold()) {
+ continue // This node is offline
+ }
+ logger.Debugf(
+ "Found spare node %s (%s) to be promoted as database node", node.Name, node.Address)
+ address = node.Address
+ break
+ }
+
+ return nil
+ })
+ if err != nil {
+ return "", nil, err
+ }
+
+ if address == "" {
+ // No node to promote
+ return "", nil, nil
+ }
+
+ // Update the local raft_nodes table, adding the new member and building a new
+ // list.
+ updatedRaftNodes := currentRaftNodes
+ err = gateway.db.Transaction(func(tx *db.NodeTx) error {
+ id, err := tx.RaftNodeAdd(address)
+ if err != nil {
+ return errors.Wrap(err, "failed to add new raft node")
+ }
+ updatedRaftNodes = append(updatedRaftNodes, db.RaftNode{ID: id, Address: address})
+ err = tx.RaftNodesReplace(updatedRaftNodes)
+ if err != nil {
+ return errors.Wrap(err, "failed to update raft nodes")
+ }
+ return nil
+ })
+ if err != nil {
+ return "", nil, err
+ }
+ return address, updatedRaftNodes, nil
+}
+
// Promote makes a LXD node which is not a database node become part of the
// raft cluster.
func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
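Similarly, the spare-node selection inside Rebalance boils down to the loop below. NodeInfo, its Offline flag and findSpare are simplified, hypothetical stand-ins for what the patch actually reads through tx.Nodes(), ConfigLoad and node.IsOffline(); the sketch only illustrates the selection rule.

package main

import "fmt"

// NodeInfo is a simplified stand-in for the node records the patch reads
// via tx.Nodes(); Offline stands in for node.IsOffline(offlineThreshold).
type NodeInfo struct {
	Name    string
	Address string
	Offline bool
}

// findSpare returns the address of the first online node that is not yet
// a raft member, or "" if there is none, mirroring the selection loop in
// Rebalance.
func findSpare(nodes []NodeInfo, raftAddresses []string) string {
	isRaft := make(map[string]bool, len(raftAddresses))
	for _, address := range raftAddresses {
		isRaft[address] = true
	}
	for _, node := range nodes {
		if isRaft[node.Address] {
			continue // Already a database node.
		}
		if node.Offline {
			continue // Can't promote an offline node.
		}
		return node.Address
	}
	return ""
}

func main() {
	nodes := []NodeInfo{
		{Name: "node1", Address: "10.1.1.101:8443"},
		{Name: "node4", Address: "10.1.1.104:8443"},
	}
	// node1 is already a raft member, so node4 is the spare.
	fmt.Println(findSpare(nodes, []string{"10.1.1.101:8443"}))
}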
From 11d1b65a1dc6ed9343b32625f121e730548e181b Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Mon, 16 Apr 2018 12:35:12 +0000
Subject: [PATCH 3/4] Notify the cluster leader after a node removal, so it can
rebalance
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/api_cluster.go | 125 +++++++++++++++++++++++++++++++++++++++++++++++-
lxd/api_cluster_test.go | 35 ++++++++++++++
lxd/api_internal.go | 2 +
3 files changed, 160 insertions(+), 2 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 8fb21efe9..0fd89ed3b 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -438,10 +438,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
force = 0
}
+ name := mux.Vars(r)["name"]
+ logger.Debugf("Delete node %s from cluster (force=%d)", name, force)
+
// First check that the node is clear from containers and images and
// make it leave the database cluster, if it's part of it.
- name := mux.Vars(r)["name"]
- address, _, err := cluster.Leave(d.State(), d.gateway, name, force == 1)
+ address, err := cluster.Leave(d.State(), d.gateway, name, force == 1)
if err != nil {
return SmartError(err)
}
@@ -483,6 +485,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
if err != nil {
return SmartError(errors.Wrap(err, "failed to remove node from database"))
}
+ // Try to notify the leader.
+ err = tryClusterRebalance(d)
+ if err != nil {
+ // This is not a fatal error, so let's just log it.
+ logger.Errorf("Failed to rebalance cluster: %v", err)
+ }
if force != 1 {
// Try to gracefully reset the database on the node.
@@ -502,6 +510,26 @@ func clusterNodeDelete(d *Daemon, r *http.Request) Response {
return EmptySyncResponse
}
+// This function notifies the leader that a node was removed, so that it can
+// decide whether to promote a new node as database node.
+func tryClusterRebalance(d *Daemon) error {
+ leader, err := d.gateway.LeaderAddress()
+ if err != nil {
+ // This is not a fatal error, so let's just log it.
+ return errors.Wrap(err, "failed to get current leader node")
+ }
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.Connect(leader, cert, true)
+ if err != nil {
+ return errors.Wrap(err, "failed to connect to leader node")
+ }
+ _, _, err = client.RawQuery("POST", "/internal/cluster/rebalance", nil, "")
+ if err != nil {
+ return errors.Wrap(err, "request to rebalance cluster failed")
+ }
+ return nil
+}
+
var internalClusterAcceptCmd = Command{name: "cluster/accept", post: internalClusterPostAccept}
func internalClusterPostAccept(d *Daemon, r *http.Request) Response {
@@ -586,6 +614,99 @@ type internalRaftNode struct {
Address string `json:"address" yaml:"address"`
}
+var internalClusterRebalanceCmd = Command{name: "cluster/rebalance", post: internalClusterPostRebalance}
+
+// Used to update the cluster after a database node has been removed, and
+// possibly promote another one as database node.
+func internalClusterPostRebalance(d *Daemon, r *http.Request) Response {
+ // Redirect all requests to the leader, which is the one with
+ // up-to-date knowledge of what nodes are part of the raft cluster.
+ localAddress, err := node.HTTPSAddress(d.db)
+ if err != nil {
+ return SmartError(err)
+ }
+ leader, err := d.gateway.LeaderAddress()
+ if err != nil {
+ return InternalError(err)
+ }
+ if localAddress != leader {
+ logger.Debugf("Redirect cluster rebalance request to %s", leader)
+ url := &url.URL{
+ Scheme: "https",
+ Path: "/internal/cluster/rebalance",
+ Host: leader,
+ }
+ return SyncResponseRedirect(url.String())
+ }
+
+ logger.Debugf("Rebalance cluster")
+
+ // Check if we have a spare node to promote.
+ address, nodes, err := cluster.Rebalance(d.State(), d.gateway)
+ if err != nil {
+ return SmartError(err)
+ }
+ if address == "" {
+ return SyncResponse(true, nil) // Nothing to change
+ }
+
+ // Tell the node to promote itself.
+ post := &internalClusterPostPromoteRequest{}
+ for _, node := range nodes {
+ post.RaftNodes = append(post.RaftNodes, internalRaftNode{
+ ID: node.ID,
+ Address: node.Address,
+ })
+ }
+
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.Connect(address, cert, false)
+ if err != nil {
+ return SmartError(err)
+ }
+ _, _, err = client.RawQuery("POST", "/internal/cluster/promote", post, "")
+ if err != nil {
+ return SmartError(err)
+ }
+
+ return SyncResponse(true, nil)
+}
+
+var internalClusterPromoteCmd = Command{name: "cluster/promote", post: internalClusterPostPromote}
+
+// Used to promote the local non-database node to be a database one.
+func internalClusterPostPromote(d *Daemon, r *http.Request) Response {
+ req := internalClusterPostPromoteRequest{}
+
+ // Parse the request
+ err := json.NewDecoder(r.Body).Decode(&req)
+ if err != nil {
+ return BadRequest(err)
+ }
+
+ // Sanity checks
+ if len(req.RaftNodes) == 0 {
+ return BadRequest(fmt.Errorf("No raft nodes provided"))
+ }
+
+ nodes := make([]db.RaftNode, len(req.RaftNodes))
+ for i, node := range req.RaftNodes {
+ nodes[i].ID = node.ID
+ nodes[i].Address = node.Address
+ }
+ err = cluster.Promote(d.State(), d.gateway, nodes)
+ if err != nil {
+ return SmartError(err)
+ }
+
+ return SyncResponse(true, nil)
+}
+
+// A request for the /internal/cluster/promote endpoint.
+type internalClusterPostPromoteRequest struct {
+ RaftNodes []internalRaftNode `json:"raft_nodes" yaml:"raft_nodes"`
+}
+
func clusterCheckStoragePoolsMatch(cluster *db.Cluster, reqPools []api.StoragePool) error {
poolNames, err := cluster.StoragePoolsNotPending()
if err != nil && err != db.ErrNoSuchObject {
diff --git a/lxd/api_cluster_test.go b/lxd/api_cluster_test.go
index 569ffb520..4dd76393e 100644
--- a/lxd/api_cluster_test.go
+++ b/lxd/api_cluster_test.go
@@ -294,6 +294,41 @@ func TestCluster_LeaveForce(t *testing.T) {
assert.Equal(t, []string{}, images)
}
+// If a spare non-database node is available after a node leaves, it gets
+// promoted to a database node.
+func TestCluster_LeaveAndPromote(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping cluster promote test in short mode.")
+ }
+ daemons, cleanup := newDaemons(t, 4)
+ defer cleanup()
+
+ f := clusterFixture{t: t}
+ f.FormCluster(daemons)
+
+ // The first three nodes are database nodes, the fourth is not.
+ client := f.ClientUnix(daemons[0])
+ nodes, err := client.GetClusterMembers()
+ assert.Len(t, nodes, 4)
+ assert.True(t, nodes[0].Database)
+ assert.True(t, nodes[1].Database)
+ assert.True(t, nodes[2].Database)
+ assert.False(t, nodes[3].Database)
+
+ client = f.ClientUnix(daemons[1])
+ err = client.DeleteClusterMember("rusp-0", false)
+ require.NoError(t, err)
+
+ // Only three nodes are left, and they are all database nodes.
+ client = f.ClientUnix(daemons[0])
+ nodes, err = client.GetClusterMembers()
+ require.NoError(t, err)
+ assert.Len(t, nodes, 3)
+ assert.True(t, nodes[0].Database)
+ assert.True(t, nodes[1].Database)
+ assert.True(t, nodes[2].Database)
+}
+
// A LXD node can be renamed.
func TestCluster_NodeRename(t *testing.T) {
daemon, cleanup := newDaemon(t)
diff --git a/lxd/api_internal.go b/lxd/api_internal.go
index 5f234537d..6f0443240 100644
--- a/lxd/api_internal.go
+++ b/lxd/api_internal.go
@@ -30,6 +30,8 @@ var apiInternal = []Command{
internalContainersCmd,
internalSQLCmd,
internalClusterAcceptCmd,
+ internalClusterRebalanceCmd,
+ internalClusterPromoteCmd,
internalClusterContainerMovedCmd,
}
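The rebalance handler drives the promotion over the internal API: the leader builds an internalClusterPostPromoteRequest from the updated raft node list and POSTs it to /internal/cluster/promote on the spare node. Below is a rough sketch of what that request body looks like on the wire; the struct shapes are copied from the patch, but the json tag on ID is an assumption (the patch only shows the Address tag), and the real daemon serializes the request through cluster.Connect and RawQuery rather than a bare json.Marshal.

package main

import (
	"encoding/json"
	"fmt"
)

type internalRaftNode struct {
	ID      int64  `json:"id"`
	Address string `json:"address"`
}

type internalClusterPostPromoteRequest struct {
	RaftNodes []internalRaftNode `json:"raft_nodes"`
}

func main() {
	post := internalClusterPostPromoteRequest{
		RaftNodes: []internalRaftNode{
			{ID: 1, Address: "10.1.1.101:8443"},
			{ID: 2, Address: "10.1.1.104:8443"},
		},
	}
	body, err := json.Marshal(post)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
	// Prints something like:
	// {"raft_nodes":[{"id":1,"address":"10.1.1.101:8443"},{"id":2,"address":"10.1.1.104:8443"}]}
}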
From c67471e0ccb11dcd4a19c4ee031cc1f7396db099 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Mon, 16 Apr 2018 12:56:41 +0000
Subject: [PATCH 4/4] Add integration test
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
test/suites/clustering.sh | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index 579bfb3cd..7ff994672 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -74,39 +74,44 @@ test_clustering_membership() {
LXD_DIR="${LXD_ONE_DIR}" lxc remote set-url cluster https://10.1.1.102:8443
lxc network list cluster: | grep -q "${bridge}"
- # Shutdown a non-database node, and wait a few seconds so it will be
+ # Shutdown a database node, and wait a few seconds so it will be
# detected as down.
LXD_DIR="${LXD_ONE_DIR}" lxc config set cluster.offline_threshold 5
- LXD_DIR="${LXD_FIVE_DIR}" lxd shutdown
+ LXD_DIR="${LXD_THREE_DIR}" lxd shutdown
sleep 10
- LXD_DIR="${LXD_THREE_DIR}" lxc cluster list | grep "node5" | grep -q "OFFLINE"
+ LXD_DIR="${LXD_TWO_DIR}" lxc cluster list | grep "node3" | grep -q "OFFLINE"
LXD_DIR="${LXD_TWO_DIR}" lxc config set cluster.offline_threshold 20
# Trying to delete the preseeded network now fails, because a node is degraded.
! LXD_DIR="${LXD_TWO_DIR}" lxc network delete "${bridge}"
# Force the removal of the degraded node.
- LXD_DIR="${LXD_THREE_DIR}" lxc cluster remove node5 --force
+ LXD_DIR="${LXD_TWO_DIR}" lxc cluster remove node3 --force
+
+ # Sleep a bit to let a heartbeat occur and update the list of raft nodes
+ # everywhere, showing that node 4 has been promoted to database node.
+ sleep 8
+ LXD_DIR="${LXD_TWO_DIR}" lxc cluster list | grep "node4" | grep -q "YES"
# Now the preseeded network can be deleted, and all nodes are
# notified.
LXD_DIR="${LXD_TWO_DIR}" lxc network delete "${bridge}"
# Rename a node using the pre-existing name.
- LXD_DIR="${LXD_THREE_DIR}" lxc cluster rename node4 node5
+ LXD_DIR="${LXD_ONE_DIR}" lxc cluster rename node4 node3
# Trying to delete a container which is the only one with a copy of
# an image results in an error
LXD_DIR="${LXD_FOUR_DIR}" ensure_import_testimage
- ! LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node5
+ ! LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node3
LXD_DIR="${LXD_TWO_DIR}" lxc image delete testimage
# Remove a node gracefully.
- LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node5
+ LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node3
! LXD_DIR="${LXD_FOUR_DIR}" lxc cluster list
+ LXD_DIR="${LXD_FIVE_DIR}" lxd shutdown
LXD_DIR="${LXD_FOUR_DIR}" lxd shutdown
- LXD_DIR="${LXD_THREE_DIR}" lxd shutdown
LXD_DIR="${LXD_TWO_DIR}" lxd shutdown
LXD_DIR="${LXD_ONE_DIR}" lxd shutdown
sleep 2
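The shell test checks promotion through the DATABASE column of `lxc cluster list`; the same check can be done programmatically with the Go client, roughly as sketched below (assuming the github.com/lxc/lxd/client package and a local unix socket; error handling kept minimal).

package main

import (
	"fmt"

	lxd "github.com/lxc/lxd/client"
)

func main() {
	// Connect over the local unix socket (empty path means the default).
	c, err := lxd.ConnectLXDUnix("", nil)
	if err != nil {
		panic(err)
	}

	// GetClusterMembers is the same call the new Go test uses; each member
	// reports whether it is currently a database (raft) node.
	members, err := c.GetClusterMembers()
	if err != nil {
		panic(err)
	}
	for i, member := range members {
		fmt.Printf("member %d: status=%s database=%v\n", i, member.Status, member.Database)
	}
}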