[lxc-devel] [lxd/master] Use cluster heartbeats to notify of needed LXD update
tomponline on Github
lxc-bot at linuxcontainers.org
Mon Jul 8 14:16:28 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190708/99114365/attachment.bin>
-------------- next part --------------
From dc2ad1a612a83e66617d0c2c1c0353c760336a00 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Mon, 8 Jul 2019 15:14:32 +0100
Subject: [PATCH 1/3] cluster/heartbeat: Adds new heartbeat request format
Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
lxd/cluster/heartbeat.go | 53 +++++++++++++++++++++++++++++++++++++---
1 file changed, 50 insertions(+), 3 deletions(-)
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index 2833bc92b2..15ea30f0ac 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -17,6 +17,23 @@ import (
"golang.org/x/net/context"
)
+type apiHeartbeatMember struct {
+ ID int64
+ Address string
+ Raft bool
+ Online bool
+}
+
+type apiHeartbeatVersion struct {
+ Schema int64
+ APIExtensions int64
+}
+
+type apiHeartbeat struct {
+ Members map[int64]apiHeartbeatMember
+ Version apiHeartbeatVersion
+}
+
// Heartbeat returns a task function that performs leader-initiated heartbeat
// checks against all LXD nodes in the cluster.
//
@@ -77,6 +94,35 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
heartbeatsLock := sync.Mutex{}
heartbeatsWg := sync.WaitGroup{}
+ // Craft a heartbeat request.
+ heartbeatData := apiHeartbeat{
+ Members: make(map[int64]apiHeartbeatMember),
+ }
+
+ for _, node := range raftNodes {
+ member := apiHeartbeatMember{
+ ID: node.ID,
+ Address: node.Address,
+ Raft: true,
+ Online: false, //TODO
+ }
+ heartbeatData.Members[node.ID] = member
+ }
+
+ for _, node := range nodes {
+ if _, exists := heartbeatData.Members[node.ID]; exists {
+ continue
+ }
+
+ member := apiHeartbeatMember{
+ ID: node.ID,
+ Address: node.Address,
+ Raft: false,
+ Online: false, //TODO
+ }
+ heartbeatData.Members[node.ID] = member
+ }
+
sendHeartbeat := func(id int64, address string, delay bool) {
defer heartbeatsWg.Done()
@@ -86,7 +132,7 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
}
logger.Debugf("Sending heartbeat to %s", address)
- err := heartbeatNode(ctx, address, gateway.cert, raftNodes)
+ err := heartbeatNode(ctx, address, gateway.cert, &heartbeatData)
if err == nil {
heartbeatsLock.Lock()
heartbeats[id] = true
@@ -193,7 +239,7 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
const heartbeatInterval = 10
// Perform a single heartbeat request against the node with the given address.
-func heartbeatNode(taskCtx context.Context, address string, cert *shared.CertInfo, raftNodes []db.RaftNode) error {
+func heartbeatNode(taskCtx context.Context, address string, cert *shared.CertInfo, heartbeatData *apiHeartbeat) error {
logger.Debugf("Sending heartbeat request to %s", address)
config, err := tlsClientConfig(cert)
@@ -204,7 +250,8 @@ func heartbeatNode(taskCtx context.Context, address string, cert *shared.CertInf
client := &http.Client{Transport: &http.Transport{TLSClientConfig: config}}
buffer := bytes.Buffer{}
- err = json.NewEncoder(&buffer).Encode(raftNodes)
+ logger.Errorf("tomp heartbeatNode: %+v", heartbeatData)
+ err = json.NewEncoder(&buffer).Encode(heartbeatData)
if err != nil {
return err
}
From b5e487f4bac2823c8f1fba9dbc0a42ce3164b117 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Mon, 8 Jul 2019 15:15:09 +0100
Subject: [PATCH 2/3] cluster/membership: Updates Join to send new heartbeat format
Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
lxd/cluster/membership.go | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 9458682d69..1ca28f8df9 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -415,9 +415,24 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
return errors.Wrapf(err, "failed to unmark the node as pending")
}
+ // Craft a heartbeat request.
+ heartbeatData := apiHeartbeat{
+ Members: make(map[int64]apiHeartbeatMember),
+ }
+
+ for _, node := range nodes {
+ member := apiHeartbeatMember{
+ ID: node.ID,
+ Address: node.Address,
+ Raft: true,
+ Online: false, //TODO
+ }
+ heartbeatData.Members[node.ID] = member
+ }
+
// Attempt to send a heartbeat to all other nodes
for _, node := range nodes {
- go heartbeatNode(context.Background(), node.Address, cert, nodes)
+ go heartbeatNode(context.Background(), node.Address, cert, &heartbeatData)
}
return nil
From 349bdd5efb89d0a9f0525f0ca5214a48330de908 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Mon, 8 Jul 2019 15:15:34 +0100
Subject: [PATCH 3/3] cluster/gateway: Updates to receive new heartbeat format
Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
lxd/cluster/gateway.go | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 7ec49e7e27..a0aa0f9c4f 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -163,12 +163,24 @@ func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc {
// Handle heatbeats.
if r.Method == "PUT" {
- var nodes []db.RaftNode
- err := shared.ReadToJSON(r.Body, &nodes)
+ var heartbeatData apiHeartbeat
+ err := shared.ReadToJSON(r.Body, &heartbeatData)
if err != nil {
- http.Error(w, "400 invalid raft nodes payload", http.StatusBadRequest)
+ logger.Errorf("Error decoding heartbeat: %v", err)
+ http.Error(w, "400 invalid heartbeat payload", http.StatusBadRequest)
return
}
+
+ nodes := make([]db.RaftNode, 0)
+ for _, node := range heartbeatData.Members {
+ if node.Raft {
+ nodes = append(nodes, db.RaftNode{
+ ID: node.ID,
+ Address: node.Address,
+ })
+ }
+ }
+
logger.Debugf("Replace current raft nodes with %+v", nodes)
err = g.db.Transaction(func(tx *db.NodeTx) error {
return tx.RaftNodesReplace(nodes)
More information about the lxc-devel mailing list