[lxc-devel] [lxd/master] lxd/cluster: Use hook for initial heartbeat
stgraber on Github
lxc-bot at linuxcontainers.org
Wed Jul 17 21:13:44 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 354 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190717/79f077f1/attachment.bin>
-------------- next part --------------
From b01afcfd04afa2cb1d5a298436b50a536bb79506 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 17 Jul 2019 17:11:57 -0400
Subject: [PATCH] lxd/cluster: Use hook for initial heartbeat
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/cluster/gateway.go | 15 +++
lxd/cluster/heartbeat.go | 272 +++++++++++++++++++--------------------
lxd/daemon.go | 10 +-
3 files changed, 155 insertions(+), 142 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 5051dc8174..2b4e3edac5 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -21,6 +21,7 @@ import (
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/eagain"
+ log "github.com/lxc/lxd/shared/log15"
"github.com/lxc/lxd/shared/logger"
"github.com/pkg/errors"
)
@@ -102,6 +103,10 @@ type Gateway struct {
// detected a peer with an higher version.
upgradeTriggered bool
+ // Used by heartbeat(): it does nothing while Cluster is nil, and calls HeartbeatNodeHook (if set) after each round.
+ Cluster *db.Cluster
+ HeartbeatNodeHook func(*APIHeartbeat)
+
// ServerStore wrapper.
store *dqliteServerStore
@@ -541,6 +546,7 @@ func (g *Gateway) init() error {
}
options := []dqlite.ServerOption{
dqlite.WithServerLogFunc(DqliteLog),
+ dqlite.WithServerWatchFunc(g.watchFunc),
}
if raft.info.Address == "1" {
@@ -804,6 +810,16 @@ func dqliteNetworkDial(ctx context.Context, addr string, g *Gateway, checkLeader
return cUnix, nil
}
+// watchFunc is registered with dqlite (WithServerWatchFunc) and receives the old and new node state.
+func (g *Gateway) watchFunc(oldState int, newState int) {
+ if newState == dqlite.Leader && g.raft != nil {
+ logger.Info("Node was elected as dqlite leader", log.Ctx{"id": g.raft.info.ID, "address": g.raft.info.Address})
+
+ // Trigger an immediate full heartbeat run, asynchronously so this callback doesn't block on DB transactions and network I/O
+ go g.heartbeat(context.Background(), true)
+ }
+}
+
// Create a dial function that connects to the given listener.
func dqliteMemoryDial(listener net.Listener) dqlite.DialFunc {
return func(ctx context.Context, address string) (net.Conn, error) {
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index aa1dcaee97..45bca1d908 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -108,7 +108,7 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode,
return
}
-// sends heartbeat requests to the nodes supplied and updates heartbeat state.
+// Send sends heartbeat requests to the nodes supplied and updates heartbeat state.
func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, localAddress string, nodes []db.NodeInfo, delay bool) {
heartbeatsWg := sync.WaitGroup{}
sendHeartbeat := func(nodeID int64, address string, delay bool, heartbeatData *APIHeartbeat) {
@@ -161,177 +161,178 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo
heartbeatsWg.Wait()
}
-// Heartbeat returns a task function that performs leader-initiated heartbeat
+// HeartbeatTask returns a task function that performs leader-initiated heartbeat
// checks against all LXD nodes in the cluster.
//
// It will update the heartbeat timestamp column of the nodes table
// accordingly, and also notify them of the current list of database nodes.
-func Heartbeat(gateway *Gateway, cluster *db.Cluster, nodeRefreshTask func(*APIHeartbeat), lastLeaderHeartbeat *time.Time) (task.Func, task.Schedule) {
- heartbeat := func(ctx context.Context) {
- if gateway.server == nil || gateway.memoryDial != nil {
- // We're not a raft node or we're not clustered
- return
+func HeartbeatTask(gateway *Gateway) (task.Func, task.Schedule) {
+ // Since the database APIs are blocking we need to wrap the core logic
+ // and run it in a goroutine, so we can abort as soon as the context expires.
+ heartbeatWrapper := func(ctx context.Context) {
+ ch := make(chan struct{}, 1) // buffered: the worker must not block on send if we stopped waiting
+ go func() {
+ gateway.heartbeat(ctx, false)
+ ch <- struct{}{}
+ }()
+ select {
+ case <-ch:
+ case <-ctx.Done():
}
+ }
- raftNodes, err := gateway.currentRaftNodes()
- if err == errNotLeader {
- return
- }
+ schedule := task.Every(time.Duration(heartbeatInterval) * time.Second)
- logger.Debugf("Starting heartbeat round")
- if err != nil {
- logger.Warnf("Failed to get current raft nodes: %v", err)
- return
- }
+ return heartbeatWrapper, schedule
+}
- // Replace the local raft_nodes table immediately because it
- // might miss a row containing ourselves, since we might have
- // been elected leader before the former leader had chance to
- // send us a fresh update through the heartbeat pool.
- logger.Debugf("Heartbeat updating local raft nodes to %+v", raftNodes)
- err = gateway.db.Transaction(func(tx *db.NodeTx) error {
- return tx.RaftNodesReplace(raftNodes)
- })
- if err != nil {
- logger.Warnf("Failed to replace local raft nodes: %v", err)
- return
- }
+// heartbeat runs a single leader heartbeat round. It returns early unless this node
+// is the leader of a clustered setup and the cluster database has been set. When
+// initialHeartbeat is true, a two-phase round is used to refresh stale node states.
+func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) {
+ if g.Cluster == nil || g.server == nil || g.memoryDial != nil {
+ // Not clustered, not a raft node, or the cluster database isn't available yet
+ return
+ }
- var allNodes []db.NodeInfo
- var localAddress string // Address of this node
- var offlineThreshold time.Duration
- err = cluster.Transaction(func(tx *db.ClusterTx) error {
- var err error
- allNodes, err = tx.Nodes()
- if err != nil {
- return err
- }
+ raftNodes, err := g.currentRaftNodes()
+ if err == errNotLeader {
+ return
+ }
- localAddress, err = tx.NodeAddress()
- if err != nil {
- return err
- }
+ logger.Debugf("Starting heartbeat round")
+ if err != nil {
+ logger.Warnf("Failed to get current raft nodes: %v", err)
+ return
+ }
- offlineThreshold, err = tx.NodeOfflineThreshold()
- if err != nil {
- return err
- }
+ // Replace the local raft_nodes table immediately because it
+ // might miss a row containing ourselves, since we might have
+ // been elected leader before the former leader had chance to
+ // send us a fresh update through the heartbeat pool.
+ logger.Debugf("Heartbeat updating local raft nodes to %+v", raftNodes)
+ err = g.db.Transaction(func(tx *db.NodeTx) error {
+ return tx.RaftNodesReplace(raftNodes)
+ })
+ if err != nil {
+ logger.Warnf("Failed to replace local raft nodes: %v", err)
+ return
+ }
- return nil
- })
+ var allNodes []db.NodeInfo
+ var localAddress string // Address of this node
+ var offlineThreshold time.Duration
+ err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ var err error
+ allNodes, err = tx.Nodes()
if err != nil {
- logger.Warnf("Failed to get current cluster nodes: %v", err)
- return
+ return err
}
- // Cumulative set of node states (will be written back to database once done).
- hbState := &APIHeartbeat{}
-
- // If this leader node hasn't sent a heartbeat recently, then its node state records
- // are likely out of date, this can happen when a node becomes a leader.
- // Send stale set to all nodes in database to get a fresh set of active nodes.
- if lastLeaderHeartbeat.IsZero() || time.Since(*lastLeaderHeartbeat) >= offlineThreshold {
- logger.Warnf("Leader has not initiated heartbeat since '%v', doing initial heartbeat rounds", *lastLeaderHeartbeat)
- hbState.Update(false, raftNodes, allNodes, offlineThreshold)
- hbState.Send(ctx, gateway.cert, localAddress, allNodes, false)
- logger.Debugf("Completed initial heartbeat round phase 1")
- // We have the latest set of node states now, lets send that state set to all nodes.
- hbState.Update(true, raftNodes, allNodes, offlineThreshold)
- hbState.Send(ctx, gateway.cert, localAddress, allNodes, false)
- logger.Debugf("Completed initial heartbeat round phase 2")
- } else {
- hbState.Update(true, raftNodes, allNodes, offlineThreshold)
- hbState.Send(ctx, gateway.cert, localAddress, allNodes, true)
+ localAddress, err = tx.NodeAddress()
+ if err != nil {
+ return err
}
- // Look for any new node which appeared since sending last heartbeat.
- var currentNodes []db.NodeInfo
- err = cluster.Transaction(func(tx *db.ClusterTx) error {
- var err error
- currentNodes, err = tx.Nodes()
- if err != nil {
- return err
- }
-
- return nil
- })
+ offlineThreshold, err = tx.NodeOfflineThreshold()
if err != nil {
- logger.Warnf("Failed to get current cluster nodes: %v", err)
- return
+ return err
}
- newNodes := []db.NodeInfo{}
- for _, currentNode := range currentNodes {
- existing := false
- for _, node := range allNodes {
- if node.Address == currentNode.Address && node.ID == currentNode.ID {
- existing = true
- break
- }
- }
-
- if !existing {
- // We found a new node
- allNodes = append(allNodes, currentNode)
- newNodes = append(newNodes, currentNode)
- }
- }
+ return nil
+ })
+ if err != nil {
+ logger.Warnf("Failed to get current cluster nodes: %v", err)
+ return
+ }
- // If any new nodes found, send heartbeat to just them (with full node state).
- if len(newNodes) > 0 {
- hbState.Update(true, raftNodes, allNodes, offlineThreshold)
- hbState.Send(ctx, gateway.cert, localAddress, newNodes, false)
- }
+ // Cumulative set of node states (will be written back to database once done).
+ hbState := &APIHeartbeat{}
+
+ // An initial run (requested when this node gets elected leader) can't trust the
+ // local node state records, which are likely out of date. Send the stale set to
+ // all nodes in the database first to get a fresh set of active nodes.
+ if initialHeartbeat {
+ hbState.Update(false, raftNodes, allNodes, offlineThreshold)
+ hbState.Send(ctx, g.cert, localAddress, allNodes, false)
+
+ // We have the latest set of node states now, let's send that state set to all nodes.
+ hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+ hbState.Send(ctx, g.cert, localAddress, allNodes, false)
+ } else {
+ hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+ hbState.Send(ctx, g.cert, localAddress, allNodes, true)
+ }
- // If the context has been cancelled, return immediately.
- if ctx.Err() != nil {
- logger.Debugf("Aborting heartbeat round")
- return
+ // Look for any new node which appeared since sending last heartbeat.
+ var currentNodes []db.NodeInfo
+ err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ var err error
+ currentNodes, err = tx.Nodes()
+ if err != nil {
+ return err
}
- err = cluster.Transaction(func(tx *db.ClusterTx) error {
- for _, node := range hbState.Members {
- if !node.updated {
- continue
- }
+ return nil
+ })
+ if err != nil {
+ logger.Warnf("Failed to get current cluster nodes: %v", err)
+ return
+ }
- err := tx.NodeHeartbeat(node.Address, time.Now())
- if err != nil {
- return err
- }
+ newNodes := []db.NodeInfo{}
+ for _, currentNode := range currentNodes {
+ existing := false
+ for _, node := range allNodes {
+ if node.Address == currentNode.Address && node.ID == currentNode.ID {
+ existing = true
+ break
}
- return nil
- })
- if err != nil {
- logger.Warnf("Failed to update heartbeat: %v", err)
}
- // If full node state was sent and node refresh task is specified, run it async.
- if nodeRefreshTask != nil {
- go nodeRefreshTask(hbState)
+
+ if !existing {
+ // We found a new node
+ allNodes = append(allNodes, currentNode)
+ newNodes = append(newNodes, currentNode)
}
+ }
- // Update last leader heartbeat time so next time a full node state list can be sent (if not this time).
- *lastLeaderHeartbeat = time.Now()
- logger.Debugf("Completed heartbeat round")
+ // If any new nodes found, send heartbeat to just them (with full node state).
+ if len(newNodes) > 0 {
+ hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+ hbState.Send(ctx, g.cert, localAddress, newNodes, false)
}
- // Since the database APIs are blocking we need to wrap the core logic
- // and run it in a goroutine, so we can abort as soon as the context expires.
- heartbeatWrapper := func(ctx context.Context) {
- ch := make(chan struct{})
- go func() {
- heartbeat(ctx)
- ch <- struct{}{}
- }()
- select {
- case <-ch:
- case <-ctx.Done():
+ // If the context has been cancelled, return immediately.
+ if ctx.Err() != nil {
+ logger.Debugf("Aborting heartbeat round")
+ return
+ }
+
+ err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ for _, node := range hbState.Members {
+ if !node.updated {
+ continue
+ }
+
+ err := tx.NodeHeartbeat(node.Address, time.Now())
+ if err != nil {
+ return err
+ }
}
+ return nil
+ })
+ if err != nil {
+ logger.Warnf("Failed to update heartbeat: %v", err)
}
- schedule := task.Every(time.Duration(heartbeatInterval) * time.Second)
+ // Run the node hook (if one is registered) asynchronously with this round's node states.
+ if g.HeartbeatNodeHook != nil {
+ go g.HeartbeatNodeHook(hbState)
+ }
- return heartbeatWrapper, schedule
+
+ logger.Debugf("Completed heartbeat round")
}
// heartbeatInterval Number of seconds to wait between to heartbeat rounds.
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 2c7fe5e450..6f6e1ef9fd 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -76,9 +76,6 @@ type Daemon struct {
externalAuth *externalAuth
- // Stores the last time this node initiated a leader heartbeat run.
- lastLeaderHeartbeat time.Time
-
// Stores last heartbeat node information to detect node changes.
lastNodeList *cluster.APIHeartbeat
}
@@ -623,6 +620,7 @@ func (d *Daemon) init() error {
if err != nil {
return err
}
+ d.gateway.HeartbeatNodeHook = d.NodeRefreshTask
/* Setup some mounts (nice to have) */
if !d.os.MockMode {
@@ -710,7 +708,9 @@ func (d *Daemon) init() error {
// The only thing we want to still do on this node is
// to run the heartbeat task, in case we are the raft
// leader.
- stop, _ := task.Start(cluster.Heartbeat(d.gateway, d.cluster, d.NodeRefreshTask, &d.lastLeaderHeartbeat))
+ // Gateway heartbeats are a no-op until Cluster is set, so set it before starting the task.
+ d.gateway.Cluster = d.cluster
+ stop, _ := task.Start(cluster.HeartbeatTask(d.gateway))
d.gateway.WaitUpgradeNotification()
stop(time.Second)
@@ -720,6 +720,7 @@ func (d *Daemon) init() error {
}
return errors.Wrap(err, "failed to open cluster database")
}
+
err = cluster.NotifyUpgradeCompleted(d.State(), certInfo)
if err != nil {
// Ignore the error, since it's not fatal for this particular
@@ -727,6 +728,7 @@ func (d *Daemon) init() error {
// offline.
logger.Debugf("Could not notify all nodes of database upgrade: %v", err)
}
+ d.gateway.Cluster = d.cluster
/* Migrate the node local data to the cluster database, if needed */
if dump != nil {
@@ -915,7 +917,7 @@ func (d *Daemon) init() error {
func (d *Daemon) startClusterTasks() {
// Heartbeats
- d.clusterTasks.Add(cluster.Heartbeat(d.gateway, d.cluster, d.NodeRefreshTask, &d.lastLeaderHeartbeat))
+ d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))
// Events
d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, eventForward))
More information about the lxc-devel mailing list