[lxc-devel] [lxd/master] Add clustering roles

stgraber on Github lxc-bot at linuxcontainers.org
Thu Sep 26 21:24:49 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190926/83c4501a/attachment.bin>
-------------- next part --------------
From adb80a78d309599789a95f8eeb1e1fc1fd9dd94f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Tue, 24 Sep 2019 18:34:15 -0400
Subject: [PATCH 1/8] api: Add clustering_roles extension
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 doc/api-extensions.md | 4 ++++
 shared/version/api.go | 1 +
 2 files changed, 5 insertions(+)

diff --git a/doc/api-extensions.md b/doc/api-extensions.md
index 5708222afe..b67824ee19 100644
--- a/doc/api-extensions.md
+++ b/doc/api-extensions.md
@@ -838,3 +838,7 @@ Extends the disk resource API struct to include:
  - Block size
  - Firmware version
  - Serial number
+
+## clustering\_roles
+This adds a new `roles` attribute to cluster members, exposing the list of
+roles that each member serves in the cluster.
diff --git a/shared/version/api.go b/shared/version/api.go
index d6d182821d..ac9350f740 100644
--- a/shared/version/api.go
+++ b/shared/version/api.go
@@ -167,6 +167,7 @@ var APIExtensions = []string{
 	"instances",
 	"image_types",
 	"resources_disk_sata",
+	"clustering_roles",
 }
 
 // APIExtensionsCount returns the number of available API extensions.

From d060931c1a245d75b47fbe3e125e733b21387722 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Tue, 24 Sep 2019 18:34:31 -0400
Subject: [PATCH 2/8] shared/api: Add clustering roles
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 shared/api/cluster.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/shared/api/cluster.go b/shared/api/cluster.go
index 9f707ed7d8..c773929c8d 100644
--- a/shared/api/cluster.go
+++ b/shared/api/cluster.go
@@ -57,4 +57,7 @@ type ClusterMember struct {
 	Database   bool   `json:"database" yaml:"database"`
 	Status     string `json:"status" yaml:"status"`
 	Message    string `json:"message" yaml:"message"`
+
+	// API extension: clustering_roles
+	Roles []string `json:"roles" yaml:"roles"`
 }

From 96e9d1c2ee868415b6150fb71a5a0edcfc8c6d9e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Tue, 24 Sep 2019 15:18:55 -0400
Subject: [PATCH 3/8] lxd/db: Add nodes_roles table
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/db/cluster/schema.go |  8 +++++++-
 lxd/db/cluster/update.go | 15 +++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/lxd/db/cluster/schema.go b/lxd/db/cluster/schema.go
index 78534e25b6..33fe82b2e0 100644
--- a/lxd/db/cluster/schema.go
+++ b/lxd/db/cluster/schema.go
@@ -306,6 +306,12 @@ CREATE TABLE nodes (
     UNIQUE (name),
     UNIQUE (address)
 );
+CREATE TABLE nodes_roles (
+    node_id INTEGER NOT NULL,
+    role INTEGER NOT NULL,
+    FOREIGN KEY (node_id) REFERENCES nodes (id) ON DELETE CASCADE,
+    UNIQUE (node_id, role)
+);
 CREATE TABLE "operations" (
     id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
     uuid TEXT NOT NULL,
@@ -481,5 +487,5 @@ CREATE TABLE storage_volumes_config (
     FOREIGN KEY (storage_volume_id) REFERENCES storage_volumes (id) ON DELETE CASCADE
 );
 
-INSERT INTO schema (version, updated_at) VALUES (17, strftime("%s"))
+INSERT INTO schema (version, updated_at) VALUES (18, strftime("%s"))
 `
diff --git a/lxd/db/cluster/update.go b/lxd/db/cluster/update.go
index 67e9a1ca20..461672ad67 100644
--- a/lxd/db/cluster/update.go
+++ b/lxd/db/cluster/update.go
@@ -52,6 +52,21 @@ var updates = map[int]schema.Update{
 	15: updateFromV14,
 	16: updateFromV15,
 	17: updateFromV16,
+	18: updateFromV17,
+}
+
+// Add nodes_roles table, recording which cluster roles each node serves.
+func updateFromV17(tx *sql.Tx) error {
+	stmts := `
+CREATE TABLE nodes_roles (
+    node_id INTEGER NOT NULL,
+    role INTEGER NOT NULL,
+    FOREIGN KEY (node_id) REFERENCES nodes (id) ON DELETE CASCADE,
+    UNIQUE (node_id, role)
+);
+`
+	_, err := tx.Exec(stmts)
+	return err
 }
 
 // Add image type column

From f935ae8093f15044e2cda99710e864dda2a2553e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Tue, 24 Sep 2019 18:34:50 -0400
Subject: [PATCH 4/8] lxd/db: Add support for clustering roles
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes #6172

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/cluster/membership.go |  1 +
 lxd/db/node.go            | 55 +++++++++++++++++++++++++++++++++++++--
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 98744d8311..bf9fceffab 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -798,6 +798,7 @@ func List(state *state.State) ([]api.ClusterMember, error) {
 		result[i].ServerName = node.Name
 		result[i].URL = fmt.Sprintf("https://%s", node.Address)
 		result[i].Database = shared.StringInSlice(node.Address, addresses)
+		result[i].Roles = node.Roles
 		if node.IsOffline(offlineThreshold) {
 			result[i].Status = "Offline"
 			result[i].Message = fmt.Sprintf(
diff --git a/lxd/db/node.go b/lxd/db/node.go
index 563b158d4a..c01fe0ff32 100644
--- a/lxd/db/node.go
+++ b/lxd/db/node.go
@@ -13,6 +13,11 @@ import (
 	"github.com/pkg/errors"
 )
 
+// ClusterRoles maps the numeric role IDs stored in nodes_roles to role names.
+var ClusterRoles = map[int]string{
+	0: "database",
+}
+
 // NodeInfo holds information about a single LXD instance in a cluster.
 type NodeInfo struct {
 	ID            int64     // Stable node identifier
@@ -22,6 +27,7 @@ type NodeInfo struct {
 	Schema        int       // Schema version of the LXD code running the node
 	APIExtensions int       // Number of API extensions of the LXD code running on the node
 	Heartbeat     time.Time // Timestamp of the last heartbeat
+	Roles         []string  // List of cluster roles
 }
 
 // IsOffline returns true if the last successful heartbeat time of the node is
@@ -207,6 +213,39 @@ func (c *ClusterTx) NodeRename(old, new string) error {
 
 // Nodes returns all LXD nodes part of the cluster.
 func (c *ClusterTx) nodes(pending bool, where string, args ...interface{}) ([]NodeInfo, error) {
+	// Load the roles of every node up front, keyed by node ID.
+	sql := "SELECT node_id, role FROM nodes_roles;"
+
+	rows, err := c.tx.Query(sql)
+	if err != nil {
+		return nil, errors.Wrap(err, "Failed to fetch node roles")
+	}
+	defer rows.Close()
+
+	nodeRoles := map[int64][]string{}
+	for rows.Next() {
+		var nodeID int64
+		var role int
+		err := rows.Scan(&nodeID, &role)
+		if err != nil {
+			return nil, err
+		}
+
+		// Skip role IDs unknown to ClusterRoles instead of reporting "".
+		roleName, ok := ClusterRoles[role]
+		if !ok {
+			continue
+		}
+
+		nodeRoles[nodeID] = append(nodeRoles[nodeID], roleName)
+	}
+
+	err = rows.Err()
+	if err != nil {
+		return nil, err
+	}
+
+	// Process node entries
 	nodes := []NodeInfo{}
 	dest := func(i int) []interface{} {
 		nodes = append(nodes, NodeInfo{})
@@ -225,21 +264,33 @@ func (c *ClusterTx) nodes(pending bool, where string, args ...interface{}) ([]No
 	} else {
 		args = append([]interface{}{0}, args...)
 	}
-	sql := `
-SELECT id, name, address, description, schema, api_extensions, heartbeat FROM nodes WHERE pending=? `
+
+	// Get the node entries; the trailing space separates the clauses appended below.
+	sql = "SELECT id, name, address, description, schema, api_extensions, heartbeat FROM nodes WHERE pending=? "
 	if where != "" {
 		sql += fmt.Sprintf("AND %s ", where)
 	}
 	sql += "ORDER BY id"
+
 	stmt, err := c.tx.Prepare(sql)
 	if err != nil {
 		return nil, err
 	}
 	defer stmt.Close()
+
 	err = query.SelectObjects(stmt, dest, args...)
 	if err != nil {
 		return nil, errors.Wrap(err, "Failed to fetch nodes")
 	}
+
+	// Add the roles
+	for i, node := range nodes {
+		roles, ok := nodeRoles[node.ID]
+		if ok {
+			nodes[i].Roles = roles
+		}
+	}
+
 	return nodes, nil
 }
 

From 4617374ef2139e0ccd98154a08363f63ff7b0117 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 25 Sep 2019 15:26:11 -0400
Subject: [PATCH 5/8] lxd/db: Add NodeAddRole
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/db/node.go | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/lxd/db/node.go b/lxd/db/node.go
index c01fe0ff32..71fe7ae925 100644
--- a/lxd/db/node.go
+++ b/lxd/db/node.go
@@ -339,6 +339,30 @@ func (c *ClusterTx) NodeUpdate(id int64, name string, address string) error {
 	return nil
 }
 
+// NodeAddRole records that the node with the given ID serves the named
+// cluster role.
+//
+// The name is translated to its numeric ID through ClusterRoles, and an
+// unknown name is rejected before the database is touched. Adding a role
+// the node already has fails, as (node_id, role) is UNIQUE in nodes_roles.
+func (c *ClusterTx) NodeAddRole(id int64, role string) error {
+	for roleID, name := range ClusterRoles {
+		if name != role {
+			continue
+		}
+
+		_, err := c.tx.Exec("INSERT INTO nodes_roles (node_id, role) VALUES (?, ?)", id, roleID)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}
+
+	// No entry in ClusterRoles carries this name.
+	return fmt.Errorf("Invalid role: %v", role)
+}
+
 // NodeRemove removes the node with the given id.
 func (c *ClusterTx) NodeRemove(id int64) error {
 	result, err := c.tx.Exec("DELETE FROM nodes WHERE id=?", id)

From dcc668491c9b85549c7ac9bb6ca49a7b2783a8d4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 25 Sep 2019 18:57:44 -0400
Subject: [PATCH 6/8] lxd/cluster: Switch to using roles
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/cluster/membership.go | 19 +++----------------
 1 file changed, 3 insertions(+), 16 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index bf9fceffab..863ae313c8 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -757,21 +757,7 @@ func Purge(cluster *db.Cluster, name string) error {
 
 // List the nodes of the cluster.
 func List(state *state.State) ([]api.ClusterMember, error) {
-	addresses := []string{} // Addresses of database nodes
-	err := state.Node.Transaction(func(tx *db.NodeTx) error {
-		nodes, err := tx.RaftNodes()
-		if err != nil {
-			return errors.Wrap(err, "failed to fetch current raft nodes")
-		}
-		for _, node := range nodes {
-			addresses = append(addresses, node.Address)
-		}
-		return nil
-	})
-	if err != nil {
-		return nil, err
-	}
-
+	var err error
 	var nodes []db.NodeInfo
 	var offlineThreshold time.Duration
 
@@ -780,6 +766,7 @@ func List(state *state.State) ([]api.ClusterMember, error) {
 		if err != nil {
 			return err
 		}
+
 		offlineThreshold, err = tx.NodeOfflineThreshold()
 		if err != nil {
 			return err
@@ -797,7 +784,7 @@ func List(state *state.State) ([]api.ClusterMember, error) {
 	for i, node := range nodes {
 		result[i].ServerName = node.Name
 		result[i].URL = fmt.Sprintf("https://%s", node.Address)
-		result[i].Database = shared.StringInSlice(node.Address, addresses)
+		result[i].Database = shared.StringInSlice("database", node.Roles)
 		result[i].Roles = node.Roles
 		if node.IsOffline(offlineThreshold) {
 			result[i].Status = "Offline"

From 28677e41c6f17c16a42b69be01b0dc58d68e67a7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 26 Sep 2019 16:52:47 -0400
Subject: [PATCH 7/8] lxd/cluster: Update roles on join/promote
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/cluster/membership.go | 30 ++++++++++++++++++++++++------
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 863ae313c8..cf2413ca91 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -80,6 +80,12 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error {
 			return errors.Wrap(err, "failed to update cluster node")
 		}
 
+		// Update our role list.
+		err = tx.NodeAddRole(1, "database")
+		if err != nil {
+			return errors.Wrapf(err, "Failed to add database role for the node")
+		}
+
 		return nil
 	})
 	if err != nil {
@@ -432,6 +438,14 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 			return errors.Wrapf(err, "failed to unmark the node as pending")
 		}
 
+		// Update our role list if needed.
+		if id != "" {
+			err = tx.NodeAddRole(node.ID, "database")
+			if err != nil {
+				return errors.Wrapf(err, "Failed to add database role for the node")
+			}
+		}
+
 		// Generate partial heartbeat request containing just a raft node list.
 		hbState := &APIHeartbeat{}
 		hbState.Update(false, raftNodes, []db.NodeInfo{}, offlineThreshold)
@@ -560,11 +574,11 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
 
 	// Figure out our raft node ID, and an existing target raft node that
 	// we'll contact to add ourselves as member.
-	id := ""
+	id := int64(-1)
 	target := ""
 	for _, node := range nodes {
 		if node.Address == address {
-			id = strconv.Itoa(int(node.ID))
+			id = node.ID
 		} else {
 			target = node.Address
 		}
@@ -572,7 +586,7 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
 
 	// Sanity check that our address was actually included in the given
 	// list of raft nodes.
-	if id == "" {
+	if id == -1 {
 		return fmt.Errorf("this node is not included in the given list of database nodes")
 	}
 
@@ -625,20 +639,44 @@ func Promote(state *state.State, gateway *Gateway, nodes []db.RaftNode) error {
 		return errors.Wrap(err, "Failed to connect to cluster leader")
 	}
 	defer client.Close()
+
 	err = client.Add(ctx, gateway.raft.info)
 	if err != nil {
 		return errors.Wrap(err, "Failed to join cluster")
 	}
 
-	// Unlock regular access to our cluster database, and make sure our
-	// gateway still works correctly.
+	// Unlock regular access to our cluster database and add the database role.
 	err = state.Cluster.ExitExclusive(func(tx *db.ClusterTx) error {
-		_, err := tx.Nodes()
+		// The id found above is our raft node ID, which is not necessarily
+		// our ID in the cluster nodes table that nodes_roles references, so
+		// look up our cluster member entry by address instead.
+		members, err := tx.Nodes()
+		if err != nil {
+			return err
+		}
+
+		memberID := int64(-1)
+		for _, member := range members {
+			if member.Address == address {
+				memberID = member.ID
+				break
+			}
+		}
+
+		if memberID == -1 {
+			return fmt.Errorf("this node is not included in the list of cluster members")
+		}
+
+		err = tx.NodeAddRole(memberID, "database")
+		if err != nil {
+			return errors.Wrapf(err, "Failed to add database role for the node")
+		}
 		return err
 	})
 	if err != nil {
 		return errors.Wrap(err, "cluster database initialization failed")
 	}
+
 	return nil
 }

From 88579beac1d5687c8115f3741883b645626442a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 26 Sep 2019 17:03:07 -0400
Subject: [PATCH 8/8] lxd/patches: Add database role
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/patches.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/lxd/patches.go b/lxd/patches.go
index e5253902b4..28562e3411 100644
--- a/lxd/patches.go
+++ b/lxd/patches.go
@@ -72,6 +72,7 @@ var patches = []patch{
 	{name: "storage_api_rename_container_snapshots_dir_again", run: patchStorageApiRenameContainerSnapshotsDir},
 	{name: "storage_api_rename_container_snapshots_links_again", run: patchStorageApiUpdateContainerSnapshots},
 	{name: "storage_api_rename_container_snapshots_dir_again_again", run: patchStorageApiRenameContainerSnapshotsDir},
+	{name: "clustering_add_roles", run: patchClusteringAddRoles},
 }
 
 type patch struct {
@@ -3335,6 +3336,53 @@ func patchStorageApiUpdateContainerSnapshots(name string, d *Daemon) error {
 	return nil
 }
 
+// patchClusteringAddRoles backfills the "database" role for cluster members
+// that were database nodes before roles were stored in nodes_roles. A member
+// counts as a database node when its address is listed in this node's local
+// raft_nodes table.
+func patchClusteringAddRoles(name string, d *Daemon) error {
+	// Gather the addresses of the raft members from the local database.
+	var raftAddresses []string
+	err := d.State().Node.Transaction(func(tx *db.NodeTx) error {
+		raftNodes, err := tx.RaftNodes()
+		if err != nil {
+			return errors.Wrap(err, "Failed to fetch current raft nodes")
+		}
+
+		for _, raftNode := range raftNodes {
+			raftAddresses = append(raftAddresses, raftNode.Address)
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// Add the role to every raft member in the cluster table that lacks it.
+	return d.State().Cluster.Transaction(func(tx *db.ClusterTx) error {
+		members, err := tx.Nodes()
+		if err != nil {
+			return err
+		}
+
+		for _, member := range members {
+			// NOTE(review): 0.0.0.0 presumably marks a standalone node.
+			isRaftMember := member.Address != "0.0.0.0" && shared.StringInSlice(member.Address, raftAddresses)
+			if !isRaftMember || shared.StringInSlice("database", member.Roles) {
+				continue
+			}
+
+			err = tx.NodeAddRole(member.ID, "database")
+			if err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+}
+
 // Patches end here
 
 // Here are a couple of legacy patches that were originally in


More information about the lxc-devel mailing list