[lxc-devel] [lxd/master] Block cluster connections on working event forwarding
stgraber on Github
lxc-bot at linuxcontainers.org
Tue Dec 3 04:59:11 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 645 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20191202/e35c86ee/attachment.bin>
-------------- next part --------------
From 769f4fdfcc484b73b39343505bb030101e7c63a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Sun, 1 Dec 2019 23:27:16 -0500
Subject: [PATCH 1/6] lxd/cluster: More reliable event delivery
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/cluster/connect.go | 22 ++++++++++++++++++++++
lxd/cluster/events.go | 39 +++++++++++++++++++++++++--------------
2 files changed, 47 insertions(+), 14 deletions(-)
diff --git a/lxd/cluster/connect.go b/lxd/cluster/connect.go
index 43ec165eb6..bd9c3881cb 100644
--- a/lxd/cluster/connect.go
+++ b/lxd/cluster/connect.go
@@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/pem"
"fmt"
+ "time"
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/lxd/db"
@@ -20,6 +21,27 @@ import (
// value 'lxd-cluster-notifier', which can be used in some cases to distinguish
// between a regular client request and an internal cluster request.
func Connect(address string, cert *shared.CertInfo, notify bool) (lxd.InstanceServer, error) {
+ // Wait for a connection to the events API first for non-notify connections.
+ if !notify {
+ connected := false
+ for i := 0; i < 20; i++ {
+ listenersLock.Lock()
+ _, ok := listeners[address]
+ listenersLock.Unlock()
+
+ if ok {
+ connected = true
+ break
+ }
+
+ time.Sleep(500 * time.Millisecond)
+ }
+
+ if !connected {
+ return nil, fmt.Errorf("Missing event connection with target cluster member")
+ }
+ }
+
args := &lxd.ConnectionArgs{
TLSServerCert: string(cert.PublicKey()),
TLSClientCert: string(cert.PublicKey()),
diff --git a/lxd/cluster/events.go b/lxd/cluster/events.go
index bbcef4c0a7..8e77a0d46a 100644
--- a/lxd/cluster/events.go
+++ b/lxd/cluster/events.go
@@ -2,6 +2,7 @@ package cluster
import (
"context"
+ "sync"
"time"
lxd "github.com/lxc/lxd/client"
@@ -13,28 +14,28 @@ import (
"github.com/lxc/lxd/shared/logger"
)
+var listeners = map[string]*lxd.EventListener{}
+var listenersLock sync.Mutex
+
// Events starts a task that continuously monitors the list of cluster nodes and
// maintains a pool of websocket connections against all of them, in order to
// get notified about events.
//
// Whenever an event is received the given callback is invoked.
func Events(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, api.Event)) (task.Func, task.Schedule) {
- listeners := map[int64]*lxd.EventListener{}
-
// Update our pool of event listeners. Since database queries are
// blocking, we spawn the actual logic in a goroutine, to abort
// immediately when we receive the stop signal.
update := func(ctx context.Context) {
ch := make(chan struct{})
go func() {
- eventsUpdateListeners(endpoints, cluster, listeners, f)
+ eventsUpdateListeners(endpoints, cluster, f)
ch <- struct{}{}
}()
select {
case <-ch:
case <-ctx.Done():
}
-
}
schedule := task.Every(time.Second)
@@ -42,7 +43,7 @@ func Events(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, a
return update, schedule
}
-func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster, listeners map[int64]*lxd.EventListener, f func(int64, api.Event)) {
+func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster, f func(int64, api.Event)) {
// Get the current cluster nodes.
var nodes []db.NodeInfo
var offlineThreshold time.Duration
@@ -72,27 +73,31 @@ func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster,
address := endpoints.NetworkAddress()
- ids := make([]int, len(nodes))
+ addresses := make([]string, len(nodes))
for i, node := range nodes {
- ids[i] = int(node.ID)
+ addresses[i] = node.Address
// Don't bother trying to connect to offline nodes, or to ourselves.
if node.IsOffline(offlineThreshold) || node.Address == address {
continue
}
- _, ok := listeners[node.ID]
+ listenersLock.Lock()
+ listener, ok := listeners[node.Address]
// The node has already a listener associated to it.
if ok {
// Double check that the listener is still
// connected. If it is, just move on, other
// we'll try to connect again.
- if listeners[node.ID].IsActive() {
+ if listeners[node.Address].IsActive() {
+ listenersLock.Unlock()
continue
}
- delete(listeners, node.ID)
+
+ delete(listeners, node.Address)
}
+ listenersLock.Unlock()
listener, err := eventsConnect(node.Address, endpoints.NetworkCert())
if err != nil {
@@ -101,14 +106,20 @@ func eventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster,
}
logger.Debugf("Listening for events on node %s", node.Address)
listener.AddHandler(nil, func(event api.Event) { f(node.ID, event) })
- listeners[node.ID] = listener
+
+ listenersLock.Lock()
+ listeners[node.Address] = listener
+ listenersLock.Unlock()
}
- for id, listener := range listeners {
- if !shared.IntInSlice(int(id), ids) {
+
+ listenersLock.Lock()
+ for address, listener := range listeners {
+ if !shared.StringInSlice(address, addresses) {
listener.Disconnect()
- delete(listeners, id)
+ delete(listeners, address)
}
}
+ listenersLock.Unlock()
}
// Establish a client connection to get events from the given node.
From d09c28ff48c71d51323741e3771bcbc902625dfc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:54:02 -0500
Subject: [PATCH 2/6] lxd/response: Coding style
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/response/response.go | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/lxd/response/response.go b/lxd/response/response.go
index 28572de771..128b1015d0 100644
--- a/lxd/response/response.go
+++ b/lxd/response/response.go
@@ -139,6 +139,7 @@ func Conflict(err error) Response {
if err != nil {
message = err.Error()
}
+
return &errorResponse{http.StatusConflict, message}
}
@@ -148,6 +149,7 @@ func Forbidden(err error) Response {
if err != nil {
message = err.Error()
}
+
return &errorResponse{http.StatusForbidden, message}
}
@@ -162,6 +164,7 @@ func NotFound(err error) Response {
if err != nil {
message = err.Error()
}
+
return &errorResponse{http.StatusNotFound, message}
}
@@ -171,6 +174,7 @@ func NotImplemented(err error) Response {
if err != nil {
message = err.Error()
}
+
return &errorResponse{http.StatusNotImplemented, message}
}
@@ -186,6 +190,7 @@ func Unavailable(err error) Response {
if err != nil {
message = err.Error()
}
+
return &errorResponse{http.StatusServiceUnavailable, message}
}
@@ -308,6 +313,7 @@ func (r *fileResponse) Render(w http.ResponseWriter) error {
return err
}
defer fd.Close()
+
rd = fd
} else {
rd = bytes.NewReader(entry.Buffer)
@@ -361,6 +367,7 @@ func (r *forwardedResponse) Render(w http.ResponseWriter) error {
if err != nil {
return err
}
+
for key := range r.request.Header {
forwarded.Header.Set(key, r.request.Header.Get(key))
}
@@ -369,6 +376,7 @@ func (r *forwardedResponse) Render(w http.ResponseWriter) error {
if err != nil {
return err
}
+
response, err := httpClient.Do(forwarded)
if err != nil {
return err
From aaa0c0f86c217f2e98358295e313dffd674b4b32 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:54:10 -0500
Subject: [PATCH 3/6] lxd/operations: Use ForwardedResponse
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/operations.go | 23 +++--------------------
1 file changed, 3 insertions(+), 20 deletions(-)
diff --git a/lxd/operations.go b/lxd/operations.go
index 780aa18fdc..4bc0ba22b3 100644
--- a/lxd/operations.go
+++ b/lxd/operations.go
@@ -16,7 +16,6 @@ import (
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
- "github.com/lxc/lxd/shared/logger"
)
var operationCmd = APIEndpoint{
@@ -82,12 +81,7 @@ func operationGet(d *Daemon, r *http.Request) response.Response {
return response.SmartError(err)
}
- body, _, err = client.GetOperation(id)
- if err != nil {
- return response.SmartError(err)
- }
-
- return response.SyncResponse(true, body)
+ return response.ForwardedResponse(client, r)
}
func operationDelete(d *Daemon, r *http.Request) response.Response {
@@ -136,12 +130,7 @@ func operationDelete(d *Daemon, r *http.Request) response.Response {
return response.SmartError(err)
}
- err = client.DeleteOperation(id)
- if err != nil {
- return response.SmartError(err)
- }
-
- return response.EmptySyncResponse
+ return response.ForwardedResponse(client, r)
}
func operationsGet(d *Daemon, r *http.Request) response.Response {
@@ -361,12 +350,7 @@ func operationWaitGet(d *Daemon, r *http.Request) response.Response {
return response.SmartError(err)
}
- apiOp, _, err := client.GetOperationWait(id, timeout)
- if err != nil {
- return response.SmartError(err)
- }
-
- return response.SyncResponse(true, apiOp)
+ return response.ForwardedResponse(client, r)
}
type operationWebSocket struct {
@@ -447,7 +431,6 @@ func operationWebsocketGet(d *Daemon, r *http.Request) response.Response {
return response.SmartError(err)
}
- logger.Debugf("Forward operation websocket from node %s", address)
source, err := client.GetOperationWebsocket(id, secret)
if err != nil {
return response.SmartError(err)
From e4bb1480e5c04a49de1c77d97b294e23fee68d76 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:54:34 -0500
Subject: [PATCH 4/6] lxd/images: Coding style
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/images.go | 3 +++
1 file changed, 3 insertions(+)
diff --git a/lxd/images.go b/lxd/images.go
index a4d934b0c9..95dec7e775 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -1925,6 +1925,7 @@ func imageExport(d *Daemon, r *http.Request) response.Response {
if err != nil {
return response.SmartError(err)
}
+
return response.ForwardedResponse(client, r)
}
@@ -2217,12 +2218,14 @@ func imageSyncBetweenNodes(d *Daemon, project string, fingerprint string) error
if err != nil {
return errors.Wrap(err, "Failed to fetch the leader node address")
}
+
var targetNodeAddress string
if shared.StringInSlice(leader, addresses) {
targetNodeAddress = leader
} else {
targetNodeAddress = addresses[0]
}
+
client, err := cluster.Connect(targetNodeAddress, d.endpoints.NetworkCert(), true)
if err != nil {
return errors.Wrap(err, "Failed to connect node for image synchronization")
From 194a3e44fe7ce91a94849ef14e241bc191e330d7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:55:26 -0500
Subject: [PATCH 5/6] lxd/cluster: Coding style
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/api_cluster.go | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 7ea90a760e..3f02fdd0f4 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -612,6 +612,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
if err != nil {
return err
}
+
err = clusterRebalance(client)
if err != nil {
return err
@@ -922,10 +923,12 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response {
if err != nil {
return response.SmartError(err)
}
+
networks, err := d.cluster.Networks()
if err != nil {
return response.SmartError(err)
}
+
for _, name := range networks {
err := client.DeleteNetwork(name)
if err != nil {
@@ -938,6 +941,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response {
if err != nil && err != db.ErrNoSuchObject {
return response.SmartError(err)
}
+
for _, name := range pools {
err := client.DeleteStoragePool(name)
if err != nil {
@@ -965,6 +969,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response {
if err != nil {
return response.SmartError(err)
}
+
put := api.ClusterPut{}
put.Enabled = false
_, err = client.UpdateCluster(put, "")
@@ -982,17 +987,19 @@ func tryClusterRebalance(d *Daemon) error {
leader, err := d.gateway.LeaderAddress()
if err != nil {
// This is not a fatal error, so let's just log it.
- return errors.Wrap(err, "failed to get current leader member")
+ return errors.Wrap(err, "Failed to get current leader member")
}
cert := d.endpoints.NetworkCert()
client, err := cluster.Connect(leader, cert, true)
if err != nil {
- return errors.Wrap(err, "failed to connect to leader member")
+ return errors.Wrap(err, "Failed to connect to leader member")
}
+
_, _, err = client.RawQuery("POST", "/internal/cluster/rebalance", nil, "")
if err != nil {
- return errors.Wrap(err, "request to rebalance cluster failed")
+ return errors.Wrap(err, "Request to rebalance cluster failed")
}
+
return nil
}
@@ -1159,6 +1166,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
if err != nil {
return response.SmartError(err)
}
+
_, _, err = client.RawQuery("POST", "/internal/cluster/promote", post, "")
if err != nil {
return response.SmartError(err)
From c65b847a0a2b541cd6ffd6af843540db795552e7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 2 Dec 2019 23:56:49 -0500
Subject: [PATCH 6/6] lxd: Tweak cluster.Connect calls
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/api_cluster.go | 6 +++---
lxd/container_post.go | 2 +-
lxd/networks.go | 2 +-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 3f02fdd0f4..209fb08ed4 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -608,7 +608,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
// Add the cluster flag from the agent
version.UserAgentFeatures([]string{"cluster"})
- client, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), false)
+ client, err = cluster.Connect(req.ClusterAddress, d.endpoints.NetworkCert(), true)
if err != nil {
return err
}
@@ -965,7 +965,7 @@ func clusterNodeDelete(d *Daemon, r *http.Request) response.Response {
if force != 1 {
// Try to gracefully reset the database on the node.
cert := d.endpoints.NetworkCert()
- client, err := cluster.Connect(address, cert, false)
+ client, err := cluster.Connect(address, cert, true)
if err != nil {
return response.SmartError(err)
}
@@ -1162,7 +1162,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
}
cert := d.endpoints.NetworkCert()
- client, err := cluster.Connect(address, cert, false)
+ client, err := cluster.Connect(address, cert, true)
if err != nil {
return response.SmartError(err)
}
diff --git a/lxd/container_post.go b/lxd/container_post.go
index b8b6ba8592..0bcff60ed2 100644
--- a/lxd/container_post.go
+++ b/lxd/container_post.go
@@ -297,7 +297,7 @@ func containerPostClusteringMigrate(d *Daemon, c instance.Instance, oldName, new
run := func(*operations.Operation) error {
// Connect to the source host, i.e. ourselves (the node the container is running on).
- source, err := cluster.Connect(sourceAddress, cert, false)
+ source, err := cluster.Connect(sourceAddress, cert, true)
if err != nil {
return errors.Wrap(err, "Failed to connect to source server")
}
diff --git a/lxd/networks.go b/lxd/networks.go
index a7b0336209..057e3dc53d 100644
--- a/lxd/networks.go
+++ b/lxd/networks.go
@@ -2316,7 +2316,7 @@ func (n *network) refreshForkdnsServerAddresses(heartbeatData *cluster.APIHeartb
continue
}
- client, err := cluster.Connect(node.Address, cert, false)
+ client, err := cluster.Connect(node.Address, cert, true)
if err != nil {
return err
}
More information about the lxc-devel mailing list