[lxc-devel] [lxd/master] DEBUG: clustering_storage ceph failures
stgraber on Github
lxc-bot at linuxcontainers.org
Thu Jul 18 04:08:10 UTC 2019
From 00daff533635a7dd908977d3c7d1eaffe991cf60 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 17 Jul 2019 18:23:46 -0400
Subject: [PATCH 1/4] lxd/storage/ceph: Handle EBUSY on unmap
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/storage_ceph_utils.go | 16 ++++++++++++++++
1 file changed, 16 insertions(+)
diff --git a/lxd/storage_ceph_utils.go b/lxd/storage_ceph_utils.go
index 2ae569d358..9c458ad929 100644
--- a/lxd/storage_ceph_utils.go
+++ b/lxd/storage_ceph_utils.go
@@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"syscall"
+ "time"
"github.com/pborman/uuid"
"golang.org/x/sys/unix"
@@ -169,6 +170,8 @@ func cephRBDVolumeUnmap(clusterName string, poolName string, volumeName string,
volumeType string, userName string, unmapUntilEINVAL bool) error {
unmapImageName := fmt.Sprintf("%s_%s", volumeType, volumeName)
+ busyCount := 0
+
again:
_, err := shared.RunCommand(
"rbd",
@@ -187,8 +190,21 @@ again:
// EINVAL (already unmapped)
return nil
}
+
+ if waitStatus.ExitStatus() == 16 {
+ // EBUSY (currently in use)
+ busyCount++
+ if busyCount == 10 {
+ return err
+ }
+
+ // Wait a second and try again
+ time.Sleep(time.Second)
+ goto again
+ }
}
}
+
return err
}
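
Editorial note: the change above makes cephRBDVolumeUnmap retry when "rbd unmap" exits with status 16 (EBUSY), sleeping one second between attempts and giving up after ten tries, instead of failing on the first busy device. Below is a minimal standalone sketch of the same retry idea, assuming a plain os/exec call; unmapWithRetry and the device name are illustrative stand-ins, not the real cephRBDVolumeUnmap/shared.RunCommand code.

package main

import (
	"fmt"
	"os/exec"
	"syscall"
	"time"
)

// unmapWithRetry keeps retrying "rbd unmap" while the kernel still holds
// the device open (exit status 16, i.e. EBUSY), pausing one second between
// attempts and giving up after maxRetries, matching the busyCount cap of 10
// used in the patch.
func unmapWithRetry(dev string, maxRetries int) error {
	var err error

	for i := 0; i < maxRetries; i++ {
		err = exec.Command("rbd", "unmap", dev).Run()
		if err == nil {
			return nil
		}

		if exitErr, ok := err.(*exec.ExitError); ok {
			if status, ok := exitErr.Sys().(syscall.WaitStatus); ok && status.ExitStatus() == 16 {
				// EBUSY: wait a second and try again.
				time.Sleep(time.Second)
				continue
			}
		}

		// Any other failure is returned immediately.
		return err
	}

	return fmt.Errorf("failed to unmap %q after %d attempts: %v", dev, maxRetries, err)
}

func main() {
	if err := unmapWithRetry("pool/container_c1", 10); err != nil {
		fmt.Println(err)
	}
}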
From 8e850c97fdd7082ba91dc5d6161c4d08b28b6a7c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 17 Jul 2019 19:18:20 -0400
Subject: [PATCH 2/4] lxd/storage/ceph: Slightly speed up creation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/storage_ceph.go | 65 ++++++++++++++++-----------------------------
1 file changed, 23 insertions(+), 42 deletions(-)
diff --git a/lxd/storage_ceph.go b/lxd/storage_ceph.go
index 6a1253f102..279ea043a8 100644
--- a/lxd/storage_ceph.go
+++ b/lxd/storage_ceph.go
@@ -136,11 +136,8 @@ func (s *storageCeph) StoragePoolCreate() error {
if !cephOSDPoolExists(s.ClusterName, s.OSDPoolName, s.UserName) {
logger.Debugf(`CEPH OSD storage pool "%s" does not exist`, s.OSDPoolName)
- // create new osd pool
- msg, err := shared.TryRunCommand("ceph", "--name",
- fmt.Sprintf("client.%s", s.UserName), "--cluster",
- s.ClusterName, "osd", "pool", "create", s.OSDPoolName,
- s.PGNum)
+ // Create new osd pool
+ msg, err := shared.TryRunCommand("ceph", "--name", fmt.Sprintf("client.%s", s.UserName), "--cluster", s.ClusterName, "osd", "pool", "create", s.OSDPoolName, s.PGNum)
if err != nil {
logger.Errorf(`Failed to create CEPH osd storage pool "%s" in cluster "%s": %s`, s.OSDPoolName, s.ClusterName, msg)
return err
@@ -152,27 +149,39 @@ func (s *storageCeph) StoragePoolCreate() error {
return
}
- err := cephOSDPoolDestroy(s.ClusterName, s.OSDPoolName,
- s.UserName)
+ err := cephOSDPoolDestroy(s.ClusterName, s.OSDPoolName, s.UserName)
if err != nil {
logger.Warnf(`Failed to delete ceph storage pool "%s" in cluster "%s": %s`, s.OSDPoolName, s.ClusterName, err)
}
}()
+ // Create dummy storage volume. Other LXD instances will use this to detect whether this osd pool is already in use by another LXD instance.
+ err = cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName, s.OSDPoolName, "lxd", "0", s.UserName)
+ if err != nil {
+ logger.Errorf(`Failed to create RBD storage volume "%s" on storage pool "%s": %s`, s.pool.Name, s.pool.Name, err)
+ return err
+ }
+ s.pool.Config["volatile.pool.pristine"] = "true"
+ logger.Debugf(`Created RBD storage volume "%s" on storage pool "%s"`, s.pool.Name, s.pool.Name)
} else {
logger.Debugf(`CEPH OSD storage pool "%s" does exist`, s.OSDPoolName)
- // use existing osd pool
- msg, err := shared.RunCommand("ceph", "--name",
- fmt.Sprintf("client.%s", s.UserName),
- "--cluster", s.ClusterName, "osd", "pool", "get",
- s.OSDPoolName, "pg_num")
+ ok := cephRBDVolumeExists(s.ClusterName, s.OSDPoolName, s.OSDPoolName, "lxd", s.UserName)
+ s.pool.Config["volatile.pool.pristine"] = "false"
+ if ok {
+ if s.pool.Config["ceph.osd.force_reuse"] == "" || !shared.IsTrue(s.pool.Config["ceph.osd.force_reuse"]) {
+ return fmt.Errorf("CEPH OSD storage pool \"%s\" in cluster \"%s\" seems to be in use by another LXD instance. Use \"ceph.osd.force_reuse=true\" to force.", s.pool.Name, s.ClusterName)
+ }
+ }
+
+ // Use existing osd pool
+ msg, err := shared.RunCommand("ceph", "--name", fmt.Sprintf("client.%s", s.UserName), "--cluster", s.ClusterName, "osd", "pool", "get", s.OSDPoolName, "pg_num")
if err != nil {
logger.Errorf(`Failed to retrieve number of placement groups for CEPH osd storage pool "%s" in cluster "%s": %s`, s.OSDPoolName, s.ClusterName, msg)
return err
}
- logger.Debugf(`Retrieved number of placement groups or CEPH osd storage pool "%s" in cluster "%s"`, s.OSDPoolName, s.ClusterName)
+ logger.Debugf(`Retrieved number of placement groups for CEPH osd storage pool "%s" in cluster "%s"`, s.OSDPoolName, s.ClusterName)
idx := strings.Index(msg, "pg_num:")
if idx == -1 {
logger.Errorf(`Failed to parse number of placement groups for CEPH osd storage pool "%s" in cluster "%s": %s`, s.OSDPoolName, s.ClusterName, msg)
@@ -180,6 +189,7 @@ func (s *storageCeph) StoragePoolCreate() error {
msg = msg[(idx + len("pg_num:")):]
msg = strings.TrimSpace(msg)
+
// It is ok to update the pool configuration since storage pool
// creation via API is implemented such that the storage pool is
// checked for a changed config after this function returns and
@@ -225,35 +235,6 @@ func (s *storageCeph) StoragePoolCreate() error {
}
}()
- ok := cephRBDVolumeExists(s.ClusterName, s.OSDPoolName, s.OSDPoolName,
- "lxd", s.UserName)
- s.pool.Config["volatile.pool.pristine"] = "false"
- if !ok {
- s.pool.Config["volatile.pool.pristine"] = "true"
- // Create dummy storage volume. Other LXD instances will use
- // this to detect whether this osd pool is already in use by
- // another LXD instance.
- err = cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName,
- s.OSDPoolName, "lxd", "0", s.UserName)
- if err != nil {
- logger.Errorf(`Failed to create RBD storage volume "%s" on storage pool "%s": %s`, s.pool.Name, s.pool.Name, err)
- return err
- }
- logger.Debugf(`Created RBD storage volume "%s" on storage pool "%s"`, s.pool.Name, s.pool.Name)
- } else {
- msg := fmt.Sprintf(`CEPH OSD storage pool "%s" in cluster `+
- `"%s" seems to be in use by another LXD instace`,
- s.pool.Name, s.ClusterName)
- if s.pool.Config["ceph.osd.force_reuse"] == "" ||
- !shared.IsTrue(s.pool.Config["ceph.osd.force_reuse"]) {
- msg += `. Set "ceph.osd.force_reuse=true" to force ` +
- `LXD to reuse the pool`
- logger.Errorf(msg)
- return fmt.Errorf(msg)
- }
- logger.Warnf(msg)
- }
-
logger.Infof(`Created CEPH OSD storage pool "%s" in cluster "%s"`,
s.pool.Name, s.ClusterName)
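
Editorial note: the restructuring above moves the in-use detection to where it is decided. When the OSD pool is freshly created, the dummy "lxd" RBD volume is created right away and volatile.pool.pristine is set to "true"; when the pool already exists, the marker volume is checked up front and reuse is refused unless ceph.osd.force_reuse=true. A rough sketch of that guard follows; poolInUse, config and isTrue are hypothetical stand-ins for cephRBDVolumeExists, s.pool.Config and shared.IsTrue.

package main

import (
	"fmt"
	"strings"
)

// isTrue approximates shared.IsTrue for the purposes of this sketch.
func isTrue(value string) bool {
	switch strings.ToLower(value) {
	case "true", "1", "yes", "on":
		return true
	}
	return false
}

// checkPoolReuse mirrors the guard in the "pool already exists" branch:
// an existing "lxd" marker volume means another LXD instance owns the
// pool, so reuse is refused unless explicitly forced.
func checkPoolReuse(poolName string, clusterName string, poolInUse bool, config map[string]string) error {
	config["volatile.pool.pristine"] = "false"

	if poolInUse && !isTrue(config["ceph.osd.force_reuse"]) {
		return fmt.Errorf("CEPH OSD storage pool %q in cluster %q seems to be in use by another LXD instance. Use \"ceph.osd.force_reuse=true\" to force.", poolName, clusterName)
	}

	return nil
}

func main() {
	config := map[string]string{"ceph.osd.force_reuse": ""}
	fmt.Println(checkPoolReuse("lxdtest", "ceph", true, config))
}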
From 1747a6ef7710e7fbc3a4026fb1873e3aae776187 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 17 Jul 2019 17:11:57 -0400
Subject: [PATCH 3/4] lxd/cluster: Use hook for initial heartbeat
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/cluster/gateway.go | 15 ++
lxd/cluster/heartbeat.go | 272 +++++++++++++++++-----------------
lxd/cluster/heartbeat_test.go | 8 +-
lxd/daemon.go | 10 +-
4 files changed, 159 insertions(+), 146 deletions(-)
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 5051dc8174..78a9264a46 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -21,6 +21,7 @@ import (
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/eagain"
+ log "github.com/lxc/lxd/shared/log15"
"github.com/lxc/lxd/shared/logger"
"github.com/pkg/errors"
)
@@ -102,6 +103,10 @@ type Gateway struct {
// detected a peer with an higher version.
upgradeTriggered bool
+ // Used for the heartbeat handler
+ Cluster *db.Cluster
+ HeartbeatNodeHook func(*APIHeartbeat)
+
// ServerStore wrapper.
store *dqliteServerStore
@@ -541,6 +546,7 @@ func (g *Gateway) init() error {
}
options := []dqlite.ServerOption{
dqlite.WithServerLogFunc(DqliteLog),
+ dqlite.WithServerWatchFunc(g.watchFunc),
}
if raft.info.Address == "1" {
@@ -804,6 +810,15 @@ func dqliteNetworkDial(ctx context.Context, addr string, g *Gateway, checkLeader
return cUnix, nil
}
+func (g *Gateway) watchFunc(oldState int, newState int) {
+ if newState == dqlite.Leader && g.raft != nil {
+ logger.Info("Node was elected as dqlite leader", log.Ctx{"id": g.raft.info.ID, "address": g.raft.info.Address})
+
+ // Trigger an immediate full heartbeat run
+ go g.heartbeat(g.ctx, true)
+ }
+}
+
// Create a dial function that connects to the given listener.
func dqliteMemoryDial(listener net.Listener) dqlite.DialFunc {
return func(ctx context.Context, address string) (net.Conn, error) {
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index aa1dcaee97..45bca1d908 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -108,7 +108,7 @@ func (hbState *APIHeartbeat) Update(fullStateList bool, raftNodes []db.RaftNode,
return
}
-// sends heartbeat requests to the nodes supplied and updates heartbeat state.
+// Send sends heartbeat requests to the nodes supplied and updates heartbeat state.
func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, localAddress string, nodes []db.NodeInfo, delay bool) {
heartbeatsWg := sync.WaitGroup{}
sendHeartbeat := func(nodeID int64, address string, delay bool, heartbeatData *APIHeartbeat) {
@@ -161,177 +161,175 @@ func (hbState *APIHeartbeat) Send(ctx context.Context, cert *shared.CertInfo, lo
heartbeatsWg.Wait()
}
-// Heartbeat returns a task function that performs leader-initiated heartbeat
+// HeartbeatTask returns a task function that performs leader-initiated heartbeat
// checks against all LXD nodes in the cluster.
//
// It will update the heartbeat timestamp column of the nodes table
// accordingly, and also notify them of the current list of database nodes.
-func Heartbeat(gateway *Gateway, cluster *db.Cluster, nodeRefreshTask func(*APIHeartbeat), lastLeaderHeartbeat *time.Time) (task.Func, task.Schedule) {
- heartbeat := func(ctx context.Context) {
- if gateway.server == nil || gateway.memoryDial != nil {
- // We're not a raft node or we're not clustered
- return
+func HeartbeatTask(gateway *Gateway) (task.Func, task.Schedule) {
+ // Since the database APIs are blocking we need to wrap the core logic
+ // and run it in a goroutine, so we can abort as soon as the context expires.
+ heartbeatWrapper := func(ctx context.Context) {
+ ch := make(chan struct{})
+ go func() {
+ gateway.heartbeat(ctx, false)
+ ch <- struct{}{}
+ }()
+ select {
+ case <-ch:
+ case <-ctx.Done():
}
+ }
- raftNodes, err := gateway.currentRaftNodes()
- if err == errNotLeader {
- return
- }
+ schedule := task.Every(time.Duration(heartbeatInterval) * time.Second)
- logger.Debugf("Starting heartbeat round")
- if err != nil {
- logger.Warnf("Failed to get current raft nodes: %v", err)
- return
- }
+ return heartbeatWrapper, schedule
+}
- // Replace the local raft_nodes table immediately because it
- // might miss a row containing ourselves, since we might have
- // been elected leader before the former leader had chance to
- // send us a fresh update through the heartbeat pool.
- logger.Debugf("Heartbeat updating local raft nodes to %+v", raftNodes)
- err = gateway.db.Transaction(func(tx *db.NodeTx) error {
- return tx.RaftNodesReplace(raftNodes)
- })
- if err != nil {
- logger.Warnf("Failed to replace local raft nodes: %v", err)
- return
- }
+func (g *Gateway) heartbeat(ctx context.Context, initialHeartbeat bool) {
+ if g.Cluster == nil || g.server == nil || g.memoryDial != nil {
+ // We're not a raft node or we're not clustered
+ return
+ }
- var allNodes []db.NodeInfo
- var localAddress string // Address of this node
- var offlineThreshold time.Duration
- err = cluster.Transaction(func(tx *db.ClusterTx) error {
- var err error
- allNodes, err = tx.Nodes()
- if err != nil {
- return err
- }
+ raftNodes, err := g.currentRaftNodes()
+ if err == errNotLeader {
+ return
+ }
- localAddress, err = tx.NodeAddress()
- if err != nil {
- return err
- }
+ logger.Debugf("Starting heartbeat round")
+ if err != nil {
+ logger.Warnf("Failed to get current raft nodes: %v", err)
+ return
+ }
- offlineThreshold, err = tx.NodeOfflineThreshold()
- if err != nil {
- return err
- }
+ // Replace the local raft_nodes table immediately because it
+ // might miss a row containing ourselves, since we might have
+ // been elected leader before the former leader had chance to
+ // send us a fresh update through the heartbeat pool.
+ logger.Debugf("Heartbeat updating local raft nodes to %+v", raftNodes)
+ err = g.db.Transaction(func(tx *db.NodeTx) error {
+ return tx.RaftNodesReplace(raftNodes)
+ })
+ if err != nil {
+ logger.Warnf("Failed to replace local raft nodes: %v", err)
+ return
+ }
- return nil
- })
+ var allNodes []db.NodeInfo
+ var localAddress string // Address of this node
+ var offlineThreshold time.Duration
+ err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ var err error
+ allNodes, err = tx.Nodes()
if err != nil {
- logger.Warnf("Failed to get current cluster nodes: %v", err)
- return
+ return err
}
- // Cumulative set of node states (will be written back to database once done).
- hbState := &APIHeartbeat{}
-
- // If this leader node hasn't sent a heartbeat recently, then its node state records
- // are likely out of date, this can happen when a node becomes a leader.
- // Send stale set to all nodes in database to get a fresh set of active nodes.
- if lastLeaderHeartbeat.IsZero() || time.Since(*lastLeaderHeartbeat) >= offlineThreshold {
- logger.Warnf("Leader has not initiated heartbeat since '%v', doing initial heartbeat rounds", *lastLeaderHeartbeat)
- hbState.Update(false, raftNodes, allNodes, offlineThreshold)
- hbState.Send(ctx, gateway.cert, localAddress, allNodes, false)
- logger.Debugf("Completed initial heartbeat round phase 1")
- // We have the latest set of node states now, lets send that state set to all nodes.
- hbState.Update(true, raftNodes, allNodes, offlineThreshold)
- hbState.Send(ctx, gateway.cert, localAddress, allNodes, false)
- logger.Debugf("Completed initial heartbeat round phase 2")
- } else {
- hbState.Update(true, raftNodes, allNodes, offlineThreshold)
- hbState.Send(ctx, gateway.cert, localAddress, allNodes, true)
+ localAddress, err = tx.NodeAddress()
+ if err != nil {
+ return err
}
- // Look for any new node which appeared since sending last heartbeat.
- var currentNodes []db.NodeInfo
- err = cluster.Transaction(func(tx *db.ClusterTx) error {
- var err error
- currentNodes, err = tx.Nodes()
- if err != nil {
- return err
- }
-
- return nil
- })
+ offlineThreshold, err = tx.NodeOfflineThreshold()
if err != nil {
- logger.Warnf("Failed to get current cluster nodes: %v", err)
- return
+ return err
}
- newNodes := []db.NodeInfo{}
- for _, currentNode := range currentNodes {
- existing := false
- for _, node := range allNodes {
- if node.Address == currentNode.Address && node.ID == currentNode.ID {
- existing = true
- break
- }
- }
-
- if !existing {
- // We found a new node
- allNodes = append(allNodes, currentNode)
- newNodes = append(newNodes, currentNode)
- }
- }
+ return nil
+ })
+ if err != nil {
+ logger.Warnf("Failed to get current cluster nodes: %v", err)
+ return
+ }
- // If any new nodes found, send heartbeat to just them (with full node state).
- if len(newNodes) > 0 {
- hbState.Update(true, raftNodes, allNodes, offlineThreshold)
- hbState.Send(ctx, gateway.cert, localAddress, newNodes, false)
- }
+ // Cumulative set of node states (will be written back to database once done).
+ hbState := &APIHeartbeat{}
+
+ // If this leader node hasn't sent a heartbeat recently, then its node state records
+ // are likely out of date, this can happen when a node becomes a leader.
+ // Send stale set to all nodes in database to get a fresh set of active nodes.
+ if initialHeartbeat {
+ hbState.Update(false, raftNodes, allNodes, offlineThreshold)
+ hbState.Send(ctx, g.cert, localAddress, allNodes, false)
+
+ // We have the latest set of node states now, let's send that state set to all nodes.
+ hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+ hbState.Send(ctx, g.cert, localAddress, allNodes, false)
+ } else {
+ hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+ hbState.Send(ctx, g.cert, localAddress, allNodes, true)
+ }
- // If the context has been cancelled, return immediately.
- if ctx.Err() != nil {
- logger.Debugf("Aborting heartbeat round")
- return
+ // Look for any new node which appeared since sending last heartbeat.
+ var currentNodes []db.NodeInfo
+ err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ var err error
+ currentNodes, err = tx.Nodes()
+ if err != nil {
+ return err
}
- err = cluster.Transaction(func(tx *db.ClusterTx) error {
- for _, node := range hbState.Members {
- if !node.updated {
- continue
- }
+ return nil
+ })
+ if err != nil {
+ logger.Warnf("Failed to get current cluster nodes: %v", err)
+ return
+ }
- err := tx.NodeHeartbeat(node.Address, time.Now())
- if err != nil {
- return err
- }
+ newNodes := []db.NodeInfo{}
+ for _, currentNode := range currentNodes {
+ existing := false
+ for _, node := range allNodes {
+ if node.Address == currentNode.Address && node.ID == currentNode.ID {
+ existing = true
+ break
}
- return nil
- })
- if err != nil {
- logger.Warnf("Failed to update heartbeat: %v", err)
}
- // If full node state was sent and node refresh task is specified, run it async.
- if nodeRefreshTask != nil {
- go nodeRefreshTask(hbState)
+
+ if !existing {
+ // We found a new node
+ allNodes = append(allNodes, currentNode)
+ newNodes = append(newNodes, currentNode)
}
+ }
- // Update last leader heartbeat time so next time a full node state list can be sent (if not this time).
- *lastLeaderHeartbeat = time.Now()
- logger.Debugf("Completed heartbeat round")
+ // If any new nodes found, send heartbeat to just them (with full node state).
+ if len(newNodes) > 0 {
+ hbState.Update(true, raftNodes, allNodes, offlineThreshold)
+ hbState.Send(ctx, g.cert, localAddress, newNodes, false)
}
- // Since the database APIs are blocking we need to wrap the core logic
- // and run it in a goroutine, so we can abort as soon as the context expires.
- heartbeatWrapper := func(ctx context.Context) {
- ch := make(chan struct{})
- go func() {
- heartbeat(ctx)
- ch <- struct{}{}
- }()
- select {
- case <-ch:
- case <-ctx.Done():
+ // If the context has been cancelled, return immediately.
+ if ctx.Err() != nil {
+ logger.Debugf("Aborting heartbeat round")
+ return
+ }
+
+ err = g.Cluster.Transaction(func(tx *db.ClusterTx) error {
+ for _, node := range hbState.Members {
+ if !node.updated {
+ continue
+ }
+
+ err := tx.NodeHeartbeat(node.Address, time.Now())
+ if err != nil {
+ return err
+ }
}
+ return nil
+ })
+ if err != nil {
+ logger.Warnf("Failed to update heartbeat: %v", err)
}
- schedule := task.Every(time.Duration(heartbeatInterval) * time.Second)
+ // If full node state was sent and node refresh task is specified, run it async.
+ if g.HeartbeatNodeHook != nil {
+ go g.HeartbeatNodeHook(hbState)
+ }
- return heartbeatWrapper, schedule
+ // Update last leader heartbeat time so next time a full node state list can be sent (if not this time).
+ logger.Debugf("Completed heartbeat round")
}
// heartbeatInterval Number of seconds to wait between to heartbeat rounds.
diff --git a/lxd/cluster/heartbeat_test.go b/lxd/cluster/heartbeat_test.go
index 081b587b1f..79c5d92758 100644
--- a/lxd/cluster/heartbeat_test.go
+++ b/lxd/cluster/heartbeat_test.go
@@ -43,8 +43,8 @@ func TestHeartbeat(t *testing.T) {
require.NoError(t, err)
// Perform the heartbeat requests.
- timeNow := time.Now()
- heartbeat, _ := cluster.Heartbeat(leader, leaderState.Cluster, nil, &timeNow)
+ leader.Cluster = leaderState.Cluster
+ heartbeat, _ := cluster.HeartbeatTask(leader)
ctx := context.Background()
heartbeat(ctx)
@@ -93,8 +93,8 @@ func DISABLE_TestHeartbeat_MarkAsDown(t *testing.T) {
// Shutdown the follower node and perform the heartbeat requests.
f.Server(follower).Close()
- timeNow := time.Now()
- heartbeat, _ := cluster.Heartbeat(leader, leaderState.Cluster, nil, &timeNow)
+ leader.Cluster = leaderState.Cluster
+ heartbeat, _ := cluster.HeartbeatTask(leader)
ctx := context.Background()
heartbeat(ctx)
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 2c7fe5e450..6f6e1ef9fd 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -76,9 +76,6 @@ type Daemon struct {
externalAuth *externalAuth
- // Stores the last time this node initiated a leader heartbeat run.
- lastLeaderHeartbeat time.Time
-
// Stores last heartbeat node information to detect node changes.
lastNodeList *cluster.APIHeartbeat
}
@@ -623,6 +620,7 @@ func (d *Daemon) init() error {
if err != nil {
return err
}
+ d.gateway.HeartbeatNodeHook = d.NodeRefreshTask
/* Setup some mounts (nice to have) */
if !d.os.MockMode {
@@ -710,7 +708,7 @@ func (d *Daemon) init() error {
// The only thing we want to still do on this node is
// to run the heartbeat task, in case we are the raft
// leader.
- stop, _ := task.Start(cluster.Heartbeat(d.gateway, d.cluster, d.NodeRefreshTask, &d.lastLeaderHeartbeat))
+ stop, _ := task.Start(cluster.HeartbeatTask(d.gateway))
d.gateway.WaitUpgradeNotification()
stop(time.Second)
@@ -720,6 +718,7 @@ func (d *Daemon) init() error {
}
return errors.Wrap(err, "failed to open cluster database")
}
+
err = cluster.NotifyUpgradeCompleted(d.State(), certInfo)
if err != nil {
// Ignore the error, since it's not fatal for this particular
@@ -727,6 +726,7 @@ func (d *Daemon) init() error {
// offline.
logger.Debugf("Could not notify all nodes of database upgrade: %v", err)
}
+ d.gateway.Cluster = d.cluster
/* Migrate the node local data to the cluster database, if needed */
if dump != nil {
@@ -915,7 +915,7 @@ func (d *Daemon) init() error {
func (d *Daemon) startClusterTasks() {
// Heartbeats
- d.clusterTasks.Add(cluster.Heartbeat(d.gateway, d.cluster, d.NodeRefreshTask, &d.lastLeaderHeartbeat))
+ d.clusterTasks.Add(cluster.HeartbeatTask(d.gateway))
// Events
d.clusterTasks.Add(cluster.Events(d.endpoints, d.cluster, eventForward))
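
Editorial note: the net effect of this patch is that the leader-driven heartbeat logic moves from a closure inside Heartbeat() into a Gateway.heartbeat method, the periodic task becomes HeartbeatTask(), the node refresh callback becomes the HeartbeatNodeHook field, and a dqlite server watch function now fires an immediate full heartbeat round when the node is elected leader, replacing the lastLeaderHeartbeat timestamp bookkeeping. One reusable piece worth calling out is the wrapper that keeps the blocking database work cancellable. A self-contained sketch of that pattern is below; only the wrapper shape matches the patch, the names are illustrative.

package main

import (
	"context"
	"fmt"
	"time"
)

// wrapWithContext runs a blocking function in a goroutine and returns as
// soon as either the function finishes or the context expires. This is the
// same shape as the heartbeatWrapper in HeartbeatTask, which wraps
// Gateway.heartbeat because the database APIs it calls are blocking.
func wrapWithContext(ctx context.Context, work func(context.Context)) {
	ch := make(chan struct{})
	go func() {
		work(ctx)
		ch <- struct{}{}
	}()

	select {
	case <-ch:
	case <-ctx.Done():
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// A "heartbeat" that takes longer than the context allows.
	wrapWithContext(ctx, func(ctx context.Context) {
		time.Sleep(time.Second)
	})
	fmt.Println("returned once the context expired")
}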
From fcc2205a3d883ab4188e80e3b3911c5808deee7d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Thu, 18 Jul 2019 00:06:53 -0400
Subject: [PATCH 4/4] DEBUG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
test/main.sh | 180 +++++++++++++++++++-------------------
test/suites/clustering.sh | 1 +
2 files changed, 92 insertions(+), 89 deletions(-)
diff --git a/test/main.sh b/test/main.sh
index 1e1e1b0578..82c1951d4e 100755
--- a/test/main.sh
+++ b/test/main.sh
@@ -23,6 +23,8 @@ fi
if [ -z "${LXD_BACKEND:-}" ]; then
LXD_BACKEND="dir"
fi
+LXD_BACKEND="ceph"
+LXD_DEBUG=1
# shellcheck disable=SC2034
LXD_NETNS=""
@@ -154,96 +156,96 @@ if [ "$#" -gt 0 ]; then
exit
fi
-run_test test_check_deps "checking dependencies"
-run_test test_static_analysis "static analysis"
-run_test test_database_update "database schema updates"
-run_test test_database_restore "database restore"
-run_test test_sql "lxd sql"
-run_test test_basic_usage "basic usage"
-run_test test_remote_url "remote url handling"
-run_test test_remote_admin "remote administration"
-run_test test_remote_usage "remote usage"
-run_test test_clustering_enable "clustering enable"
-run_test test_clustering_membership "clustering membership"
-run_test test_clustering_containers "clustering containers"
+###run_test test_check_deps "checking dependencies"
+###run_test test_static_analysis "static analysis"
+###run_test test_database_update "database schema updates"
+###run_test test_database_restore "database restore"
+###run_test test_sql "lxd sql"
+###run_test test_basic_usage "basic usage"
+###run_test test_remote_url "remote url handling"
+###run_test test_remote_admin "remote administration"
+###run_test test_remote_usage "remote usage"
+###run_test test_clustering_enable "clustering enable"
+###run_test test_clustering_membership "clustering membership"
+###run_test test_clustering_containers "clustering containers"
run_test test_clustering_storage "clustering storage"
-run_test test_clustering_network "clustering network"
-run_test test_clustering_publish "clustering publish"
-run_test test_clustering_profiles "clustering profiles"
-run_test test_clustering_join_api "clustering join api"
-run_test test_clustering_shutdown_nodes "clustering shutdown"
-run_test test_clustering_projects "clustering projects"
-run_test test_clustering_address "clustering address"
-run_test test_clustering_image_replication "clustering image replication"
-run_test test_clustering_dns "clustering DNS"
-#run_test test_clustering_upgrade "clustering upgrade"
-run_test test_projects_default "default project"
-run_test test_projects_crud "projects CRUD operations"
-run_test test_projects_containers "containers inside projects"
-run_test test_projects_snapshots "snapshots inside projects"
-run_test test_projects_backups "backups inside projects"
-run_test test_projects_profiles "profiles inside projects"
-run_test test_projects_profiles_default "profiles from the global default project"
-run_test test_projects_images "images inside projects"
-run_test test_projects_images_default "images from the global default project"
-run_test test_projects_storage "projects and storage pools"
-run_test test_projects_network "projects and networks"
-run_test test_container_devices_nic_p2p "container devices - nic - p2p"
-run_test test_container_devices_nic_bridged "container devices - nic - bridged"
-run_test test_container_devices_nic_bridged_filtering "container devices - nic - bridged - filtering"
-run_test test_container_devices_nic_physical "container devices - nic - physical"
-run_test test_container_devices_nic_macvlan "container devices - nic - macvlan"
-run_test test_container_devices_nic_ipvlan "container devices - nic - ipvlan"
-run_test test_container_devices_nic_sriov "container devices - nic - sriov"
-run_test test_security "security features"
-run_test test_security_protection "container protection"
-run_test test_image_expiry "image expiry"
-run_test test_image_list_all_aliases "image list all aliases"
-run_test test_image_auto_update "image auto-update"
-run_test test_image_prefer_cached "image prefer cached"
-run_test test_image_import_dir "import image from directory"
-run_test test_concurrent_exec "concurrent exec"
-run_test test_concurrent "concurrent startup"
-run_test test_snapshots "container snapshots"
-run_test test_snap_restore "snapshot restores"
-run_test test_snap_expiry "snapshot expiry"
-run_test test_config_profiles "profiles and configuration"
-run_test test_config_edit "container configuration edit"
-run_test test_config_edit_container_snapshot_pool_config "container and snapshot volume configuration edit"
-run_test test_container_metadata "manage container metadata and templates"
-run_test test_container_snapshot_config "container snapshot configuration"
-run_test test_server_config "server configuration"
-run_test test_filemanip "file manipulations"
-run_test test_network "network management"
-run_test test_idmap "id mapping"
-run_test test_template "file templating"
-run_test test_pki "PKI mode"
-run_test test_devlxd "/dev/lxd"
-run_test test_fuidshift "fuidshift"
-run_test test_migration "migration"
-run_test test_fdleak "fd leak"
-run_test test_storage "storage"
-run_test test_storage_volume_snapshots "storage volume snapshots"
-run_test test_init_auto "lxd init auto"
-run_test test_init_interactive "lxd init interactive"
-run_test test_init_preseed "lxd init preseed"
-run_test test_storage_profiles "storage profiles"
-run_test test_container_import "container import"
-run_test test_storage_volume_attach "attaching storage volumes"
-run_test test_storage_driver_ceph "ceph storage driver"
-run_test test_storage_driver_cephfs "cephfs storage driver"
-run_test test_resources "resources"
-run_test test_kernel_limits "kernel limits"
-run_test test_macaroon_auth "macaroon authentication"
-run_test test_console "console"
-run_test test_query "query"
-run_test test_proxy_device "proxy device"
-run_test test_storage_local_volume_handling "storage local volume handling"
-run_test test_backup_import "backup import"
-run_test test_backup_export "backup export"
-run_test test_backup_rename "backup rename"
-run_test test_container_local_cross_pool_handling "container local cross pool handling"
-run_test test_incremental_copy "incremental container copy"
+###run_test test_clustering_network "clustering network"
+###run_test test_clustering_publish "clustering publish"
+###run_test test_clustering_profiles "clustering profiles"
+###run_test test_clustering_join_api "clustering join api"
+###run_test test_clustering_shutdown_nodes "clustering shutdown"
+###run_test test_clustering_projects "clustering projects"
+###run_test test_clustering_address "clustering address"
+###run_test test_clustering_image_replication "clustering image replication"
+###run_test test_clustering_dns "clustering DNS"
+####run_test test_clustering_upgrade "clustering upgrade"
+###run_test test_projects_default "default project"
+###run_test test_projects_crud "projects CRUD operations"
+###run_test test_projects_containers "containers inside projects"
+###run_test test_projects_snapshots "snapshots inside projects"
+###run_test test_projects_backups "backups inside projects"
+###run_test test_projects_profiles "profiles inside projects"
+###run_test test_projects_profiles_default "profiles from the global default project"
+###run_test test_projects_images "images inside projects"
+###run_test test_projects_images_default "images from the global default project"
+###run_test test_projects_storage "projects and storage pools"
+###run_test test_projects_network "projects and networks"
+###run_test test_container_devices_nic_p2p "container devices - nic - p2p"
+###run_test test_container_devices_nic_bridged "container devices - nic - bridged"
+###run_test test_container_devices_nic_bridged_filtering "container devices - nic - bridged - filtering"
+###run_test test_container_devices_nic_physical "container devices - nic - physical"
+###run_test test_container_devices_nic_macvlan "container devices - nic - macvlan"
+###run_test test_container_devices_nic_ipvlan "container devices - nic - ipvlan"
+###run_test test_container_devices_nic_sriov "container devices - nic - sriov"
+###run_test test_security "security features"
+###run_test test_security_protection "container protection"
+###run_test test_image_expiry "image expiry"
+###run_test test_image_list_all_aliases "image list all aliases"
+###run_test test_image_auto_update "image auto-update"
+###run_test test_image_prefer_cached "image prefer cached"
+###run_test test_image_import_dir "import image from directory"
+###run_test test_concurrent_exec "concurrent exec"
+###run_test test_concurrent "concurrent startup"
+###run_test test_snapshots "container snapshots"
+###run_test test_snap_restore "snapshot restores"
+###run_test test_snap_expiry "snapshot expiry"
+###run_test test_config_profiles "profiles and configuration"
+###run_test test_config_edit "container configuration edit"
+###run_test test_config_edit_container_snapshot_pool_config "container and snapshot volume configuration edit"
+###run_test test_container_metadata "manage container metadata and templates"
+###run_test test_container_snapshot_config "container snapshot configuration"
+###run_test test_server_config "server configuration"
+###run_test test_filemanip "file manipulations"
+###run_test test_network "network management"
+###run_test test_idmap "id mapping"
+###run_test test_template "file templating"
+###run_test test_pki "PKI mode"
+###run_test test_devlxd "/dev/lxd"
+###run_test test_fuidshift "fuidshift"
+###run_test test_migration "migration"
+###run_test test_fdleak "fd leak"
+###run_test test_storage "storage"
+###run_test test_storage_volume_snapshots "storage volume snapshots"
+###run_test test_init_auto "lxd init auto"
+###run_test test_init_interactive "lxd init interactive"
+###run_test test_init_preseed "lxd init preseed"
+###run_test test_storage_profiles "storage profiles"
+###run_test test_container_import "container import"
+###run_test test_storage_volume_attach "attaching storage volumes"
+###run_test test_storage_driver_ceph "ceph storage driver"
+###run_test test_storage_driver_cephfs "cephfs storage driver"
+###run_test test_resources "resources"
+###run_test test_kernel_limits "kernel limits"
+###run_test test_macaroon_auth "macaroon authentication"
+###run_test test_console "console"
+###run_test test_query "query"
+###run_test test_proxy_device "proxy device"
+###run_test test_storage_local_volume_handling "storage local volume handling"
+###run_test test_backup_import "backup import"
+###run_test test_backup_export "backup export"
+###run_test test_backup_rename "backup rename"
+###run_test test_container_local_cross_pool_handling "container local cross pool handling"
+###run_test test_incremental_copy "incremental container copy"
# shellcheck disable=SC2034
TEST_RESULT=success
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index 5754e10152..d9770f4f20 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -510,6 +510,7 @@ test_clustering_storage() {
sleep 30
# Move the container back to node2, even if node3 is offline
+ LXD_DIR="${LXD_ONE_DIR}" lxc cluster list
LXD_DIR="${LXD_ONE_DIR}" lxc move bar --target node2
LXD_DIR="${LXD_ONE_DIR}" lxc info bar | grep -q "Location: node2"
LXD_DIR="${LXD_TWO_DIR}" lxc info bar | grep -q "backup (taken at"