[lxc-devel] [lxd/master] cluster/heartbeat: Stops using node ID for comparison of nodes
tomponline on Github
lxc-bot at linuxcontainers.org
Thu Jul 18 08:46:57 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 471 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190718/0495fbd3/attachment.bin>
-------------- next part --------------
From 7ee10ecb3b52c744ad596da9b9fde521d2eda365 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Thu, 18 Jul 2019 09:42:52 +0100
Subject: [PATCH] cluster/heartbeat: Stops using node ID for comparison of
nodes
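The raft_nodes and nodes tables do not share the same IDs, and it is not safe to assume they will always match. Heartbeat members are therefore keyed by node address, with the nodes table ID and the raft_nodes table ID tracked as separate fields.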
Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
lxd/cluster/gateway.go | 4 ++--
lxd/cluster/heartbeat.go | 43 ++++++++++++++++++++--------------------
2 files changed, 23 insertions(+), 24 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 5051dc8174..afa029c5be 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -174,9 +174,9 @@ func (g *Gateway) HandlerFuncs(nodeRefreshTask func(*APIHeartbeat)) map[string]h
nodes := make([]db.RaftNode, 0)
for _, node := range heartbeatData.Members {
- if node.Raft {
+ if node.RaftNodeID > 0 {
nodes = append(nodes, db.RaftNode{
- ID: node.ID,
+ ID: node.RaftNodeID,
Address: node.Address,
})
}
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index d246db668e..f73b4aaa7e 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -19,12 +19,12 @@ import (
// APIHeartbeatMember contains specific cluster node info.
type APIHeartbeatMember struct {
- ID int64
- Address string
- Raft bool
- LastHeartbeat time.Time
- Online bool // Calculated from offline threshold and LastHeatbeat time.
- updated bool // Has node been updated during this heartbeat run. Not sent to nodes.
+ LXDNodeID int64 // The ID field value in the nodes table.
+ RaftNodeID int64 // The ID field value in the raft_nodes table (zero if non-raft node).
+ Address string // The IP address and port of the node.
+ LastHeartbeat time.Time // The last time a positive heartbeat response was received.
+ Online bool // Calculated from offline threshold and LastHeartbeat time.
+ updated bool // Has node been updated during this heartbeat run. Not sent to nodes.
}
// APIHeartbeatVersion contains max versions for all nodes in cluster.
@@ -36,7 +36,7 @@ type APIHeartbeatVersion struct {
// APIHeartbeat contains data sent to nodes in heartbeat.
type APIHeartbeat struct {
sync.Mutex // Used to control access to Members maps.
- Members map[int64]APIHeartbeatMember
+ Members map[string]APIHeartbeatMember
Version APIHeartbeatVersion
Time time.Time
@@ -53,7 +53,7 @@ func (hbState *APIHeartbeat) update(fullStateList bool, raftNodes []db.RaftNode,
hbState.Time = time.Now()
if hbState.Members == nil {
- hbState.Members = make(map[int64]APIHeartbeatMember)
+ hbState.Members = make(map[string]APIHeartbeatMember)
}
// If we've been supplied a fresh set of node states, this is a full state list.
@@ -61,24 +61,22 @@ func (hbState *APIHeartbeat) update(fullStateList bool, raftNodes []db.RaftNode,
// Add raft nodes first with the raft flag set to true, but missing LastHeartbeat time.
for _, node := range raftNodes {
- member, exists := hbState.Members[node.ID]
+ member, exists := hbState.Members[node.Address]
if !exists {
member = APIHeartbeatMember{
- ID: node.ID,
Address: node.Address,
}
}
- member.Raft = true
- hbState.Members[node.ID] = member
+ member.RaftNodeID = node.ID
+ hbState.Members[node.Address] = member
}
// Add remaining nodes, and when if existing node is found, update status.
for _, node := range allNodes {
- member, exists := hbState.Members[node.ID]
+ member, exists := hbState.Members[node.Address]
if !exists {
member = APIHeartbeatMember{
- ID: node.ID,
Address: node.Address,
}
}
@@ -87,8 +85,9 @@ func (hbState *APIHeartbeat) update(fullStateList bool, raftNodes []db.RaftNode,
member.LastHeartbeat = node.Heartbeat
}
+ member.LXDNodeID = node.ID
member.Online = !member.LastHeartbeat.Before(time.Now().Add(-offlineThreshold))
- hbState.Members[node.ID] = member
+ hbState.Members[node.Address] = member
// Keep a record of highest APIExtensions and Schema version seen in all nodes.
if node.APIExtensions > maxAPIExtensionsVersion {
@@ -111,7 +110,7 @@ func (hbState *APIHeartbeat) update(fullStateList bool, raftNodes []db.RaftNode,
// sends heartbeat requests to the nodes supplied and updates heartbeat state.
func (hbState *APIHeartbeat) send(ctx context.Context, cert *shared.CertInfo, localAddress string, nodes []db.NodeInfo, delay bool) {
heartbeatsWg := sync.WaitGroup{}
- sendHeartbeat := func(nodeID int64, address string, delay bool, heartbeatData *APIHeartbeat) {
+ sendHeartbeat := func(address string, delay bool, heartbeatData *APIHeartbeat) {
defer heartbeatsWg.Done()
if delay {
@@ -125,7 +124,7 @@ func (hbState *APIHeartbeat) send(ctx context.Context, cert *shared.CertInfo, lo
if err == nil {
hbState.Lock()
// Ensure only update nodes that exist in Members already.
- hbNode, existing := hbState.Members[nodeID]
+ hbNode, existing := hbState.Members[address]
if !existing {
return
}
@@ -133,7 +132,7 @@ func (hbState *APIHeartbeat) send(ctx context.Context, cert *shared.CertInfo, lo
hbNode.LastHeartbeat = time.Now()
hbNode.Online = true
hbNode.updated = true
- hbState.Members[nodeID] = hbNode
+ hbState.Members[address] = hbNode
hbState.Unlock()
logger.Debugf("Successful heartbeat for %s", address)
} else {
@@ -145,18 +144,18 @@ func (hbState *APIHeartbeat) send(ctx context.Context, cert *shared.CertInfo, lo
// Special case for the local node - just record the time now.
if node.Address == localAddress {
hbState.Lock()
- hbNode := hbState.Members[node.ID]
+ hbNode := hbState.Members[node.Address]
hbNode.LastHeartbeat = time.Now()
hbNode.Online = true
hbNode.updated = true
- hbState.Members[node.ID] = hbNode
+ hbState.Members[node.Address] = hbNode
hbState.Unlock()
continue
}
// Parallelize the rest.
heartbeatsWg.Add(1)
- go sendHeartbeat(node.ID, node.Address, delay, hbState)
+ go sendHeartbeat(node.Address, delay, hbState)
}
heartbeatsWg.Wait()
}
@@ -264,7 +263,7 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster, nodeRefreshTask func(*APIH
for _, currentNode := range currentNodes {
existing := false
for _, node := range allNodes {
- if node.ID == currentNode.ID {
+ if node.Address == currentNode.Address {
existing = true
break
}
More information about the lxc-devel mailing list