[lxc-devel] [lxd/master] Cluster heartbeat format change

tomponline on Github lxc-bot at linuxcontainers.org
Thu Jul 18 10:51:54 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 1001 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190718/2efc693f/attachment.bin>
-------------- next part --------------
From 4767b74dfdeddf8aa43d61addcab23df66cb37c5 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Thu, 18 Jul 2019 11:32:10 +0100
Subject: [PATCH 1/3] cluster/heartbeat: Adds new Nodes field to heartbeat
 struct

This new field is a map keyed by node address rather than node ID.

This is to avoid issues with raft_nodes and nodes tables not having the same IDs for the same node.

For backwards compatibility with non-upgraded nodes, the old Members map (keyed by Node ID) is still sent.

Due to the way Go unmarshals JSON, old nodes that do not have the Nodes field in their heartbeat struct will simply ignore it.

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/cluster/gateway.go   | 11 +++++--
 lxd/cluster/heartbeat.go | 67 +++++++++++++++++++++++++---------------
 2 files changed, 50 insertions(+), 28 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 78a9264a46..2d048c0112 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -178,10 +178,15 @@ func (g *Gateway) HandlerFuncs(nodeRefreshTask func(*APIHeartbeat)) map[string]h
 			}
 
 			nodes := make([]db.RaftNode, 0)
-			for _, node := range heartbeatData.Members {
-				if node.Raft {
+			for _, node := range heartbeatData.Nodes {
+				if node.RaftID > 0 || node.Raft {
+					// Prefer RaftID over ID (but deal with legacy heartbeats).
+					nodeID := node.RaftID
+					if nodeID == 0 {
+						nodeID = node.ID
+					}
 					nodes = append(nodes, db.RaftNode{
-						ID:      node.ID,
+						ID:      nodeID,
 						Address: node.Address,
 					})
 				}
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index 45bca1d908..3eda81eecd 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -19,12 +19,13 @@ import (
 
 // APIHeartbeatMember contains specific cluster node info.
 type APIHeartbeatMember struct {
-	ID            int64
-	Address       string
-	Raft          bool
-	LastHeartbeat time.Time
-	Online        bool // Calculated from offline threshold and LastHeatbeat time.
-	updated       bool // Has node been updated during this heartbeat run. Not sent to nodes.
+	ID            int64     // ID field value in nodes table.
+	Address       string    // Host and Port of node.
+	RaftID        int64     // ID field value in raft_nodes table, zero if non-raft node.
+	Raft          bool      // Deprecated, use non-zero RaftID instead to indicate raft node.
+	LastHeartbeat time.Time // Last time we received a successful response from node.
+	Online        bool      // Calculated from offline threshold and LastHeatbeat time.
+	updated       bool      // Has node been updated during this heartbeat run. Not sent to nodes.
 }
 
 // APIHeartbeatVersion contains max versions for all nodes in cluster.
@@ -35,8 +36,9 @@ type APIHeartbeatVersion struct {
 
 // APIHeartbeat contains data sent to nodes in heartbeat.
 type APIHeartbeat struct {
-	sync.Mutex // Used to control access to Members maps.
-	Members    map[int64]APIHeartbeatMember
+	sync.Mutex                              // Used to control access to Members maps.
+	Members    map[int64]APIHeartbeatMember // Deprecated, use Nodes instead.
+	Nodes      map[string]APIHeartbeatMember
 	Version    APIHeartbeatVersion
 	Time       time.Time
 
@@ -52,33 +54,32 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode,
 	var maxSchemaVersion, maxAPIExtensionsVersion int
 	hbState.Time = time.Now()
 
-	if hbState.Members == nil {
-		hbState.Members = make(map[int64]APIHeartbeatMember)
+	if hbState.Nodes == nil {
+		hbState.Nodes = make(map[string]APIHeartbeatMember)
 	}
 
 	// If we've been supplied a fresh set of node states, this is a full state list.
 	hbState.FullStateList = fullStateList
 
-	// Add raft nodes first with the raft flag set to true, but missing LastHeartbeat time.
+	// Add raft nodes first with the raft ID and flag set to true, but missing LastHeartbeat time.
 	for _, node := range raftNodes {
-		member, exists := hbState.Members[node.ID]
+		member, exists := hbState.Nodes[node.Address]
 		if !exists {
 			member = APIHeartbeatMember{
-				ID:      node.ID,
 				Address: node.Address,
 			}
 		}
 
+		member.RaftID = node.ID
 		member.Raft = true
-		hbState.Members[node.ID] = member
+		hbState.Nodes[node.Address] = member
 	}
 
 	// Add remaining nodes, and when if existing node is found, update status.
 	for _, node := range allNodes {
-		member, exists := hbState.Members[node.ID]
+		member, exists := hbState.Nodes[node.Address]
 		if !exists {
 			member = APIHeartbeatMember{
-				ID:      node.ID,
 				Address: node.Address,
 			}
 		}
@@ -87,8 +88,9 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode,
 			member.LastHeartbeat = node.Heartbeat
 		}
 
+		member.ID = node.ID
 		member.Online = !member.LastHeartbeat.Before(time.Now().Add(-offlineThreshold))
-		hbState.Members[node.ID] = member
+		hbState.Nodes[node.Address] = member
 
 		// Keep a record of highest APIExtensions and Schema version seen in all nodes.
 		if node.APIExtensions > maxAPIExtensionsVersion {
@@ -105,13 +107,28 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode,
 		APIExtensions: maxAPIExtensionsVersion,
 	}
 
+	// Convert the Nodes map to a legacy Members map for non-upgraded nodes.
+	// The legacy format is keyed by node ID, but this didn't work well as the IDs in the nodes
+	// and raft_nodes tables are not guaranteed to be the same.
+	hbState.Members = make(map[int64]APIHeartbeatMember)
+
+	for _, node := range hbState.Nodes {
+		// Check if node is raft node, if so we send the Raft ID as Node ID for consistency
+		// with raft_nodes table on receiving side.
+		if node.RaftID > 0 {
+			node.ID = node.RaftID
+		}
+
+		hbState.Members[node.ID] = node
+	}
+
 	return
 }
 
 // Send sends heartbeat requests to the nodes supplied and updates heartbeat state.
 func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, localAddress string, nodes []db.NodeInfo, delay bool) {
 	heartbeatsWg := sync.WaitGroup{}
-	sendHeartbeat := func(nodeID int64, address string, delay bool, heartbeatData *APIHeartbeat) {
+	sendHeartbeat := func(address string, delay bool, heartbeatData *APIHeartbeat) {
 		defer heartbeatsWg.Done()
 
 		if delay {
@@ -125,7 +142,7 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo
 		if err == nil {
 			hbState.Lock()
 			// Ensure only update nodes that exist in Members already.
-			hbNode, existing := hbState.Members[nodeID]
+			hbNode, existing := hbState.Nodes[address]
 			if !existing {
 				return
 			}
@@ -133,7 +150,7 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo
 			hbNode.LastHeartbeat = time.Now()
 			hbNode.Online = true
 			hbNode.updated = true
-			hbState.Members[nodeID] = hbNode
+			hbState.Nodes[address] = hbNode
 			hbState.Unlock()
 			logger.Debugf("Successful heartbeat for %s", address)
 		} else {
@@ -145,18 +162,18 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo
 		// Special case for the local node - just record the time now.
 		if node.Address == localAddress {
 			hbState.Lock()
-			hbNode := hbState.Members[node.ID]
+			hbNode := hbState.Nodes[node.Address]
 			hbNode.LastHeartbeat = time.Now()
 			hbNode.Online = true
 			hbNode.updated = true
-			hbState.Members[node.ID] = hbNode
+			hbState.Nodes[node.Address] = hbNode
 			hbState.Unlock()
 			continue
 		}
 
 		// Parallelize the rest.
 		heartbeatsWg.Add(1)
-		go sendHeartbeat(node.ID, node.Address, delay, hbState)
+		go sendHeartbeat(node.Address, delay, hbState)
 	}
 	heartbeatsWg.Wait()
 }
@@ -281,7 +298,7 @@ func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) {
 	for _, currentNode := range currentNodes {
 		existing := false
 		for _, node := range allNodes {
-			if node.Address == currentNode.Address && node.ID == currentNode.ID {
+			if node.Address == currentNode.Address {
 				existing = true
 				break
 			}
@@ -307,7 +324,7 @@ func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) {
 	}
 
 	err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
-		for _, node := range hbState.Members {
+		for _, node := range hbState.Nodes {
 			if !node.updated {
 				continue
 			}

From c2084c8b3b6a04df2762929eed709f1a2e0b080c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 18 Jul 2019 04:37:28 -0400
Subject: [PATCH 2/3] lxd/cluster/membership: Fix new DB server id
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This effectively reverts commit 1b52ef853533efb7e0837a1aae731f925043617b.

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/cluster/membership.go | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 671cd4a886..8a1b186c1f 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -468,7 +468,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 
 	// Check if we have a spare node that we can turn into a database one.
 	address := ""
-	id := int64(-1)
 	err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
 		config, err := ConfigLoad(tx)
 		if err != nil {
@@ -489,7 +488,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 			logger.Debugf(
 				"Found spare node %s (%s) to be promoted as database node", node.Name, node.Address)
 			address = node.Address
-			id = node.ID
 			break
 		}
 
@@ -504,9 +502,20 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 		return "", currentRaftNodes, nil
 	}
 
-	// Update the local raft_table adding the new member and building a new
-	// list.
-	updatedRaftNodes := append(currentRaftNodes, db.RaftNode{ID: id, Address: address})
+	// Figure out the next ID in the raft_nodes table
+	var updatedRaftNodes []db.RaftNode
+	err = gateway.db.Transaction(func(tx *db.NodeTx) error {
+		id, err := tx.RaftNodeAdd(address)
+		if err != nil {
+			return errors.Wrap(err, "Failed to add new raft node")
+		}
+
+		updatedRaftNodes = append(currentRaftNodes, db.RaftNode{ID: id, Address: address})
+		return nil
+	})
+	if err != nil {
+		return "", nil, err
+	}
 
 	return address, updatedRaftNodes, nil
 }

From db8f8728c4209d2d440eb904faf84c684d2c94f0 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Thu, 18 Jul 2019 11:48:13 +0100
Subject: [PATCH 3/3] daemon: Updates use of heartbeat data Members field to
 Nodes field

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/daemon.go   | 12 ++++++------
 lxd/networks.go |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/lxd/daemon.go b/lxd/daemon.go
index 6f6e1ef9fd..e494ee39dc 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -1330,13 +1330,13 @@ func (d *Daemon) hasNodeListChanged(heartbeatData *cluster.APIHeartbeat) bool {
 	}
 
 	// Member count has changed.
-	if len(d.lastNodeList.Members) != len(heartbeatData.Members) {
+	if len(d.lastNodeList.Nodes) != len(heartbeatData.Nodes) {
 		return true
 	}
 
-	// Check for node address changes.
-	for lastMemberID, lastMember := range d.lastNodeList.Members {
-		if heartbeatData.Members[lastMemberID].Address != lastMember.Address {
+	// Check for node ID changes.
+	for lastMemberAddress, lastMember := range d.lastNodeList.Nodes {
+		if heartbeatData.Nodes[lastMemberAddress].ID != lastMember.ID {
 			return true
 		}
 	}
@@ -1358,12 +1358,12 @@ func (d *Daemon) NodeRefreshTask(heartbeatData *cluster.APIHeartbeat) {
 
 	// Only refresh forkdns peers if the full state list has been generated.
 	if heartbeatData.FullStateList {
-		for i, node := range heartbeatData.Members {
+		for i, node := range heartbeatData.Nodes {
 			// Exclude nodes that the leader considers offline.
 			// This is to avoid forkdns delaying results by querying an offline node.
 			if !node.Online {
 				logger.Warnf("Excluding offline node from refresh: %+v", node)
-				delete(heartbeatData.Members, i)
+				delete(heartbeatData.Nodes, i)
 			}
 		}
 
diff --git a/lxd/networks.go b/lxd/networks.go
index 9fa46146c5..7dc1de70b4 100644
--- a/lxd/networks.go
+++ b/lxd/networks.go
@@ -2147,7 +2147,7 @@ func (n *network) refreshForkdnsServerAddresses(heartbeatData *cluster.APIHeartb
 	logger.Infof("Refreshing forkdns peers for %v", n.name)
 
 	cert := n.state.Endpoints.NetworkCert()
-	for _, node := range heartbeatData.Members {
+	for _, node := range heartbeatData.Nodes {
 		if node.Address == localAddress {
 			// No need to query ourselves.
 			continue


More information about the lxc-devel mailing list