[lxc-devel] [lxd/master] Block cluster connections on working event forwarding

stgraber on Github lxc-bot at linuxcontainers.org
Tue Dec 3 04:59:11 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 645 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20191202/e35c86ee/attachment.bin>
-------------- next part --------------
From 769f4fdfcc484b73b39343505bb030101e7c63a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Sun, 1 Dec 2019 23:27:16 -0500
Subject: [PATCH 1/6] lxd/cluster: More reliable event delivery
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/cluster/connect.go | 22 ++++++++++++++++++++++
 lxd/cluster/events.go  | 39 +++++++++++++++++++++++++--------------
 2 files changed, 47 insertions(+), 14 deletions(-)

diff --git a/lxd/cluster/connect.go b/lxd/cluster/connect.go
index 43ec165eb6..bd9c3881cb 100644
--- a/lxd/cluster/connect.go
+++ b/lxd/cluster/connect.go
@@ -4,6 +4,7 @@ import (
 	"encoding/base64"
 	"encoding/pem"
 	"fmt"
+	"time"
 
 	lxd "github.com/lxc/lxd/client"
 	"github.com/lxc/lxd/lxd/db"
@@ -20,6 +21,27 @@ import (
 // value 'lxd-cluster-notifier', which can be used in some cases to distinguish
 // between a regular client request and an internal cluster request.
 func Connect(address string, cert *shared.CertInfo, notify bool) (lxd.InstanceServer, error) {
+	// Wait for a connection to the events API first for non-notify connections.
+	if !notify {
+		connected := false
+		for i := 0; i < 20; i++ {
+			listenersLock.Lock()
+			_, ok := listeners[address]
+			listenersLock.Unlock()
+
+			if ok {
+				connected = true
+				break
+			}
+
+			time.Sleep(500 * time.Millisecond)
+		}
+
+		if !connected {
+			return nil, fmt.Errorf("Missing event connection with target cluster member")
+		}
+	}
+
 	args := &lxd.ConnectionArgs{
 		TLSServerCert: string(cert.PublicKey()),
 		TLSClientCert: string(cert.PublicKey()),
diff --git a/lxd/cluster/events.go b/lxd/cluster/events.go
index bbcef4c0a7..8e77a0d46a 100644
--- a/lxd/cluster/events.go
+++ b/lxd/cluster/events.go
@@ -2,6 +2,7 @@ package cluster
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	lxd "github.com/lxc/lxd/client"
@@ -13,28 +14,28 @@ import (
 	"github.com/lxc/lxd/shared/logger"
 )
 
+var listeners = map[string]*lxd.EventListener{}
+var listenersLock sync.Mutex
+
 // Events starts a task that continuously monitors the list of cluster nodes and
 // maintains a pool of websocket connections against all of them, in order to
 // get notified about events.
 //
 // Whenever an event is received the given callback is invoked.
 func Events(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, api.Event)) (task.Func, task.Schedule) {
-	listeners := map[int64]*lxd.EventListener{}
-
 	// Update our pool of event listeners. Since database queries are
 	// blocking, we spawn the actual logic in a goroutine, to abort
 	// immediately when we receive the stop signal.
 	update := func(ctx context.Context) {
 		ch := make(chan struct{})
 		go func() {
-			eventsUpdateListeners(endpoints, cluster, listeners, f)
+			eventsUpdateListeners(endpoints, cluster, f)
 			ch <- struct{}{}
 		}()
 		select {
 		case <-ch:
 		case <-ctx.Done():
 		}
-
 	}
 
 	schedule := task.Every(time.Second)
@@ -42,7 +43,7 @@ func Events(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, a
 	return update, schedule
 }
 
-func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster, listeners map[int64]*lxd.EventListener, f func(int64, api.Event)) {
+func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, api.Event)) {
 	// Get the current cluster nodes.
 	var nodes []db.NodeInfo
 	var offlineThreshold time.Duration
@@ -72,27 +73,31 @@ func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster,
 
 	address := endpoints.NetworkAddress()
 
-	ids := make([]int, len(nodes))
+	addresses := make([]string, len(nodes))
 	for i, node := range nodes {
-		ids[i] = int(node.ID)
+		addresses[i] = node.Address
 
 		// Don't bother trying to connect to offline nodes, or to ourselves.
 		if node.IsOffline(offlineThreshold) || node.Address == address {
 			continue
 		}
 
-		_, ok := listeners[node.ID]
+		listenersLock.Lock()
+		listener, ok := listeners[node.Address]
 
 		// The node has already a listener associated to it.
 		if ok {
 			// Double check that the listener is still
 			// connected. If it is, just move on, other
 			// we'll try to connect again.
-			if listeners[node.ID].IsActive() {
+			if listeners[node.Address].IsActive() {
+				listenersLock.Unlock()
 				continue
 			}
-			delete(listeners, node.ID)
+
+			delete(listeners, node.Address)
 		}
+		listenersLock.Unlock()
 
 		listener, err := eventsConnect(node.Address, endpoints.NetworkCert())
 		if err != nil {
@@ -101,14 +106,20 @@ func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster,
 		}
 		logger.Debugf("Listening for events on node %s", node.Address)
 		listener.AddHandler(nil, func(event api.Event) { f(node.ID, event) })
-		listeners[node.ID] = listener
+
+		listenersLock.Lock()
+		listeners[node.Address] = listener
+		listenersLock.Unlock()
 	}
-	for id, listener := range listeners {
-		if !shared.IntInSlice(int(id), ids) {
+
+	listenersLock.Lock()
+	for address, listener := range listeners {
+		if !shared.StringInSlice(address, addresses) {
 			listener.Disconnect()
-			delete(listeners, id)
+			delete(listeners, address)
 		}
 	}
+	listenersLock.Unlock()
 }
 
 // Establish a client connection to get events from the given node.

From d09c28ff48c71d51323741e3771bcbc902625dfc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:54:02 -0500
Subject: [PATCH 2/6] lxd/response: Coding style
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/response/response.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/lxd/response/response.go b/lxd/response/response.go
index 28572de771..128b1015d0 100644
--- a/lxd/response/response.go
+++ b/lxd/response/response.go
@@ -139,6 +139,7 @@ func Conflict(err error) Response {
 	if err != nil {
 		message = err.Error()
 	}
+
 	return &errorResponse{http.StatusConflict, message}
 }
 
@@ -148,6 +149,7 @@ func Forbidden(err error) Response {
 	if err != nil {
 		message = err.Error()
 	}
+
 	return &errorResponse{http.StatusForbidden, message}
 }
 
@@ -162,6 +164,7 @@ func NotFound(err error) Response {
 	if err != nil {
 		message = err.Error()
 	}
+
 	return &errorResponse{http.StatusNotFound, message}
 }
 
@@ -171,6 +174,7 @@ func NotImplemented(err error) Response {
 	if err != nil {
 		message = err.Error()
 	}
+
 	return &errorResponse{http.StatusNotImplemented, message}
 }
 
@@ -186,6 +190,7 @@ func Unavailable(err error) Response {
 	if err != nil {
 		message = err.Error()
 	}
+
 	return &errorResponse{http.StatusServiceUnavailable, message}
 }
 
@@ -308,6 +313,7 @@ func (r *fileResponse) Render(w http.ResponseWriter) error {
 				return err
 			}
 			defer fd.Close()
+
 			rd = fd
 		} else {
 			rd = bytes.NewReader(entry.Buffer)
@@ -361,6 +367,7 @@ func (r *forwardedResponse) Render(w http.ResponseWriter) error {
 	if err != nil {
 		return err
 	}
+
 	for key := range r.request.Header {
 		forwarded.Header.Set(key, r.request.Header.Get(key))
 	}
@@ -369,6 +376,7 @@ func (r *forwardedResponse) Render(w http.ResponseWriter) error {
 	if err != nil {
 		return err
 	}
+
 	response, err := httpClient.Do(forwarded)
 	if err != nil {
 		return err

From aaa0c0f86c217f2e98358295e313dffd674b4b32 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:54:10 -0500
Subject: [PATCH 3/6] lxd/operations: Use ForwardedResponse
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/operations.go | 23 +++--------------------
 1 file changed, 3 insertions(+), 20 deletions(-)

diff --git a/lxd/operations.go b/lxd/operations.go
index 780aa18fdc..4bc0ba22b3 100644
--- a/lxd/operations.go
+++ b/lxd/operations.go
@@ -16,7 +16,6 @@ import (
 	"github.com/lxc/lxd/lxd/util"
 	"github.com/lxc/lxd/shared"
 	"github.com/lxc/lxd/shared/api"
-	"github.com/lxc/lxd/shared/logger"
 )
 
 var operationCmd = APIEndpoint{
@@ -82,12 +81,7 @@ func operationGet(d *Daemon, r *http.Request) response.Response {
 		return response.SmartError(err)
 	}
 
-	body, _, err = client.GetOperation(id)
-	if err != nil {
-		return response.SmartError(err)
-	}
-
-	return response.SyncResponse(true, body)
+	return response.ForwardedResponse(client, r)
 }
 
 func operationDelete(d *Daemon, r *http.Request) response.Response {
@@ -136,12 +130,7 @@ func operationDelete(d *Daemon, r *http.Request) response.Response {
 		return response.SmartError(err)
 	}
 
-	err = client.DeleteOperation(id)
-	if err != nil {
-		return response.SmartError(err)
-	}
-
-	return response.EmptySyncResponse
+	return response.ForwardedResponse(client, r)
 }
 
 func operationsGet(d *Daemon, r *http.Request) response.Response {
@@ -361,12 +350,7 @@ func operationWaitGet(d *Daemon, r *http.Request) response.Response {
 		return response.SmartError(err)
 	}
 
-	apiOp, _, err := client.GetOperationWait(id, timeout)
-	if err != nil {
-		return response.SmartError(err)
-	}
-
-	return response.SyncResponse(true, apiOp)
+	return response.ForwardedResponse(client, r)
 }
 
 type operationWebSocket struct {
@@ -447,7 +431,6 @@ func operationWebsocketGet(d *Daemon, r *http.Request) response.Response {
 		return response.SmartError(err)
 	}
 
-	logger.Debugf("Forward operation websocket from node %s", address)
 	source, err := client.GetOperationWebsocket(id, secret)
 	if err != nil {
 		return response.SmartError(err)

From e4bb1480e5c04a49de1c77d97b294e23fee68d76 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:54:34 -0500
Subject: [PATCH 4/6] lxd/images: Coding style
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/images.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lxd/images.go b/lxd/images.go
index a4d934b0c9..95dec7e775 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -1925,6 +1925,7 @@ func imageExport(d *Daemon, r *http.Request) response.Response {
 		if err != nil {
 			return response.SmartError(err)
 		}
+
 		return response.ForwardedResponse(client, r)
 	}
 
@@ -2217,12 +2218,14 @@ func imageSyncBetweenNodes(d *Daemon, project string, fingerprint string) error
 	if err != nil {
 		return errors.Wrap(err, "Failed to fetch the leader node address")
 	}
+
 	var targetNodeAddress string
 	if shared.StringInSlice(leader, addresses) {
 		targetNodeAddress = leader
 	} else {
 		targetNodeAddress = addresses[0]
 	}
+
 	client, err := cluster.Connect(targetNodeAddress, d.endpoints.NetworkCert(), true)
 	if err != nil {
 		return errors.Wrap(err, "Failed to connect node for image synchronization")

From 194a3e44fe7ce91a94849ef14e241bc191e330d7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:55:26 -0500
Subject: [PATCH 5/6] lxd/cluster: Coding style
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/api_cluster.go | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 7ea90a760e..3f02fdd0f4 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -612,6 +612,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
 		if err != nil {
 			return err
 		}
+
 		err = clusterRebalance(client)
 		if err != nil {
 			return err
@@ -922,10 +923,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response {
 		if err != nil {
 			return response.SmartError(err)
 		}
+
 		networks, err := d.cluster.Networks()
 		if err != nil {
 			return response.SmartError(err)
 		}
+
 		for _, name := range networks {
 			err := client.DeleteNetwork(name)
 			if err != nil {
@@ -938,6 +941,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response {
 		if err != nil && err != db.ErrNoSuchObject {
 			return response.SmartError(err)
 		}
+
 		for _, name := range pools {
 			err := client.DeleteStoragePool(name)
 			if err != nil {
@@ -965,6 +969,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response {
 		if err != nil {
 			return response.SmartError(err)
 		}
+
 		put := api.ClusterPut{}
 		put.Enabled = false
 		_, err = client.UpdateCluster(put, "")
@@ -982,17 +987,19 @@ func tryClusterRebalance(d *Daemon) error {
 	leader, err := d.gateway.LeaderAddress()
 	if err != nil {
 		// This is not a fatal error, so let's just log it.
-		return errors.Wrap(err, "failed to get current leader member")
+		return errors.Wrap(err, "Failed to get current leader member")
 	}
 	cert := d.endpoints.NetworkCert()
 	client, err := cluster.Connect(leader, cert, true)
 	if err != nil {
-		return errors.Wrap(err, "failed to connect to leader member")
+		return errors.Wrap(err, "Failed to connect to leader member")
 	}
+
 	_, _, err = client.RawQuery("POST", "/internal/cluster/rebalance", nil, "")
 	if err != nil {
-		return errors.Wrap(err, "request to rebalance cluster failed")
+		return errors.Wrap(err, "Request to rebalance cluster failed")
 	}
+
 	return nil
 }
 
@@ -1159,6 +1166,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
 	if err != nil {
 		return response.SmartError(err)
 	}
+
 	_, _, err = client.RawQuery("POST", "/internal/cluster/promote", post, "")
 	if err != nil {
 		return response.SmartError(err)

From c65b847a0a2b541cd6ffd6af843540db795552e7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:56:49 -0500
Subject: [PATCH 6/6] lxd: Tweak cluster.Connect calls
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/api_cluster.go    | 6 +++---
 lxd/container_post.go | 2 +-
 lxd/networks.go       | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 3f02fdd0f4..209fb08ed4 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -608,7 +608,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
 		// Add the cluster flag from the agent
 		version.UserAgentFeatures([]string{"cluster"})
 
-		client, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), false)
+		client, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), true)
 		if err != nil {
 			return err
 		}
@@ -965,7 +965,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response {
 	if force != 1 {
 		// Try to gracefully reset the database on the node.
 		cert := d.endpoints.NetworkCert()
-		client, err := cluster.Connect(address, cert, false)
+		client, err := cluster.Connect(address, cert, true)
 		if err != nil {
 			return response.SmartError(err)
 		}
@@ -1162,7 +1162,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
 	}
 
 	cert := d.endpoints.NetworkCert()
-	client, err := cluster.Connect(address, cert, false)
+	client, err := cluster.Connect(address, cert, true)
 	if err != nil {
 		return response.SmartError(err)
 	}
diff --git a/lxd/container_post.go b/lxd/container_post.go
index b8b6ba8592..0bcff60ed2 100644
--- a/lxd/container_post.go
+++ b/lxd/container_post.go
@@ -297,7 +297,7 @@ func containerPostClusteringMigrate(d *Daemon, c instance.Instance, oldName, new
 
 	run := func(*operations.Operation) error {
 		// Connect to the source host, i.e. ourselves (the node the container is running on).
-		source, err := cluster.Connect(sourceAddress, cert, false)
+		source, err := cluster.Connect(sourceAddress, cert, true)
 		if err != nil {
 			return errors.Wrap(err, "Failed to connect to source server")
 		}
diff --git a/lxd/networks.go b/lxd/networks.go
index a7b0336209..057e3dc53d 100644
--- a/lxd/networks.go
+++ b/lxd/networks.go
@@ -2316,7 +2316,7 @@ func (n *network) refreshForkdnsServerAddresses(heartbeatData *cluster.APIHeartb
 			continue
 		}
 
-		client, err := cluster.Connect(node.Address, cert, false)
+		client, err := cluster.Connect(node.Address, cert, true)
 		if err != nil {
 			return err
 		}


More information about the lxc-devel mailing list