[lxc-devel] [lxd/master] lxd/cluster: Fix race condition during join

stgraber on Github lxc-bot at linuxcontainers.org
Wed May 8 16:48:12 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 354 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190508/e9eae834/attachment.bin>
-------------- next part --------------
From f2275bad5cc74533a3ae22a0f93b2b0f53304392 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 8 May 2019 12:13:45 -0400
Subject: [PATCH] lxd/cluster: Fix race condition during join
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/cluster/heartbeat.go  | 79 +++++++++++++++++++++++++++++----------
 lxd/cluster/membership.go |  6 +++
 2 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go
index e842787b64..709e2fa234 100644
--- a/lxd/cluster/heartbeat.go
+++ b/lxd/cluster/heartbeat.go
@@ -61,10 +61,12 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
 			if err != nil {
 				return err
 			}
+
 			nodeAddress, err = tx.NodeAddress()
 			if err != nil {
 				return err
 			}
+
 			return nil
 		})
 		if err != nil {
@@ -72,38 +74,75 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
 			return
 		}
 
-		heartbeats := make([]time.Time, len(nodes))
+		heartbeats := map[int64]bool{}
 		heartbeatsLock := sync.Mutex{}
 		heartbeatsWg := sync.WaitGroup{}
 
-		for i, node := range nodes {
+		sendHeartbeat := func(id int64, address string, delay bool) {
+			defer heartbeatsWg.Done()
+
+			if delay {
+				// Spread in time by waiting up to 3s less than the interval
+				time.Sleep(time.Duration(rand.Intn((heartbeatInterval*1000)-3000)) * time.Millisecond)
+			}
+			logger.Debugf("Sending heartbeat to %s", address)
+
+			err := heartbeatNode(ctx, address, gateway.cert, raftNodes)
+			if err == nil {
+				heartbeatsLock.Lock()
+				heartbeats[id] = true
+				heartbeatsLock.Unlock()
+				logger.Debugf("Successful heartbeat for %s", address)
+			} else {
+				logger.Debugf("Failed heartbeat for %s: %v", address, err)
+			}
+		}
+
+		for _, node := range nodes {
 			// Special case the local node
 			if node.Address == nodeAddress {
 				heartbeatsLock.Lock()
-				heartbeats[i] = time.Now()
+				heartbeats[node.ID] = true
 				heartbeatsLock.Unlock()
 				continue
 			}
 
 			// Parallelize the rest
 			heartbeatsWg.Add(1)
-			go func(i int, address string) {
-				defer heartbeatsWg.Done()
+			go sendHeartbeat(node.ID, node.Address, true)
+		}
+		heartbeatsWg.Wait()
 
-				// Spread in time by waiting up to 3s less than the interval
-				time.Sleep(time.Duration(rand.Intn((heartbeatInterval*1000)-3000)) * time.Millisecond)
-				logger.Debugf("Sending heartbeat to %s", address)
-
-				err := heartbeatNode(ctx, address, gateway.cert, raftNodes)
-				if err == nil {
-					heartbeatsLock.Lock()
-					heartbeats[i] = time.Now()
-					heartbeatsLock.Unlock()
-					logger.Debugf("Successful heartbeat for %s", address)
-				} else {
-					logger.Debugf("Failed heartbeat for %s: %v", address, err)
+		// Re-read the member list into currentNodes (keeping nodes intact) to spot nodes added meanwhile
+		var currentNodes []db.NodeInfo
+		err = cluster.Transaction(func(tx *db.ClusterTx) error {
+			var err error
+			currentNodes, err = tx.Nodes()
+			if err != nil {
+				return err
+			}
+
+			return nil
+		})
+		if err != nil {
+			logger.Warnf("Failed to get current cluster nodes: %v", err)
+			return
+		}
+
+		for _, currentNode := range currentNodes {
+			found := false
+			for _, node := range nodes {
+				if node.Address == currentNode.Address {
+					found = true
+					break
 				}
-			}(i, node.Address)
+			}
+
+			if !found {
+				// We found a new node
+				heartbeatsWg.Add(1)
+				go sendHeartbeat(currentNode.ID, currentNode.Address, false)
+			}
 		}
 		heartbeatsWg.Wait()
 
@@ -114,8 +153,8 @@ func Heartbeat(gateway *Gateway, cluster *db.Cluster) (task.Func, task.Schedule)
 		}
 
 		err = cluster.Transaction(func(tx *db.ClusterTx) error {
-			for i, node := range nodes {
-				if heartbeats[i].Equal(time.Time{}) {
+			for _, node := range nodes {
+				if !heartbeats[node.ID] {
 					continue
 				}
 
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 2eca5a6385..de49f7c45b 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -1,6 +1,7 @@
 package cluster
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -412,6 +413,11 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
 			return errors.Wrapf(err, "failed to unmark the node as pending")
 		}
 
+		// Attempt to send a heartbeat to all other nodes
+		for _, node := range nodes {
+			go heartbeatNode(context.Background(), node.Address, cert, nodes)
+		}
+
 		return nil
 	})
 	if err != nil {


More information about the lxc-devel mailing list