[lxc-devel] [lxd/master] lxd/cluster: Use hook for initial heartbeat

stgraber on GitHub lxc-bot at linuxcontainers.org
Wed Jul 17 21:13:44 UTC 2019


From b01afcfd04afa2cb1d5a298436b50a536bb79506 Mon Sep 17 00:00:00 2001
From: Stéphane Graber <stgraber at ubuntu.com>
Date: Wed, 17 Jul 2019 17:11:57 -0400
Subject: [PATCH] lxd/cluster: Use hook for initial heartbeat
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/cluster/gateway.go   |  15 +++
 lxd/cluster/heartbeat.go | 272 +++++++++++++++++++--------------------
 lxd/daemon.go            |  10 +-
 3 files changed, 155 insertions(+), 142 deletions(-)

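In rough outline, the patch moves the heartbeat's collaborators from task parameters onto the Gateway itself: the cluster handle and an optional node-refresh hook become fields, and a dqlite watch function fires the initial heartbeat on election. A minimal, self-contained sketch of that shape, using simplified stand-in types rather than the real LXD signatures:

package main

import "fmt"

// Heartbeat is a stand-in for cluster.APIHeartbeat.
type Heartbeat struct{ Members int }

// Cluster is a stand-in for db.Cluster.
type Cluster struct{}

// Gateway carries its collaborators as fields set during daemon
// initialization, instead of receiving them as task parameters.
type Gateway struct {
	Cluster           *Cluster         // assigned once the cluster DB is open
	HeartbeatNodeHook func(*Heartbeat) // optional node-refresh callback
}

func (g *Gateway) heartbeat() {
	if g.Cluster == nil {
		// Not clustered yet: the hook may already be wired, but no
		// round runs until the cluster handle is assigned.
		return
	}
	hb := &Heartbeat{Members: 3}
	if g.HeartbeatNodeHook != nil {
		g.HeartbeatNodeHook(hb) // the real code runs this in a goroutine
	}
}

func main() {
	g := &Gateway{}
	// Wiring order mirrors daemon.init(): hook first, cluster handle later.
	g.HeartbeatNodeHook = func(hb *Heartbeat) { fmt.Println("node refresh, members:", hb.Members) }
	g.heartbeat() // no-op: Cluster is still nil
	g.Cluster = &Cluster{}
	g.heartbeat() // runs and fires the hook
}

Carrying the dependencies as fields lets daemon.init() wire them at different stages: the hook can be set before the cluster database is open, and the heartbeat simply no-ops until Cluster is assigned.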
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 5051dc8174..2b4e3edac5 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -21,6 +21,7 @@ import (
 	"github.com/lxc/lxd/lxd/util"
 	"github.com/lxc/lxd/shared"
 	"github.com/lxc/lxd/shared/eagain"
+	log "github.com/lxc/lxd/shared/log15"
 	"github.com/lxc/lxd/shared/logger"
 	"github.com/pkg/errors"
 )
@@ -102,6 +103,10 @@ type Gateway struct {
 	// detected a peer with a higher version.
 	upgradeTriggered bool
 
+	// Used for the heartbeat handler
+	Cluster           *db.Cluster
+	HeartbeatNodeHook func(*APIHeartbeat)
+
 	// ServerStore wrapper.
 	store *dqliteServerStore
 
@@ -541,6 +546,7 @@ func (g *Gateway) init() error {
 		}
 		options := []dqlite.ServerOption{
 			dqlite.WithServerLogFunc(DqliteLog),
+			dqlite.WithServerWatchFunc(g.watchFunc),
 		}
 
 		if raft.info.Address == "1" {
@@ -804,6 +810,15 @@ func dqliteNetworkDial(ctx context.Context, addr string, g *Gateway, checkLeader
 	return cUnix, nil
 }
 
+func (g *Gateway) watchFunc(oldState int, newState int) {
+	if newState == dqlite.Leader && g.raft != nil {
+		logger.Info("Node was elected as dqlite leader", log.Ctx{"id": g.raft.info.ID, "address": g.raft.info.Address})
+
+		// Trigger an immediate full heartbeat run
+		g.heartbeat(context.Background(), true)
+	}
+}
+
 // Create a dial function that connects to the given listener.
 func dqliteMemoryDial(listener net.Listener) dqlite.DialFunc {
 	return func(ctx context.Context, address string) (net.Conn, error) {
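The watchFunc above is the hook that gives the patch its name: dqlite reports server state transitions, and on becoming leader the gateway runs a full heartbeat immediately rather than waiting for the next timer tick. A runnable sketch of the pattern; State, node, and setState are simplified stand-ins, not the real dqlite API (which passes plain ints to the callback registered via dqlite.WithServerWatchFunc):

package main

import (
	"context"
	"fmt"
)

// State is a simplified stand-in for dqlite's server state values.
type State int

const (
	Follower State = iota
	Leader
)

// node is a toy server that invokes a watch callback on state changes,
// mimicking what a registered watch function observes.
type node struct {
	state State
	watch func(oldState, newState State)
}

func (n *node) setState(s State) {
	old := n.state
	n.state = s
	if n.watch != nil {
		n.watch(old, s)
	}
}

// heartbeat stands in for Gateway.heartbeat; initial selects the full
// two-phase refresh used right after an election.
func heartbeat(ctx context.Context, initial bool) {
	fmt.Printf("heartbeat run (initial=%v)\n", initial)
}

func main() {
	n := &node{}

	// Equivalent of Gateway.watchFunc: on election, kick off an
	// immediate heartbeat instead of waiting for the timer.
	n.watch = func(oldState, newState State) {
		if newState == Leader {
			fmt.Println("elected leader, triggering initial heartbeat")
			heartbeat(context.Background(), true)
		}
	}

	n.setState(Leader)
}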
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index aa1dcaee97..45bca1d908 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -108,7 +108,7 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode,
 	return
 }
 
-// sends heartbeat requests to the nodes supplied and updates heartbeat state.
+// Send sends heartbeat requests to the nodes supplied and updates heartbeat state.
 func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, localAddress string, nodes []db.NodeInfo, delay bool) {
 	heartbeatsWg := sync.WaitGroup{}
 	sendHeartbeat := func(nodeID int64, address string, delay bool, heartbeatData *APIHeartbeat) {
@@ -161,177 +161,175 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo
 	heartbeatsWg.Wait()
 }
 
-// Heartbeat returns a task function that performs leader-initiated heartbeat
+// HeartbeatTask returns a task function that performs leader-initiated heartbeat
 // checks against all LXD nodes in the cluster.
 //
 // It will update the heartbeat timestamp column of the nodes table
 // accordingly, and also notify them of the current list of database nodes.
-func Heartbeat(gateway *Gateway, cluster *db.Cluster, nodeRefreshTask func(*APIHeartbeat), lastLeaderHeartbeat *time.Time) (task.Func, task.Schedule) {
-	heartbeat := func(ctx context.Context) {
-		if gateway.server == nil || gateway.memoryDial != nil {
-			// We're not a raft node or we're not clustered
-			return
+func HeartbeatTask(gateway *Gateway) (task.Func, task.Schedule) {
+	// Since the database APIs are blocking, we need to wrap the core logic
+	// and run it in a goroutine, so we can abort as soon as the context expires.
+	heartbeatWrapper := func(ctx context.Context) {
+		ch := make(chan struct{})
+		go func() {
+			gateway.heartbeat(ctx, false)
+			ch <- struct{}{}
+		}()
+		select {
+		case <-ch:
+		case <-ctx.Done():
 		}
+	}
 
-		raftNodes, err := gateway.currentRaftNodes()
-		if err == errNotLeader {
-			return
-		}
+	schedule := task.Every(time.Duration(heartbeatInterval) * time.Second)
 
-		logger.Debugf("Starting heartbeat round")
-		if err != nil {
-			logger.Warnf("Failed to get current raft nodes: %v", err)
-			return
-		}
+	return heartbeatWrapper, schedule
+}
 
-		// Replace the local raft_nodes table immediately because it
-		// might miss a row containing ourselves, since we might have
-		// been elected leader before the former leader had chance to
-		// send us a fresh update through the heartbeat pool.
-		logger.Debugf("Heartbeat updating local raft nodes to %+v", raftNodes)
-		err = gateway.db.Transaction(func(tx *db.NodeTx) error {
-			return tx.RaftNodesReplace(raftNodes)
-		})
-		if err != nil {
-			logger.Warnf("Failed to replace local raft nodes: %v", err)
-			return
-		}
+func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) {
+	if g.Cluster == nil || g.server == nil || g.memoryDial != nil {
+		// We're not a raft node or we're not clustered
+		return
+	}
 
-		var allNodes []db.NodeInfo
-		var localAddress string // Address of this node
-		var offlineThreshold time.Duration
-		err = cluster.Transaction(func(tx *db.ClusterTx) error {
-			var err error
-			allNodes, err = tx.Nodes()
-			if err != nil {
-				return err
-			}
+	raftNodes, err := g.currentRaftNodes()
+	if err == errNotLeader {
+		return
+	}
 
-			localAddress, err = tx.NodeAddress()
-			if err != nil {
-				return err
-			}
+	logger.Debugf("Starting heartbeat round")
+	if err != nil {
+		logger.Warnf("Failed to get current raft nodes: %v", err)
+		return
+	}
 
-			offlineThreshold, err = tx.NodeOfflineThreshold()
-			if err != nil {
-				return err
-			}
+	// Replace the local raft_nodes table immediately because it
+	// might miss a row containing ourselves, since we might have
+	// been elected leader before the former leader had a chance to
+	// send us a fresh update through the heartbeat pool.
+	logger.Debugf("Heartbeat updating local raft nodes to %+v", raftNodes)
+	err = g.db.Transaction(func(tx *db.NodeTx) error {
+		return tx.RaftNodesReplace(raftNodes)
+	})
+	if err != nil {
+		logger.Warnf("Failed to replace local raft nodes: %v", err)
+		return
+	}
 
-			return nil
-		})
+	var allNodes []db.NodeInfo
+	var localAddress string // Address of this node
+	var offlineThreshold time.Duration
+	err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+		var err error
+		allNodes, err = tx.Nodes()
 		if err != nil {
-			logger.Warnf("Failed to get current cluster nodes: %v", err)
-			return
+			return err
 		}
 
-		// Cumulative set of node states (will be written back to database once done).
-		hbState := &APIHeartbeat{}
-
-		// If this leader node hasn't sent a heartbeat recently, then its node state records
-		// are likely out of date, this can happen when a node becomes a leader.
-		// Send stale set to all nodes in database to get a fresh set of active nodes.
-		if lastLeaderHeartbeat.IsZero() || time.Since(*lastLeaderHeartbeat) >= offlineThreshold {
-			logger.Warnf("Leader has not initiated heartbeat since '%v', doing initial heartbeat rounds", *lastLeaderHeartbeat)
-			hbState.Update(false, raftNodes, allNodes, offlineThreshold)
-			hbState.Send(ctx, gateway.cert, localAddress, allNodes, false)
-			logger.Debugf("Completed initial heartbeat round phase 1")
-			// We have the latest set of node states now, lets send that state set to all nodes.
-			hbState.Update(true, raftNodes, allNodes, offlineThreshold)
-			hbState.Send(ctx, gateway.cert, localAddress, allNodes, false)
-			logger.Debugf("Completed initial heartbeat round phase 2")
-		} else {
-			hbState.Update(true, raftNodes, allNodes, offlineThreshold)
-			hbState.Send(ctx, gateway.cert, localAddress, allNodes, true)
+		localAddress, err = tx.NodeAddress()
+		if err != nil {
+			return err
 		}
 
-		// Look for any new node which appeared since sending last heartbeat.
-		var currentNodes []db.NodeInfo
-		err = cluster.Transaction(func(tx *db.ClusterTx) error {
-			var err error
-			currentNodes, err = tx.Nodes()
-			if err != nil {
-				return err
-			}
-
-			return nil
-		})
+		offlineThreshold, err = tx.NodeOfflineThreshold()
 		if err != nil {
-			logger.Warnf("Failed to get current cluster nodes: %v", err)
-			return
+			return err
 		}
 
-		newNodes := []db.NodeInfo{}
-		for _, currentNode := range currentNodes {
-			existing := false
-			for _, node := range allNodes {
-				if node.Address == currentNode.Address && node.ID == currentNode.ID {
-					existing = true
-					break
-				}
-			}
-
-			if !existing {
-				// We found a new node
-				allNodes = append(allNodes, currentNode)
-				newNodes = append(newNodes, currentNode)
-			}
-		}
+		return nil
+	})
+	if err != nil {
+		logger.Warnf("Failed to get current cluster nodes: %v", err)
+		return
+	}
 
-		// If any new nodes found, send heartbeat to just them (with full node state).
-		if len(newNodes) > 0 {
-			hbState.Update(true, raftNodes, allNodes, offlineThreshold)
-			hbState.Send(ctx, gateway.cert, localAddress, newNodes, false)
-		}
+	// Cumulative set of node states (will be written back to database once done).
+	hbState := &APIHeartbeat{}
+
+	// If this leader node hasn't sent a heartbeat recently, then its node state records
+	// are likely out of date; this can happen when a node becomes the leader.
+	// Send stale set to all nodes in database to get a fresh set of active nodes.
+	if initialHeartbeat {
+		hbState.Update(false, raftNodes, allNodes, offlineThreshold)
+		hbState.Send(ctx, g.cert, localAddress, allNodes, false)
+
+	// We have the latest set of node states now; let's send that state set to all nodes.
+		hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+		hbState.Send(ctx, g.cert, localAddress, allNodes, false)
+	} else {
+		hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+		hbState.Send(ctx, g.cert, localAddress, allNodes, true)
+	}
 
-		// If the context has been cancelled, return immediately.
-		if ctx.Err() != nil {
-			logger.Debugf("Aborting heartbeat round")
-			return
+	// Look for any new node which appeared since sending the last heartbeat.
+	var currentNodes []db.NodeInfo
+	err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+		var err error
+		currentNodes, err = tx.Nodes()
+		if err != nil {
+			return err
 		}
 
-		err = cluster.Transaction(func(tx *db.ClusterTx) error {
-			for _, node := range hbState.Members {
-				if !node.updated {
-					continue
-				}
+		return nil
+	})
+	if err != nil {
+		logger.Warnf("Failed to get current cluster nodes: %v", err)
+		return
+	}
 
-				err := tx.NodeHeartbeat(node.Address, time.Now())
-				if err != nil {
-					return err
-				}
+	newNodes := []db.NodeInfo{}
+	for _, currentNode := range currentNodes {
+		existing := false
+		for _, node := range allNodes {
+			if node.Address == currentNode.Address && node.ID == currentNode.ID {
+				existing = true
+				break
 			}
-			return nil
-		})
-		if err != nil {
-			logger.Warnf("Failed to update heartbeat: %v", err)
 		}
-		// If full node state was sent and node refresh task is specified, run it async.
-		if nodeRefreshTask != nil {
-			go nodeRefreshTask(hbState)
+
+		if !existing {
+			// We found a new node
+			allNodes = append(allNodes, currentNode)
+			newNodes = append(newNodes, currentNode)
 		}
+	}
 
-		// Update last leader heartbeat time so next time a full node state list can be sent (if not this time).
-		*lastLeaderHeartbeat = time.Now()
-		logger.Debugf("Completed heartbeat round")
+	// If any new nodes were found, send the heartbeat to just them (with full node state).
+	if len(newNodes) > 0 {
+		hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+		hbState.Send(ctx, g.cert, localAddress, newNodes, false)
 	}
 
-	// Since the database APIs are blocking we need to wrap the core logic
-	// and run it in a goroutine, so we can abort as soon as the context expires.
-	heartbeatWrapper := func(ctx context.Context) {
-		ch := make(chan struct{})
-		go func() {
-			heartbeat(ctx)
-			ch <- struct{}{}
-		}()
-		select {
-		case <-ch:
-		case <-ctx.Done():
+	// If the context has been cancelled, return immediately.
+	if ctx.Err() != nil {
+		logger.Debugf("Aborting heartbeat round")
+		return
+	}
+
+	err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+		for _, node := range hbState.Members {
+			if !node.updated {
+				continue
+			}
+
+			err := tx.NodeHeartbeat(node.Address, time.Now())
+			if err != nil {
+				return err
+			}
 		}
+		return nil
+	})
+	if err != nil {
+		logger.Warnf("Failed to update heartbeat: %v", err)
 	}
 
-	schedule := task.Every(time.Duration(heartbeatInterval) * time.Second)
+	// If a heartbeat node hook is registered, run it asynchronously.
+	if g.HeartbeatNodeHook != nil {
+		go g.HeartbeatNodeHook(hbState)
+	}
 
-	return heartbeatWrapper, schedule
+	logger.Debugf("Completed heartbeat round")
 }
 
 // heartbeatInterval is the number of seconds to wait between two heartbeat rounds.
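The hunk above also carries the behavior change itself: an initial heartbeat (run right after election) does a two-phase refresh, first sending a partial state list so every node reports back, then rebroadcasting the now-complete list, while a routine round sends the current state with per-node delays. A hedged sketch of that control flow; heartbeatState, update, and send are hypothetical stand-ins for APIHeartbeat and its Update/Send methods:

package main

import (
	"context"
	"fmt"
)

// heartbeatState is a toy stand-in for cluster.APIHeartbeat.
type heartbeatState struct{ fullStateList bool }

// update mimics APIHeartbeat.Update: fullStateList records whether the
// member list being sent is known to be fresh and complete.
func (s *heartbeatState) update(fullStateList bool) { s.fullStateList = fullStateList }

// send mimics APIHeartbeat.Send: delay spreads the requests across the
// heartbeat interval, while an initial heartbeat sends immediately.
func (s *heartbeatState) send(ctx context.Context, delay bool) {
	fmt.Printf("send: full=%v delay=%v\n", s.fullStateList, delay)
}

// runRound mirrors the branch in Gateway.heartbeat.
func runRound(ctx context.Context, initial bool) {
	hb := &heartbeatState{}
	if initial {
		// Phase 1: our records may be stale, so send a partial list
		// and let every node report its current status back.
		hb.update(false)
		hb.send(ctx, false)

		// Phase 2: the replies refreshed our view; broadcast the
		// full, authoritative state list to all nodes.
		hb.update(true)
		hb.send(ctx, false)
	} else {
		// Routine round: state is current; spread the load with delays.
		hb.update(true)
		hb.send(ctx, true)
	}
}

func main() {
	runRound(context.Background(), true)  // right after election
	runRound(context.Background(), false) // steady state
}

The two phases replace the old time.Since(lastLeaderHeartbeat) >= offlineThreshold test: instead of guessing from a timestamp whether the leader's records are stale, the election hook says so explicitly.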
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 2c7fe5e450..6f6e1ef9fd 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -76,9 +76,6 @@ type Daemon struct {
 
 	externalAuth *externalAuth
 
-	// Stores the last time this node initiated a leader heartbeat run.
-	lastLeaderHeartbeat time.Time
-
 	// Stores last heartbeat node information to detect node changes.
 	lastNodeList *cluster.APIHeartbeat
 }
@@ -623,6 +620,7 @@ func (d *Daemon) init() error {
 	if err != nil {
 		return err
 	}
+	d.gateway.HeartbeatNodeHook = d.NodeRefreshTask
 
 	/* Setup some mounts (nice to have) */
 	if !d.os.MockMode {
@@ -710,7 +708,7 @@ func (d *Daemon) init() error {
 			// The only thing we want to still do on this node is
 			// to run the heartbeat task, in case we are the raft
 			// leader.
-			stop, _ := task.Start(cluster.Heartbeat(d.gateway, d.cluster, d.NodeRefreshTask, &d.lastLeaderHeartbeat))
+			stop, _ := task.Start(cluster.HeartbeatTask(d.gateway))
 			d.gateway.WaitUpgradeNotification()
 			stop(time.Second)
 
@@ -720,6 +718,7 @@ func (d *Daemon) init() error {
 		}
 		return errors.Wrap(err, "failed to open cluster database")
 	}
+
 	err = cluster.NotifyUpgradeCompleted(d.State(), certInfo)
 	if err != nil {
 		// Ignore the error, since it's not fatal for this particular
@@ -727,6 +726,7 @@ func (d *Daemon) init() error {
 		// offline.
 		logger.Debugf("Could not notify all nodes of database upgrade: %v", err)
 	}
+	d.gateway.Cluster = d.cluster
 
 	/* Migrate the node local data to the cluster database, if needed */
 	if dump != nil {
@@ -915,7 +915,7 @@ func (d *Daemon) init() error {
 
 func (d *Daemon) startClusterTasks() {
 	// Heartbeats
-	d.clusterTasks.Add(cluster.Heartbeat(d.gateway, d.cluster, d.NodeRefreshTask, &d.lastLeaderHeartbeat))
+	d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))
 
 	// Events
 	d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, eventForward))

