[lxc-devel] [lxd/master] Fix operation handling in cluster
stgraber on Github
lxc-bot at linuxcontainers.org
Fri Jul 6 22:12:36 UTC 2018
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180706/508dcfd1/attachment.bin>
-------------- next part --------------
From 3a341e9aadc5be669dac2f4717ab2f978efeefab Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 6 Jul 2018 18:10:41 -0400
Subject: [PATCH 1/2] client: Export OperationWait
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
client/interfaces.go | 3 ++-
client/lxd_operations.go | 13 +++++++++++++
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/client/interfaces.go b/client/interfaces.go
index d871be229..f44f0e4b8 100644
--- a/client/interfaces.go
+++ b/client/interfaces.go
@@ -163,8 +163,9 @@ type ContainerServer interface {
GetOperationUUIDs() (uuids []string, err error)
GetOperations() (operations []api.Operation, err error)
GetOperation(uuid string) (op *api.Operation, ETag string, err error)
- DeleteOperation(uuid string) (err error)
+ GetOperationWait(uuid string, timeout int) (op *api.Operation, ETag string, err error)
GetOperationWebsocket(uuid string, secret string) (conn *websocket.Conn, err error)
+ DeleteOperation(uuid string) (err error)
// Profile functions
GetProfileNames() (names []string, err error)
diff --git a/client/lxd_operations.go b/client/lxd_operations.go
index d5886214b..988558a84 100644
--- a/client/lxd_operations.go
+++ b/client/lxd_operations.go
@@ -64,6 +64,19 @@ func (r *ProtocolLXD) GetOperation(uuid string) (*api.Operation, string, error)
return &op, etag, nil
}
+// GetOperationWait queries the /wait endpoint of the operation with the provided uuid, passing the given timeout, and returns the resulting Operation entry and its ETag
+func (r *ProtocolLXD) GetOperationWait(uuid string, timeout int) (*api.Operation, string, error) {
+ op := api.Operation{}
+
+ // Issue the GET against /operations/<uuid>/wait (uuid query-escaped), with &op as the target struct
+ etag, err := r.queryStruct("GET", fmt.Sprintf("/operations/%s/wait?timeout=%d", url.QueryEscape(uuid), timeout), nil, "", &op)
+ if err != nil {
+ return nil, "", err
+ }
+
+ return &op, etag, nil
+}
+
// GetOperationWebsocket returns a websocket connection for the provided operation
func (r *ProtocolLXD) GetOperationWebsocket(uuid string, secret string) (*websocket.Conn, error) {
path := fmt.Sprintf("/operations/%s/websocket", url.QueryEscape(uuid))
From 024c71f8655f6f0a0629eda00b8d8119e147dbeb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 6 Jul 2018 18:11:15 -0400
Subject: [PATCH 2/2] lxd/operations: Forward to right cluster node
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Closes #4721
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/db/operations.go | 6 ++
lxd/operations.go | 293 ++++++++++++++++++++++++++++++++++++++++++---------
2 files changed, 247 insertions(+), 52 deletions(-)
diff --git a/lxd/db/operations.go b/lxd/db/operations.go
index 0597d5cd2..6e7ccbe6e 100644
--- a/lxd/db/operations.go
+++ b/lxd/db/operations.go
@@ -22,6 +22,12 @@ func (c *ClusterTx) OperationsUUIDs() ([]string, error) {
return query.SelectStrings(c.tx, stmt, c.nodeID)
}
+// OperationNodes returns the distinct addresses of the nodes that have entries in the operations table
+func (c *ClusterTx) OperationNodes() ([]string, error) {
+ stmt := "SELECT DISTINCT nodes.address FROM operations JOIN nodes ON nodes.id = node_id"
+ return query.SelectStrings(c.tx, stmt)
+}
+
// OperationByUUID returns the operation with the given UUID.
func (c *ClusterTx) OperationByUUID(uuid string) (Operation, error) {
null := Operation{}
diff --git a/lxd/operations.go b/lxd/operations.go
index 299ab8107..a36cec4ec 100644
--- a/lxd/operations.go
+++ b/lxd/operations.go
@@ -14,6 +14,7 @@ import (
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/node"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
@@ -453,35 +454,41 @@ func operationAPIGet(d *Daemon, r *http.Request) Response {
var body *api.Operation
- // First check the local cache, then the cluster database table.
+ // First check if the query is for a local operation from this node
op, err := operationGet(id)
if err == nil {
_, body, err = op.Render()
if err != nil {
return SmartError(err)
}
- } else {
- var address string
- err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
- operation, err := tx.OperationByUUID(id)
- if err != nil {
- return err
- }
- address = operation.NodeAddress
- return nil
- })
- if err != nil {
- return SmartError(err)
- }
- cert := d.endpoints.NetworkCert()
- client, err := cluster.Connect(address, cert, false)
- if err != nil {
- return SmartError(err)
- }
- body, _, err = client.GetOperation(id)
+
+ return SyncResponse(true, body)
+ }
+
+ // Then check if the query is for an operation on another node, and, if so, forward it
+ var address string
+ err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ operation, err := tx.OperationByUUID(id)
if err != nil {
- return SmartError(err)
+ return err
}
+
+ address = operation.NodeAddress
+ return nil
+ })
+ if err != nil {
+ return SmartError(err)
+ }
+
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.Connect(address, cert, false)
+ if err != nil {
+ return SmartError(err)
+ }
+
+ body, _, err = client.GetOperation(id)
+ if err != nil {
+ return SmartError(err)
}
return SyncResponse(true, body)
@@ -490,14 +497,41 @@ func operationAPIGet(d *Daemon, r *http.Request) Response {
func operationAPIDelete(d *Daemon, r *http.Request) Response {
id := mux.Vars(r)["id"]
+ // First check if the query is for a local operation from this node
op, err := operationGet(id)
+ if err == nil {
+ _, err = op.Cancel()
+ if err != nil {
+ return BadRequest(err)
+ }
+
+ return EmptySyncResponse
+ }
+
+ // Then check if the query is for an operation on another node, and, if so, forward it
+ var address string
+ err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ operation, err := tx.OperationByUUID(id)
+ if err != nil {
+ return err
+ }
+
+ address = operation.NodeAddress
+ return nil
+ })
if err != nil {
- return NotFound(err)
+ return SmartError(err)
}
- _, err = op.Cancel()
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.Connect(address, cert, false)
if err != nil {
- return BadRequest(err)
+ return SmartError(err)
+ }
+
+ err = client.DeleteOperation(id)
+ if err != nil {
+ return SmartError(err)
}
return EmptySyncResponse
@@ -506,38 +540,165 @@ func operationAPIDelete(d *Daemon, r *http.Request) Response {
var operationCmd = Command{name: "operations/{id}", get: operationAPIGet, delete: operationAPIDelete}
func operationsAPIGet(d *Daemon, r *http.Request) Response {
- var md shared.Jmap
-
recursion := util.IsRecursionRequest(r)
- md = shared.Jmap{}
+ localOperationURLs := func() (shared.Jmap, error) {
+ // Get all the operations
+ operationsLock.Lock()
+ ops := operations
+ operationsLock.Unlock()
- operationsLock.Lock()
- ops := operations
- operationsLock.Unlock()
+ // Build a list of URLs
+ body := shared.Jmap{}
- for _, v := range ops {
- status := strings.ToLower(v.status.String())
- _, ok := md[status]
- if !ok {
- if recursion {
- md[status] = make([]*api.Operation, 0)
- } else {
- md[status] = make([]string, 0)
+ for _, v := range ops {
+ status := strings.ToLower(v.status.String())
+ _, ok := body[status]
+ if !ok {
+ body[status] = make([]string, 0)
}
+
+ body[status] = append(body[status].([]string), v.url)
}
- if !recursion {
- md[status] = append(md[status].([]string), v.url)
- continue
+ return body, nil
+ }
+
+ localOperations := func() (shared.Jmap, error) {
+ // Get all the operations
+ operationsLock.Lock()
+ ops := operations
+ operationsLock.Unlock()
+
+ // Build a list of operations
+ body := shared.Jmap{}
+
+ for _, v := range ops {
+ status := strings.ToLower(v.status.String())
+ _, ok := body[status]
+ if !ok {
+ body[status] = make([]*api.Operation, 0)
+ }
+
+ _, op, err := v.Render()
+ if err != nil {
+ return nil, err
+ }
+
+ body[status] = append(body[status].([]*api.Operation), op)
}
- _, body, err := v.Render()
+ return body, nil
+ }
+
+ // Check if called from a cluster node
+ if isClusterNotification(r) {
+ // Only return the local data
+ if recursion {
+ // Recursive queries
+ body, err := localOperations()
+ if err != nil {
+ return InternalError(err)
+ }
+
+ return SyncResponse(true, body)
+ }
+
+ // Normal queries
+ body, err := localOperationURLs()
+ if err != nil {
+ return InternalError(err)
+ }
+
+ return SyncResponse(true, body)
+ }
+
+ // Start with local operations
+ var md shared.Jmap
+ var err error
+
+ if recursion {
+ md, err = localOperations()
+ if err != nil {
+ return InternalError(err)
+ }
+ } else {
+ md, err = localOperationURLs()
+ if err != nil {
+ return InternalError(err)
+ }
+ }
+
+ // Check if clustered
+ clustered, err := cluster.Enabled(d.db)
+ if err != nil {
+ return InternalError(err)
+ }
+
+ // Return now if not clustered
+ if !clustered {
+ return SyncResponse(true, md)
+ }
+
+ // Get all nodes with running operations
+ var nodes []string
+ err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ var err error
+
+ nodes, err = tx.OperationNodes()
if err != nil {
+ return err
+ }
+
+ return nil
+ })
+ if err != nil {
+ return SmartError(err)
+ }
+
+ // Get local address
+ localAddress, err := node.HTTPSAddress(d.db)
+ if err != nil {
+ return InternalError(err)
+ }
+
+ cert := d.endpoints.NetworkCert()
+ for _, node := range nodes {
+ if node == localAddress {
continue
}
- md[status] = append(md[status].([]*api.Operation), body)
+ // Connect to the remote server
+ client, err := cluster.Connect(node, cert, true)
+ if err != nil {
+ return SmartError(err)
+ }
+
+ // Get operation data
+ ops, err := client.GetOperations()
+ if err != nil {
+ return SmartError(err)
+ }
+
+ // Merge with existing data
+ for _, op := range ops {
+ status := strings.ToLower(op.Status)
+
+ _, ok := md[status]
+ if !ok {
+ if recursion {
+ md[status] = make([]*api.Operation, 0)
+ } else {
+ md[status] = make([]string, 0)
+ }
+ }
+
+ if recursion {
+ md[status] = append(md[status].([]*api.Operation), &op)
+ } else {
+ md[status] = append(md[status].([]string), fmt.Sprintf("/1.0/operations/%s", op.ID))
+ }
+ }
}
return SyncResponse(true, md)
@@ -546,23 +707,51 @@ func operationsAPIGet(d *Daemon, r *http.Request) Response {
var operationsCmd = Command{name: "operations", get: operationsAPIGet}
func operationAPIWaitGet(d *Daemon, r *http.Request) Response {
+ id := mux.Vars(r)["id"]
+
timeout, err := shared.AtoiEmptyDefault(r.FormValue("timeout"), -1)
if err != nil {
return InternalError(err)
}
- id := mux.Vars(r)["id"]
+ // First check if the query is for a local operation from this node
op, err := operationGet(id)
+ if err == nil {
+ _, err = op.WaitFinal(timeout)
+ if err != nil {
+ return InternalError(err)
+ }
+
+ _, body, err := op.Render()
+ if err != nil {
+ return SmartError(err)
+ }
+
+ return SyncResponse(true, body)
+ }
+
+ // Then check if the query is for an operation on another node, and, if so, forward it
+ var address string
+ err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ operation, err := tx.OperationByUUID(id)
+ if err != nil {
+ return err
+ }
+
+ address = operation.NodeAddress
+ return nil
+ })
if err != nil {
- return NotFound(err)
+ return SmartError(err)
}
- _, err = op.WaitFinal(timeout)
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.Connect(address, cert, false)
if err != nil {
- return InternalError(err)
+ return SmartError(err)
}
- _, body, err := op.Render()
+ _, body, err := client.GetOperationWait(id, timeout)
if err != nil {
return SmartError(err)
}
@@ -618,15 +807,13 @@ func (r *forwardedOperationWebSocket) String() string {
func operationAPIWebsocketGet(d *Daemon, r *http.Request) Response {
id := mux.Vars(r)["id"]
- // First check if the websocket is for a local operation from this
- // node.
+ // First check if the query is for a local operation from this node
op, err := operationGet(id)
if err == nil {
return &operationWebSocket{r, op}
}
- // Secondly check if the websocket is from an operation on another
- // node, and, if so, proxy it.
+ // Then check if the query is for an operation on another node, and, if so, forward it
secret := r.FormValue("secret")
if secret == "" {
return BadRequest(fmt.Errorf("missing secret"))
@@ -638,6 +825,7 @@ func operationAPIWebsocketGet(d *Daemon, r *http.Request) Response {
if err != nil {
return err
}
+
address = operation.NodeAddress
return nil
})
@@ -656,6 +844,7 @@ func operationAPIWebsocketGet(d *Daemon, r *http.Request) Response {
if err != nil {
return SmartError(err)
}
+
return &forwardedOperationWebSocket{req: r, id: id, source: source}
}
More information about the lxc-devel
mailing list