[lxc-devel] [lxd/master] Use cluster heartbeats to notify of needed LXD update

tomponline on Github lxc-bot at linuxcontainers.org
Mon Jul 8 14:16:28 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190708/99114365/attachment.bin>
-------------- next part --------------
From dc2ad1a612a83e66617d0c2c1c0353c760336a00 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Mon, 8 Jul 2019 15:14:32 +0100
Subject: [PATCH 1/3] cluster/heartbeat: Adds new heartbeat request format

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/cluster/heartbeat.go | 53 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 50 insertions(+), 3 deletions(-)

diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index 2833bc92b2..15ea30f0ac 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -17,6 +17,23 @@ import (
 	"golang.org/x/net/context"
 )
 
+type apiHeartbeatMember struct {
+	ID      int64
+	Address string
+	Raft    bool
+	Online  bool
+}
+
+type apiHeartbeatVersion struct {
+	Schema        int64
+	APIExtensions int64
+}
+
+type apiHeartbeat struct {
+	Members map[int64]apiHeartbeatMember
+	Version apiHeartbeatVersion
+}
+
 // Heartbeat returns a task function that performs leader-initiated heartbeat
 // checks against all LXD nodes in the cluster.
 //
@@ -77,6 +94,35 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
 		heartbeatsLock := sync.Mutex{}
 		heartbeatsWg := sync.WaitGroup{}
 
+		// Craft a heartbeat request.
+		heartbeatData := apiHeartbeat{
+			Members: make(map[int64]apiHeartbeatMember),
+		}
+
+		for _, node := range raftNodes {
+			member := apiHeartbeatMember{
+				ID:      node.ID,
+				Address: node.Address,
+				Raft:    true,
+				Online:  false, //TODO
+			}
+			heartbeatData.Members[node.ID] = member
+		}
+
+		for _, node := range nodes {
+			if _, exists := heartbeatData.Members[node.ID]; exists {
+				continue
+			}
+
+			member := apiHeartbeatMember{
+				ID:      node.ID,
+				Address: node.Address,
+				Raft:    false,
+				Online:  false, //TODO
+			}
+			heartbeatData.Members[node.ID] = member
+		}
+
 		sendHeartbeat := func(id int64, address string, delay bool) {
 			defer heartbeatsWg.Done()
 
@@ -86,7 +132,7 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
 			}
 			logger.Debugf("Sending heartbeat to %s", address)
 
-			err := heartbeatNode(ctx, address, gateway.cert, raftNodes)
+			err := heartbeatNode(ctx, address, gateway.cert, &heartbeatData)
 			if err == nil {
 				heartbeatsLock.Lock()
 				heartbeats[id] = true
@@ -193,7 +239,7 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
 const heartbeatInterval = 10
 
 // Perform a single heartbeat request against the node with the given address.
-func heartbeatNode(taskCtx context.Context, address string, cert *shared.CertInfo, raftNodes []db.RaftNode) error {
+func heartbeatNode(taskCtx context.Context, address string, cert *shared.CertInfo, heartbeatData *apiHeartbeat) error {
 	logger.Debugf("Sending heartbeat request to %s", address)
 
 	config, err := tlsClientConfig(cert)
@@ -204,7 +250,8 @@ func heartbeatNode(taskCtx context.Context, address string, cert *shared.CertInf
 	client := &http.Client{Transport: &http.Transport{TLSClientConfig: config}}
 
 	buffer := bytes.Buffer{}
-	err = json.NewEncoder(&buffer).Encode(raftNodes)
+	logger.Errorf("tomp heartbeatNode: %+v", heartbeatData)
+	err = json.NewEncoder(&buffer).Encode(heartbeatData)
 	if err != nil {
 		return err
 	}

From b5e487f4bac2823c8f1fba9dbc0a42ce3164b117 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Mon, 8 Jul 2019 15:15:09 +0100
Subject: [PATCH 2/3] cluster/membership: Updates Join to send new heartbeat
 format

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/cluster/membership.go | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 9458682d69..1ca28f8df9 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -415,9 +415,24 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 			return errors.Wrapf(err, "failed to unmark the node as pending")
 		}
 
+		// Craft a heartbeat request.
+		heartbeatData := apiHeartbeat{
+			Members: make(map[int64]apiHeartbeatMember),
+		}
+
+		for _, node := range nodes {
+			member := apiHeartbeatMember{
+				ID:      node.ID,
+				Address: node.Address,
+				Raft:    true,
+				Online:  false, //TODO
+			}
+			heartbeatData.Members[node.ID] = member
+		}
+
 		// Attempt to send a heartbeat to all other nodes
 		for _, node := range nodes {
-			go heartbeatNode(context.Background(), node.Address, cert, nodes)
+			go heartbeatNode(context.Background(), node.Address, cert, &heartbeatData)
 		}
 
 		return nil

From 349bdd5efb89d0a9f0525f0ca5214a48330de908 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Mon, 8 Jul 2019 15:15:34 +0100
Subject: [PATCH 3/3] cluster/gateway: Updates to receive new heartbeat format

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/cluster/gateway.go | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 7ec49e7e27..a0aa0f9c4f 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -163,12 +163,24 @@ func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc {
 
 		// Handle heatbeats.
 		if r.Method == "PUT" {
-			var nodes []db.RaftNode
-			err := shared.ReadToJSON(r.Body, &nodes)
+			var heartbeatData apiHeartbeat
+			err := shared.ReadToJSON(r.Body, &heartbeatData)
 			if err != nil {
-				http.Error(w, "400 invalid raft nodes payload", http.StatusBadRequest)
+				logger.Errorf("Error decoding heartbeat: %v", err)
+				http.Error(w, "400 invalid heartbeat payload", http.StatusBadRequest)
 				return
 			}
+
+			nodes := make([]db.RaftNode, 0)
+			for _, node := range heartbeatData.Members {
+				if node.Raft {
+					nodes = append(nodes, db.RaftNode{
+						ID:      node.ID,
+						Address: node.Address,
+					})
+				}
+			}
+
 			logger.Debugf("Replace current raft nodes with %+v", nodes)
 			err = g.db.Transaction(func(tx *db.NodeTx) error {
 				return tx.RaftNodesReplace(nodes)


More information about the lxc-devel mailing list