[lxc-devel] [lxd/master] DEBUG: clustering_storage ceph failures

stgraber on Github lxc-bot at linuxcontainers.org
Thu Jul 18 04:08:10 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190717/c370a1ff/attachment-0001.bin>
-------------- next part --------------
From 00daff533635a7dd908977d3c7d1eaffe991cf60 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 17 Jul 2019 18:23:46 -0400
Subject: [PATCH 1/4] lxd/storage/ceph: Handle EBUSY on unmap
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/storage_ceph_utils.go | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/lxd/storage_ceph_utils.go b/lxd/storage_ceph_utils.go
index 2ae569d358..9c458ad929 100644
--- a/lxd/storage_ceph_utils.go
+++ b/lxd/storage_ceph_utils.go
@@ -9,6 +9,7 @@ import (
 	"strconv"
 	"strings"
 	"syscall"
+	"time"
 
 	"github.com/pborman/uuid"
 	"golang.org/x/sys/unix"
@@ -169,6 +170,8 @@ func cephRBDVolumeUnmap(clusterName string, poolName string, volumeName string,
 	volumeType string, userName string, unmapUntilEINVAL bool) error {
 	unmapImageName := fmt.Sprintf("%s_%s", volumeType, volumeName)
 
+	busyCount := 0
+
 again:
 	_, err := shared.RunCommand(
 		"rbd",
@@ -187,8 +190,21 @@ again:
 					// EINVAL (already unmapped)
 					return nil
 				}
+
+				if waitStatus.ExitStatus() == 16 {
+					// EBUSY (currently in use)
+					busyCount++
+					if busyCount == 10 {
+						return err
+					}
+
+					// Wait a second and try again
+					time.Sleep(time.Second)
+					goto again
+				}
 			}
 		}
+
 		return err
 	}
 

From 8e850c97fdd7082ba91dc5d6161c4d08b28b6a7c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 17 Jul 2019 19:18:20 -0400
Subject: [PATCH 2/4] lxd/storage/ceph: Slightly speed up creation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/storage_ceph.go | 65 ++++++++++++++++-----------------------------
 1 file changed, 23 insertions(+), 42 deletions(-)

diff --git a/lxd/storage_ceph.go b/lxd/storage_ceph.go
index 6a1253f102..279ea043a8 100644
--- a/lxd/storage_ceph.go
+++ b/lxd/storage_ceph.go
@@ -136,11 +136,8 @@ func (s *storageCeph) StoragePoolCreate() error {
 	if !cephOSDPoolExists(s.ClusterName, s.OSDPoolName, s.UserName) {
 		logger.Debugf(`CEPH OSD storage pool "%s" does not exist`, s.OSDPoolName)
 
-		// create new osd pool
-		msg, err := shared.TryRunCommand("ceph", "--name",
-			fmt.Sprintf("client.%s", s.UserName), "--cluster",
-			s.ClusterName, "osd", "pool", "create", s.OSDPoolName,
-			s.PGNum)
+		// Create new osd pool
+		msg, err := shared.TryRunCommand("ceph", "--name", fmt.Sprintf("client.%s", s.UserName), "--cluster", s.ClusterName, "osd", "pool", "create", s.OSDPoolName, s.PGNum)
 		if err != nil {
 			logger.Errorf(`Failed to create CEPH osd storage pool "%s" in cluster "%s": %s`, s.OSDPoolName, s.ClusterName, msg)
 			return err
@@ -152,27 +149,39 @@ func (s *storageCeph) StoragePoolCreate() error {
 				return
 			}
 
-			err := cephOSDPoolDestroy(s.ClusterName, s.OSDPoolName,
-				s.UserName)
+			err := cephOSDPoolDestroy(s.ClusterName, s.OSDPoolName, s.UserName)
 			if err != nil {
 				logger.Warnf(`Failed to delete ceph storage pool "%s" in cluster "%s": %s`, s.OSDPoolName, s.ClusterName, err)
 			}
 		}()
 
+		// Create dummy storage volume. Other LXD instances will use this to detect whether this osd pool is already in use by another LXD instance.
+		err = cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName, s.OSDPoolName, "lxd", "0", s.UserName)
+		if err != nil {
+			logger.Errorf(`Failed to create RBD storage volume "%s" on storage pool "%s": %s`, s.pool.Name, s.pool.Name, err)
+			return err
+		}
+		s.pool.Config["volatile.pool.pristine"] = "true"
+		logger.Debugf(`Created RBD storage volume "%s" on storage pool "%s"`, s.pool.Name, s.pool.Name)
 	} else {
 		logger.Debugf(`CEPH OSD storage pool "%s" does exist`, s.OSDPoolName)
 
-		// use existing osd pool
-		msg, err := shared.RunCommand("ceph", "--name",
-			fmt.Sprintf("client.%s", s.UserName),
-			"--cluster", s.ClusterName, "osd", "pool", "get",
-			s.OSDPoolName, "pg_num")
+		ok := cephRBDVolumeExists(s.ClusterName, s.OSDPoolName, s.OSDPoolName, "lxd", s.UserName)
+		s.pool.Config["volatile.pool.pristine"] = "false"
+		if ok {
+			if s.pool.Config["ceph.osd.force_reuse"] == "" || !shared.IsTrue(s.pool.Config["ceph.osd.force_reuse"]) {
+				return fmt.Errorf("CEPH OSD storage pool \"%s\" in cluster \"%s\" seems to be in use by another LXD instance. Use \"ceph.osd.force_reuse=true\" to force.", s.pool.Name, s.ClusterName)
+			}
+		}
+
+		// Use existing osd pool
+		msg, err := shared.RunCommand("ceph", "--name", fmt.Sprintf("client.%s", s.UserName), "--cluster", s.ClusterName, "osd", "pool", "get", s.OSDPoolName, "pg_num")
 		if err != nil {
 			logger.Errorf(`Failed to retrieve number of placement groups for CEPH osd storage pool "%s" in cluster "%s": %s`, s.OSDPoolName, s.ClusterName, msg)
 			return err
 		}
-		logger.Debugf(`Retrieved number of placement groups or CEPH osd storage pool "%s" in cluster "%s"`, s.OSDPoolName, s.ClusterName)
 
+		logger.Debugf(`Retrieved number of placement groups or CEPH osd storage pool "%s" in cluster "%s"`, s.OSDPoolName, s.ClusterName)
 		idx := strings.Index(msg, "pg_num:")
 		if idx == -1 {
 			logger.Errorf(`Failed to parse number of placement groups for CEPH osd storage pool "%s" in cluster "%s": %s`, s.OSDPoolName, s.ClusterName, msg)
@@ -180,6 +189,7 @@ func (s *storageCeph) StoragePoolCreate() error {
 
 		msg = msg[(idx + len("pg_num:")):]
 		msg = strings.TrimSpace(msg)
+
 		// It is ok to update the pool configuration since storage pool
 		// creation via API is implemented such that the storage pool is
 		// checked for a changed config after this function returns and
@@ -225,35 +235,6 @@ func (s *storageCeph) StoragePoolCreate() error {
 		}
 	}()
 
-	ok := cephRBDVolumeExists(s.ClusterName, s.OSDPoolName, s.OSDPoolName,
-		"lxd", s.UserName)
-	s.pool.Config["volatile.pool.pristine"] = "false"
-	if !ok {
-		s.pool.Config["volatile.pool.pristine"] = "true"
-		// Create dummy storage volume. Other LXD instances will use
-		// this to detect whether this osd pool is already in use by
-		// another LXD instance.
-		err = cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName,
-			s.OSDPoolName, "lxd", "0", s.UserName)
-		if err != nil {
-			logger.Errorf(`Failed to create RBD storage volume "%s" on storage pool "%s": %s`, s.pool.Name, s.pool.Name, err)
-			return err
-		}
-		logger.Debugf(`Created RBD storage volume "%s" on storage pool "%s"`, s.pool.Name, s.pool.Name)
-	} else {
-		msg := fmt.Sprintf(`CEPH OSD storage pool "%s" in cluster `+
-			`"%s" seems to be in use by another LXD instace`,
-			s.pool.Name, s.ClusterName)
-		if s.pool.Config["ceph.osd.force_reuse"] == "" ||
-			!shared.IsTrue(s.pool.Config["ceph.osd.force_reuse"]) {
-			msg += `. Set "ceph.osd.force_reuse=true" to force ` +
-				`LXD to reuse the pool`
-			logger.Errorf(msg)
-			return fmt.Errorf(msg)
-		}
-		logger.Warnf(msg)
-	}
-
 	logger.Infof(`Created CEPH OSD storage pool "%s" in cluster "%s"`,
 		s.pool.Name, s.ClusterName)
 

From 1747a6ef7710e7fbc3a4026fb1873e3aae776187 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 17 Jul 2019 17:11:57 -0400
Subject: [PATCH 3/4] lxd/cluster: Use hook for initial heartbeat
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/cluster/gateway.go        |  15 ++
 lxd/cluster/heartbeat.go      | 272 +++++++++++++++++-----------------
 lxd/cluster/heartbeat_test.go |   8 +-
 lxd/daemon.go                 |  10 +-
 4 files changed, 159 insertions(+), 146 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 5051dc8174..78a9264a46 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -21,6 +21,7 @@ import (
 	"github.com/lxc/lxd/lxd/util"
 	"github.com/lxc/lxd/shared"
 	"github.com/lxc/lxd/shared/eagain"
+	log "github.com/lxc/lxd/shared/log15"
 	"github.com/lxc/lxd/shared/logger"
 	"github.com/pkg/errors"
 )
@@ -102,6 +103,10 @@ type Gateway struct {
 	// detected a peer with an higher version.
 	upgradeTriggered bool
 
+	// Used for the heartbeat handler
+	Cluster           *db.Cluster
+	HeartbeatNodeHook func(*APIHeartbeat)
+
 	// ServerStore wrapper.
 	store *dqliteServerStore
 
@@ -541,6 +546,7 @@ func (g *Gateway) init() error {
 		}
 		options := []dqlite.ServerOption{
 			dqlite.WithServerLogFunc(DqliteLog),
+			dqlite.WithServerWatchFunc(g.watchFunc),
 		}
 
 		if raft.info.Address == "1" {
@@ -804,6 +810,15 @@ func dqliteNetworkDial(ctx context.Context, addr string, g *Gateway, checkLeader
 	return cUnix, nil
 }
 
+func (g *Gateway) watchFunc(oldState int, newState int) {
+	if newState == dqlite.Leader && g.raft != nil {
+		logger.Info("Node was elected as dqlite leader", log.Ctx{"id": g.raft.info.ID, "address": g.raft.info.Address})
+
+		// Trigger an immediate full heartbeat run
+		go g.heartbeat(g.ctx, true)
+	}
+}
+
 // Create a dial function that connects to the given listener.
 func dqliteMemoryDial(listener net.Listener) dqlite.DialFunc {
 	return func(ctx context.Context, address string) (net.Conn, error) {
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index aa1dcaee97..45bca1d908 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -108,7 +108,7 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode,
 	return
 }
 
-// sends heartbeat requests to the nodes supplied and updates heartbeat state.
+// Send sends heartbeat requests to the nodes supplied and updates heartbeat state.
 func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, localAddress string, nodes []db.NodeInfo, delay bool) {
 	heartbeatsWg := sync.WaitGroup{}
 	sendHeartbeat := func(nodeID int64, address string, delay bool, heartbeatData *APIHeartbeat) {
@@ -161,177 +161,175 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo
 	heartbeatsWg.Wait()
 }
 
-// Heartbeat returns a task function that performs leader-initiated heartbeat
+// HeartbeatTask returns a task function that performs leader-initiated heartbeat
 // checks against all LXD nodes in the cluster.
 //
 // It will update the heartbeat timestamp column of the nodes table
 // accordingly, and also notify them of the current list of database nodes.
-func Heartbeat(gateway *Gateway, cluster *db.Cluster, nodeRefreshTask func(*APIHeartbeat), lastLeaderHeartbeat *time.Time) (task.Func, task.Schedule) {
-	heartbeat := func(ctx context.Context) {
-		if gateway.server == nil || gateway.memoryDial != nil {
-			// We're not a raft node or we're not clustered
-			return
+func HeartbeatTask(gateway *Gateway) (task.Func, task.Schedule) {
+	// Since the database APIs are blocking we need to wrap the core logic
+	// and run it in a goroutine, so we can abort as soon as the context expires.
+	heartbeatWrapper := func(ctx context.Context) {
+		ch := make(chan struct{})
+		go func() {
+			gateway.heartbeat(ctx, false)
+			ch <- struct{}{}
+		}()
+		select {
+		case <-ch:
+		case <-ctx.Done():
 		}
+	}
 
-		raftNodes, err := gateway.currentRaftNodes()
-		if err == errNotLeader {
-			return
-		}
+	schedule := task.Every(time.Duration(heartbeatInterval) * time.Second)
 
-		logger.Debugf("Starting heartbeat round")
-		if err != nil {
-			logger.Warnf("Failed to get current raft nodes: %v", err)
-			return
-		}
+	return heartbeatWrapper, schedule
+}
 
-		// Replace the local raft_nodes table immediately because it
-		// might miss a row containing ourselves, since we might have
-		// been elected leader before the former leader had chance to
-		// send us a fresh update through the heartbeat pool.
-		logger.Debugf("Heartbeat updating local raft nodes to %+v", raftNodes)
-		err = gateway.db.Transaction(func(tx *db.NodeTx) error {
-			return tx.RaftNodesReplace(raftNodes)
-		})
-		if err != nil {
-			logger.Warnf("Failed to replace local raft nodes: %v", err)
-			return
-		}
+func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) {
+	if g.Cluster == nil || g.server == nil || g.memoryDial != nil {
+		// We're not a raft node or we're not clustered
+		return
+	}
 
-		var allNodes []db.NodeInfo
-		var localAddress string // Address of this node
-		var offlineThreshold time.Duration
-		err = cluster.Transaction(func(tx *db.ClusterTx) error {
-			var err error
-			allNodes, err = tx.Nodes()
-			if err != nil {
-				return err
-			}
+	raftNodes, err := g.currentRaftNodes()
+	if err == errNotLeader {
+		return
+	}
 
-			localAddress, err = tx.NodeAddress()
-			if err != nil {
-				return err
-			}
+	logger.Debugf("Starting heartbeat round")
+	if err != nil {
+		logger.Warnf("Failed to get current raft nodes: %v", err)
+		return
+	}
 
-			offlineThreshold, err = tx.NodeOfflineThreshold()
-			if err != nil {
-				return err
-			}
+	// Replace the local raft_nodes table immediately because it
+	// might miss a row containing ourselves, since we might have
+	// been elected leader before the former leader had chance to
+	// send us a fresh update through the heartbeat pool.
+	logger.Debugf("Heartbeat updating local raft nodes to %+v", raftNodes)
+	err = g.db.Transaction(func(tx *db.NodeTx) error {
+		return tx.RaftNodesReplace(raftNodes)
+	})
+	if err != nil {
+		logger.Warnf("Failed to replace local raft nodes: %v", err)
+		return
+	}
 
-			return nil
-		})
+	var allNodes []db.NodeInfo
+	var localAddress string // Address of this node
+	var offlineThreshold time.Duration
+	err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+		var err error
+		allNodes, err = tx.Nodes()
 		if err != nil {
-			logger.Warnf("Failed to get current cluster nodes: %v", err)
-			return
+			return err
 		}
 
-		// Cumulative set of node states (will be written back to database once done).
-		hbState := &APIHeartbeat{}
-
-		// If this leader node hasn't sent a heartbeat recently, then its node state records
-		// are likely out of date, this can happen when a node becomes a leader.
-		// Send stale set to all nodes in database to get a fresh set of active nodes.
-		if lastLeaderHeartbeat.IsZero() || time.Since(*lastLeaderHeartbeat) >= offlineThreshold {
-			logger.Warnf("Leader has not initiated heartbeat since '%v', doing initial heartbeat rounds", *lastLeaderHeartbeat)
-			hbState.Update(false, raftNodes, allNodes, offlineThreshold)
-			hbState.Send(ctx, gateway.cert, localAddress, allNodes, false)
-			logger.Debugf("Completed initial heartbeat round phase 1")
-			// We have the latest set of node states now, lets send that state set to all nodes.
-			hbState.Update(true, raftNodes, allNodes, offlineThreshold)
-			hbState.Send(ctx, gateway.cert, localAddress, allNodes, false)
-			logger.Debugf("Completed initial heartbeat round phase 2")
-		} else {
-			hbState.Update(true, raftNodes, allNodes, offlineThreshold)
-			hbState.Send(ctx, gateway.cert, localAddress, allNodes, true)
+		localAddress, err = tx.NodeAddress()
+		if err != nil {
+			return err
 		}
 
-		// Look for any new node which appeared since sending last heartbeat.
-		var currentNodes []db.NodeInfo
-		err = cluster.Transaction(func(tx *db.ClusterTx) error {
-			var err error
-			currentNodes, err = tx.Nodes()
-			if err != nil {
-				return err
-			}
-
-			return nil
-		})
+		offlineThreshold, err = tx.NodeOfflineThreshold()
 		if err != nil {
-			logger.Warnf("Failed to get current cluster nodes: %v", err)
-			return
+			return err
 		}
 
-		newNodes := []db.NodeInfo{}
-		for _, currentNode := range currentNodes {
-			existing := false
-			for _, node := range allNodes {
-				if node.Address == currentNode.Address && node.ID == currentNode.ID {
-					existing = true
-					break
-				}
-			}
-
-			if !existing {
-				// We found a new node
-				allNodes = append(allNodes, currentNode)
-				newNodes = append(newNodes, currentNode)
-			}
-		}
+		return nil
+	})
+	if err != nil {
+		logger.Warnf("Failed to get current cluster nodes: %v", err)
+		return
+	}
 
-		// If any new nodes found, send heartbeat to just them (with full node state).
-		if len(newNodes) > 0 {
-			hbState.Update(true, raftNodes, allNodes, offlineThreshold)
-			hbState.Send(ctx, gateway.cert, localAddress, newNodes, false)
-		}
+	// Cumulative set of node states (will be written back to database once done).
+	hbState := &APIHeartbeat{}
+
+	// If this leader node hasn't sent a heartbeat recently, then its node state records
+	// are likely out of date, this can happen when a node becomes a leader.
+	// Send stale set to all nodes in database to get a fresh set of active nodes.
+	if initialHeartbeat {
+		hbState.Update(false, raftNodes, allNodes, offlineThreshold)
+		hbState.Send(ctx, g.cert, localAddress, allNodes, false)
+
+		// We have the latest set of node states now, lets send that state set to all nodes.
+		hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+		hbState.Send(ctx, g.cert, localAddress, allNodes, false)
+	} else {
+		hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+		hbState.Send(ctx, g.cert, localAddress, allNodes, true)
+	}
 
-		// If the context has been cancelled, return immediately.
-		if ctx.Err() != nil {
-			logger.Debugf("Aborting heartbeat round")
-			return
+	// Look for any new node which appeared since sending last heartbeat.
+	var currentNodes []db.NodeInfo
+	err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+		var err error
+		currentNodes, err = tx.Nodes()
+		if err != nil {
+			return err
 		}
 
-		err = cluster.Transaction(func(tx *db.ClusterTx) error {
-			for _, node := range hbState.Members {
-				if !node.updated {
-					continue
-				}
+		return nil
+	})
+	if err != nil {
+		logger.Warnf("Failed to get current cluster nodes: %v", err)
+		return
+	}
 
-				err := tx.NodeHeartbeat(node.Address, time.Now())
-				if err != nil {
-					return err
-				}
+	newNodes := []db.NodeInfo{}
+	for _, currentNode := range currentNodes {
+		existing := false
+		for _, node := range allNodes {
+			if node.Address == currentNode.Address && node.ID == currentNode.ID {
+				existing = true
+				break
 			}
-			return nil
-		})
-		if err != nil {
-			logger.Warnf("Failed to update heartbeat: %v", err)
 		}
-		// If full node state was sent and node refresh task is specified, run it async.
-		if nodeRefreshTask != nil {
-			go nodeRefreshTask(hbState)
+
+		if !existing {
+			// We found a new node
+			allNodes = append(allNodes, currentNode)
+			newNodes = append(newNodes, currentNode)
 		}
+	}
 
-		// Update last leader heartbeat time so next time a full node state list can be sent (if not this time).
-		*lastLeaderHeartbeat = time.Now()
-		logger.Debugf("Completed heartbeat round")
+	// If any new nodes found, send heartbeat to just them (with full node state).
+	if len(newNodes) > 0 {
+		hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+		hbState.Send(ctx, g.cert, localAddress, newNodes, false)
 	}
 
-	// Since the database APIs are blocking we need to wrap the core logic
-	// and run it in a goroutine, so we can abort as soon as the context expires.
-	heartbeatWrapper := func(ctx context.Context) {
-		ch := make(chan struct{})
-		go func() {
-			heartbeat(ctx)
-			ch <- struct{}{}
-		}()
-		select {
-		case <-ch:
-		case <-ctx.Done():
+	// If the context has been cancelled, return immediately.
+	if ctx.Err() != nil {
+		logger.Debugf("Aborting heartbeat round")
+		return
+	}
+
+	err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+		for _, node := range hbState.Members {
+			if !node.updated {
+				continue
+			}
+
+			err := tx.NodeHeartbeat(node.Address, time.Now())
+			if err != nil {
+				return err
+			}
 		}
+		return nil
+	})
+	if err != nil {
+		logger.Warnf("Failed to update heartbeat: %v", err)
 	}
 
-	schedule := task.Every(time.Duration(heartbeatInterval) * time.Second)
+	// If full node state was sent and node refresh task is specified, run it async.
+	if g.HeartbeatNodeHook != nil {
+		go g.HeartbeatNodeHook(hbState)
+	}
 
-	return heartbeatWrapper, schedule
+	// No last-heartbeat timestamp is tracked here anymore; the initial full round is triggered from watchFunc on leader election.
+	logger.Debugf("Completed heartbeat round")
 }
 
 // heartbeatInterval Number of seconds to wait between to heartbeat rounds.
diff --git a/lxd/cluster/heartbeat_test.go b/lxd/cluster/heartbeat_test.go
index 081b587b1f..79c5d92758 100644
--- a/lxd/cluster/heartbeat_test.go
+++ b/lxd/cluster/heartbeat_test.go
@@ -43,8 +43,8 @@ func TestHeartbeat(t *testing.T) {
 	require.NoError(t, err)
 
 	// Perform the heartbeat requests.
-	timeNow := time.Now()
-	heartbeat, _ := cluster.Heartbeat(leader, leaderState.Cluster, nil, &timeNow)
+	leader.Cluster = leaderState.Cluster
+	heartbeat, _ := cluster.HeartbeatTask(leader)
 	ctx := context.Background()
 	heartbeat(ctx)
 
@@ -93,8 +93,8 @@ func DISABLE_TestHeartbeat_MarkAsDown(t *testing.T) {
 
 	// Shutdown the follower node and perform the heartbeat requests.
 	f.Server(follower).Close()
-	timeNow := time.Now()
-	heartbeat, _ := cluster.Heartbeat(leader, leaderState.Cluster, nil, &timeNow)
+	leader.Cluster = leaderState.Cluster
+	heartbeat, _ := cluster.HeartbeatTask(leader)
 	ctx := context.Background()
 	heartbeat(ctx)
 
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 2c7fe5e450..6f6e1ef9fd 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -76,9 +76,6 @@ type Daemon struct {
 
 	externalAuth *externalAuth
 
-	// Stores the last time this node initiated a leader heartbeat run.
-	lastLeaderHeartbeat time.Time
-
 	// Stores last heartbeat node information to detect node changes.
 	lastNodeList *cluster.APIHeartbeat
 }
@@ -623,6 +620,7 @@ func (d *Daemon) init() error {
 	if err != nil {
 		return err
 	}
+	d.gateway.HeartbeatNodeHook = d.NodeRefreshTask
 
 	/* Setup some mounts (nice to have) */
 	if !d.os.MockMode {
@@ -710,7 +708,7 @@ func (d *Daemon) init() error {
 			// The only thing we want to still do on this node is
 			// to run the heartbeat task, in case we are the raft
 			// leader.
-			stop, _ := task.Start(cluster.Heartbeat(d.gateway, d.cluster, d.NodeRefreshTask, &d.lastLeaderHeartbeat))
+			stop, _ := task.Start(cluster.HeartbeatTask(d.gateway))
 			d.gateway.WaitUpgradeNotification()
 			stop(time.Second)
 
@@ -720,6 +718,7 @@ func (d *Daemon) init() error {
 		}
 		return errors.Wrap(err, "failed to open cluster database")
 	}
+
 	err = cluster.NotifyUpgradeCompleted(d.State(), certInfo)
 	if err != nil {
 		// Ignore the error, since it's not fatal for this particular
@@ -727,6 +726,7 @@ func (d *Daemon) init() error {
 		// offline.
 		logger.Debugf("Could not notify all nodes of database upgrade: %v", err)
 	}
+	d.gateway.Cluster = d.cluster
 
 	/* Migrate the node local data to the cluster database, if needed */
 	if dump != nil {
@@ -915,7 +915,7 @@ func (d *Daemon) init() error {
 
 func (d *Daemon) startClusterTasks() {
 	// Heartbeats
-	d.clusterTasks.Add(cluster.Heartbeat(d.gateway, d.cluster, d.NodeRefreshTask, &d.lastLeaderHeartbeat))
+	d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))
 
 	// Events
 	d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, eventForward))

From fcc2205a3d883ab4188e80e3b3911c5808deee7d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 18 Jul 2019 00:06:53 -0400
Subject: [PATCH 4/4] DEBUG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 test/main.sh              | 180 +++++++++++++++++++-------------------
 test/suites/clustering.sh |   1 +
 2 files changed, 92 insertions(+), 89 deletions(-)

diff --git a/test/main.sh b/test/main.sh
index 1e1e1b0578..82c1951d4e 100755
--- a/test/main.sh
+++ b/test/main.sh
@@ -23,6 +23,8 @@ fi
 if [ -z "${LXD_BACKEND:-}" ]; then
     LXD_BACKEND="dir"
 fi
+LXD_BACKEND="ceph"
+LXD_DEBUG=1
 
 # shellcheck disable=SC2034
 LXD_NETNS=""
@@ -154,96 +156,96 @@ if [ "$#" -gt 0 ]; then
   exit
 fi
 
-run_test test_check_deps "checking dependencies"
-run_test test_static_analysis "static analysis"
-run_test test_database_update "database schema updates"
-run_test test_database_restore "database restore"
-run_test test_sql "lxd sql"
-run_test test_basic_usage "basic usage"
-run_test test_remote_url "remote url handling"
-run_test test_remote_admin "remote administration"
-run_test test_remote_usage "remote usage"
-run_test test_clustering_enable "clustering enable"
-run_test test_clustering_membership "clustering membership"
-run_test test_clustering_containers "clustering containers"
+###run_test test_check_deps "checking dependencies"
+###run_test test_static_analysis "static analysis"
+###run_test test_database_update "database schema updates"
+###run_test test_database_restore "database restore"
+###run_test test_sql "lxd sql"
+###run_test test_basic_usage "basic usage"
+###run_test test_remote_url "remote url handling"
+###run_test test_remote_admin "remote administration"
+###run_test test_remote_usage "remote usage"
+###run_test test_clustering_enable "clustering enable"
+###run_test test_clustering_membership "clustering membership"
+###run_test test_clustering_containers "clustering containers"
 run_test test_clustering_storage "clustering storage"
-run_test test_clustering_network "clustering network"
-run_test test_clustering_publish "clustering publish"
-run_test test_clustering_profiles "clustering profiles"
-run_test test_clustering_join_api "clustering join api"
-run_test test_clustering_shutdown_nodes "clustering shutdown"
-run_test test_clustering_projects "clustering projects"
-run_test test_clustering_address "clustering address"
-run_test test_clustering_image_replication "clustering image replication"
-run_test test_clustering_dns "clustering DNS"
-#run_test test_clustering_upgrade "clustering upgrade"
-run_test test_projects_default "default project"
-run_test test_projects_crud "projects CRUD operations"
-run_test test_projects_containers "containers inside projects"
-run_test test_projects_snapshots "snapshots inside projects"
-run_test test_projects_backups "backups inside projects"
-run_test test_projects_profiles "profiles inside projects"
-run_test test_projects_profiles_default "profiles from the global default project"
-run_test test_projects_images "images inside projects"
-run_test test_projects_images_default "images from the global default project"
-run_test test_projects_storage "projects and storage pools"
-run_test test_projects_network "projects and networks"
-run_test test_container_devices_nic_p2p "container devices - nic - p2p"
-run_test test_container_devices_nic_bridged "container devices - nic - bridged"
-run_test test_container_devices_nic_bridged_filtering "container devices - nic - bridged - filtering"
-run_test test_container_devices_nic_physical "container devices - nic - physical"
-run_test test_container_devices_nic_macvlan "container devices - nic - macvlan"
-run_test test_container_devices_nic_ipvlan "container devices - nic - ipvlan"
-run_test test_container_devices_nic_sriov "container devices - nic - sriov"
-run_test test_security "security features"
-run_test test_security_protection "container protection"
-run_test test_image_expiry "image expiry"
-run_test test_image_list_all_aliases "image list all aliases"
-run_test test_image_auto_update "image auto-update"
-run_test test_image_prefer_cached "image prefer cached"
-run_test test_image_import_dir "import image from directory"
-run_test test_concurrent_exec "concurrent exec"
-run_test test_concurrent "concurrent startup"
-run_test test_snapshots "container snapshots"
-run_test test_snap_restore "snapshot restores"
-run_test test_snap_expiry "snapshot expiry"
-run_test test_config_profiles "profiles and configuration"
-run_test test_config_edit "container configuration edit"
-run_test test_config_edit_container_snapshot_pool_config "container and snapshot volume configuration edit"
-run_test test_container_metadata "manage container metadata and templates"
-run_test test_container_snapshot_config "container snapshot configuration"
-run_test test_server_config "server configuration"
-run_test test_filemanip "file manipulations"
-run_test test_network "network management"
-run_test test_idmap "id mapping"
-run_test test_template "file templating"
-run_test test_pki "PKI mode"
-run_test test_devlxd "/dev/lxd"
-run_test test_fuidshift "fuidshift"
-run_test test_migration "migration"
-run_test test_fdleak "fd leak"
-run_test test_storage "storage"
-run_test test_storage_volume_snapshots "storage volume snapshots"
-run_test test_init_auto "lxd init auto"
-run_test test_init_interactive "lxd init interactive"
-run_test test_init_preseed "lxd init preseed"
-run_test test_storage_profiles "storage profiles"
-run_test test_container_import "container import"
-run_test test_storage_volume_attach "attaching storage volumes"
-run_test test_storage_driver_ceph "ceph storage driver"
-run_test test_storage_driver_cephfs "cephfs storage driver"
-run_test test_resources "resources"
-run_test test_kernel_limits "kernel limits"
-run_test test_macaroon_auth "macaroon authentication"
-run_test test_console "console"
-run_test test_query "query"
-run_test test_proxy_device "proxy device"
-run_test test_storage_local_volume_handling "storage local volume handling"
-run_test test_backup_import "backup import"
-run_test test_backup_export "backup export"
-run_test test_backup_rename "backup rename"
-run_test test_container_local_cross_pool_handling "container local cross pool handling"
-run_test test_incremental_copy "incremental container copy"
+###run_test test_clustering_network "clustering network"
+###run_test test_clustering_publish "clustering publish"
+###run_test test_clustering_profiles "clustering profiles"
+###run_test test_clustering_join_api "clustering join api"
+###run_test test_clustering_shutdown_nodes "clustering shutdown"
+###run_test test_clustering_projects "clustering projects"
+###run_test test_clustering_address "clustering address"
+###run_test test_clustering_image_replication "clustering image replication"
+###run_test test_clustering_dns "clustering DNS"
+####run_test test_clustering_upgrade "clustering upgrade"
+###run_test test_projects_default "default project"
+###run_test test_projects_crud "projects CRUD operations"
+###run_test test_projects_containers "containers inside projects"
+###run_test test_projects_snapshots "snapshots inside projects"
+###run_test test_projects_backups "backups inside projects"
+###run_test test_projects_profiles "profiles inside projects"
+###run_test test_projects_profiles_default "profiles from the global default project"
+###run_test test_projects_images "images inside projects"
+###run_test test_projects_images_default "images from the global default project"
+###run_test test_projects_storage "projects and storage pools"
+###run_test test_projects_network "projects and networks"
+###run_test test_container_devices_nic_p2p "container devices - nic - p2p"
+###run_test test_container_devices_nic_bridged "container devices - nic - bridged"
+###run_test test_container_devices_nic_bridged_filtering "container devices - nic - bridged - filtering"
+###run_test test_container_devices_nic_physical "container devices - nic - physical"
+###run_test test_container_devices_nic_macvlan "container devices - nic - macvlan"
+###run_test test_container_devices_nic_ipvlan "container devices - nic - ipvlan"
+###run_test test_container_devices_nic_sriov "container devices - nic - sriov"
+###run_test test_security "security features"
+###run_test test_security_protection "container protection"
+###run_test test_image_expiry "image expiry"
+###run_test test_image_list_all_aliases "image list all aliases"
+###run_test test_image_auto_update "image auto-update"
+###run_test test_image_prefer_cached "image prefer cached"
+###run_test test_image_import_dir "import image from directory"
+###run_test test_concurrent_exec "concurrent exec"
+###run_test test_concurrent "concurrent startup"
+###run_test test_snapshots "container snapshots"
+###run_test test_snap_restore "snapshot restores"
+###run_test test_snap_expiry "snapshot expiry"
+###run_test test_config_profiles "profiles and configuration"
+###run_test test_config_edit "container configuration edit"
+###run_test test_config_edit_container_snapshot_pool_config "container and snapshot volume configuration edit"
+###run_test test_container_metadata "manage container metadata and templates"
+###run_test test_container_snapshot_config "container snapshot configuration"
+###run_test test_server_config "server configuration"
+###run_test test_filemanip "file manipulations"
+###run_test test_network "network management"
+###run_test test_idmap "id mapping"
+###run_test test_template "file templating"
+###run_test test_pki "PKI mode"
+###run_test test_devlxd "/dev/lxd"
+###run_test test_fuidshift "fuidshift"
+###run_test test_migration "migration"
+###run_test test_fdleak "fd leak"
+###run_test test_storage "storage"
+###run_test test_storage_volume_snapshots "storage volume snapshots"
+###run_test test_init_auto "lxd init auto"
+###run_test test_init_interactive "lxd init interactive"
+###run_test test_init_preseed "lxd init preseed"
+###run_test test_storage_profiles "storage profiles"
+###run_test test_container_import "container import"
+###run_test test_storage_volume_attach "attaching storage volumes"
+###run_test test_storage_driver_ceph "ceph storage driver"
+###run_test test_storage_driver_cephfs "cephfs storage driver"
+###run_test test_resources "resources"
+###run_test test_kernel_limits "kernel limits"
+###run_test test_macaroon_auth "macaroon authentication"
+###run_test test_console "console"
+###run_test test_query "query"
+###run_test test_proxy_device "proxy device"
+###run_test test_storage_local_volume_handling "storage local volume handling"
+###run_test test_backup_import "backup import"
+###run_test test_backup_export "backup export"
+###run_test test_backup_rename "backup rename"
+###run_test test_container_local_cross_pool_handling "container local cross pool handling"
+###run_test test_incremental_copy "incremental container copy"
 
 # shellcheck disable=SC2034
 TEST_RESULT=success
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index 5754e10152..d9770f4f20 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -510,6 +510,7 @@ test_clustering_storage() {
     sleep 30
 
     # Move the container back to node2, even if node3 is offline
+    LXD_DIR="${LXD_ONE_DIR}" lxc cluster list
     LXD_DIR="${LXD_ONE_DIR}" lxc move bar --target node2
     LXD_DIR="${LXD_ONE_DIR}" lxc info bar | grep -q "Location: node2"
     LXD_DIR="${LXD_TWO_DIR}" lxc info bar | grep -q "backup (taken at"


More information about the lxc-devel mailing list