[lxc-devel] [lxd/master] lxd/cluster: Fix race condition during join
stgraber on Github
lxc-bot at linuxcontainers.org
Wed May 8 16:48:12 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 354 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190508/e9eae834/attachment.bin>
-------------- next part --------------
From f2275bad5cc74533a3ae22a0f93b2b0f53304392 Mon Sep 17 00:00:00 2001
From: Stéphane Graber <stgraber at ubuntu.com>
Date: Wed, 8 May 2019 12:13:45 -0400
Subject: [PATCH] lxd/cluster: Fix race condition during join
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/cluster/heartbeat.go | 79 +++++++++++++++++++++++++++++----------
lxd/cluster/membership.go | 6 +++
2 files changed, 65 insertions(+), 20 deletions(-)
diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index e842787b64..709e2fa234 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -61,10 +61,12 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
if err != nil {
return err
}
+
nodeAddress, err = tx.NodeAddress()
if err != nil {
return err
}
+
return nil
})
if err != nil {
@@ -72,38 +74,75 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
return
}
- heartbeats := make([]time.Time, len(nodes))
+ heartbeats := map[int64]bool{}
heartbeatsLock := sync.Mutex{}
heartbeatsWg := sync.WaitGroup{}
- for i, node := range nodes {
+ sendHeartbeat := func(id int64, address string, delay bool) {
+ defer heartbeatsWg.Done()
+
+ if delay {
+ // Spread in time by waiting up to 3s less than the interval
+ time.Sleep(time.Duration(rand.Intn((heartbeatInterval*1000)-3000)) * time.Millisecond)
+ }
+ logger.Debugf("Sending heartbeat to %s", address)
+
+ err := heartbeatNode(ctx, address, gateway.cert, raftNodes)
+ if err == nil {
+ heartbeatsLock.Lock()
+ heartbeats[id] = true
+ heartbeatsLock.Unlock()
+ logger.Debugf("Successful heartbeat for %s", address)
+ } else {
+ logger.Debugf("Failed heartbeat for %s: %v", address, err)
+ }
+ }
+
+ for _, node := range nodes {
// Special case the local node
if node.Address == nodeAddress {
heartbeatsLock.Lock()
- heartbeats[i] = time.Now()
+ heartbeats[node.ID] = true
heartbeatsLock.Unlock()
continue
}
// Parallelize the rest
heartbeatsWg.Add(1)
- go func(i int, address string) {
- defer heartbeatsWg.Done()
+ go sendHeartbeat(node.ID, node.Address, true)
+ }
+ heartbeatsWg.Wait()
- // Spread in time by waiting up to 3s less than the interval
- time.Sleep(time.Duration(rand.Intn((heartbeatInterval*1000)-3000)) * time.Millisecond)
- logger.Debugf("Sending heartbeat to %s", address)
-
- err := heartbeatNode(ctx, address, gateway.cert, raftNodes)
- if err == nil {
- heartbeatsLock.Lock()
- heartbeats[i] = time.Now()
- heartbeatsLock.Unlock()
- logger.Debugf("Successful heartbeat for %s", address)
- } else {
- logger.Debugf("Failed heartbeat for %s: %v", address, err)
+ // Look for nodes that joined during this round: load a fresh snapshot into currentNodes and diff it against nodes
+ var currentNodes []db.NodeInfo
+ err = cluster.Transaction(func(tx *db.ClusterTx) error {
+ var err error
+ currentNodes, err = tx.Nodes()
+ if err != nil {
+ return err
+ }
+
+ return nil
+ })
+ if err != nil {
+ logger.Warnf("Failed to get current cluster nodes: %v", err)
+ return
+ }
+
+ for _, currentNode := range currentNodes {
+ found := false
+ for _, node := range nodes {
+ if node.Address == currentNode.Address {
+ found = true
+ break
}
- }(i, node.Address)
+ }
+
+ if !found {
+ // We found a new node
+ heartbeatsWg.Add(1)
+ go sendHeartbeat(currentNode.ID, currentNode.Address, false)
+ }
}
heartbeatsWg.Wait()
@@ -114,8 +153,8 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
}
err = cluster.Transaction(func(tx *db.ClusterTx) error {
- for i, node := range nodes {
- if heartbeats[i].Equal(time.Time{}) {
+ for _, node := range nodes {
+ if !heartbeats[node.ID] {
continue
}
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 2eca5a6385..de49f7c45b 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -1,6 +1,7 @@
package cluster
import (
+ "context"
"fmt"
"os"
"path/filepath"
@@ -412,6 +413,11 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
return errors.Wrapf(err, "failed to unmark the node as pending")
}
+ // Attempt to send a heartbeat to all other nodes
+ for _, node := range nodes {
+ go heartbeatNode(context.Background(), node.Address, cert, nodes)
+ }
+
return nil
})
if err != nil {
More information about the lxc-devel mailing list