[lxc-devel] [lxd/master] Implement initial LXD VM agent
monstermunchkin on Github
lxc-bot at linuxcontainers.org
Wed Sep 25 18:57:41 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 685 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190925/bcfd6e94/attachment-0001.bin>
-------------- next part --------------
From ae05812e07b4b3117affd4e162039c83f8058726 Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Wed, 25 Sep 2019 16:19:42 +0200
Subject: [PATCH 1/3] lxd: Move IsJSONRequest to util package
This moves IsJSONRequest to the util package in order for it to be used
by the lxd-agent.
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
lxd/daemon.go | 13 +------------
lxd/util/http.go | 11 +++++++++++
2 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 2d2d433ee5..4ba13980a3 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -318,17 +318,6 @@ func writeMacaroonsRequiredResponse(b *identchecker.Bakery, r *http.Request, w h
return
}
-func isJSONRequest(r *http.Request) bool {
- for k, vs := range r.Header {
- if strings.ToLower(k) == "content-type" &&
- len(vs) == 1 && strings.ToLower(vs[0]) == "application/json" {
- return true
- }
- }
-
- return false
-}
-
// State creates a new State instance linked to our internal db and os.
func (d *Daemon) State() *state.State {
return state.NewState(d.db, d.cluster, d.maas, d.os, d.endpoints)
@@ -403,7 +392,7 @@ func (d *Daemon) createCmd(restAPI *mux.Router, version string, c APIEndpoint) {
}
// Dump full request JSON when in debug mode
- if debug && r.Method != "GET" && isJSONRequest(r) {
+ if debug && r.Method != "GET" && util.IsJSONRequest(r) {
newBody := &bytes.Buffer{}
captured := &bytes.Buffer{}
multiW := io.MultiWriter(newBody, captured)
diff --git a/lxd/util/http.go b/lxd/util/http.go
index 96674dd60a..6e65b6779b 100644
--- a/lxd/util/http.go
+++ b/lxd/util/http.go
@@ -269,3 +269,14 @@ func GetListeners(start int) []net.Listener {
// stdout and stderr), so this constant should always be the value passed to
// GetListeners, except for unit tests.
const SystemdListenFDsStart = 3
+
+func IsJSONRequest(r *http.Request) bool {
+ for k, vs := range r.Header {
+ if strings.ToLower(k) == "content-type" &&
+ len(vs) == 1 && strings.ToLower(vs[0]) == "application/json" {
+ return true
+ }
+ }
+
+ return false
+}
From caed22079fcf9e67648dcc2b0691b7e149d68854 Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Wed, 25 Sep 2019 11:24:12 +0200
Subject: [PATCH 2/3] client: Add vsock support
This allows us to reuse client functions when communicating with the vm
agent inside of the VM.
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
client/connection.go | 37 +++++++++++++++++++++++++++++++++++++
client/util.go | 16 +++++++++++++++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/client/connection.go b/client/connection.go
index 9be6e4c3a1..43a6355026 100644
--- a/client/connection.go
+++ b/client/connection.go
@@ -67,6 +67,43 @@ func ConnectLXD(url string, args *ConnectionArgs) (InstanceServer, error) {
return httpsLXD(url, args)
}
+// ConnectVMAgent lets you connect to a VM agent over a VM socket.
+func ConnectVMAgent(vsockID int, args *ConnectionArgs) (InstanceServer, error) {
+ logger.Debugf("Connecting to a VM agent over a VM socket")
+
+ // Use empty args if not specified
+ if args == nil {
+ args = &ConnectionArgs{}
+ }
+
+ // Initialize the client struct
+ server := ProtocolLXD{
+ httpHost: "http://vm.socket",
+ httpProtocol: "vsock",
+ httpUserAgent: args.UserAgent,
+ }
+
+ // Setup the HTTP client
+ httpClient, err := vsockHTTPClient(args.HTTPClient, vsockID)
+ if err != nil {
+ return nil, err
+ }
+ server.http = httpClient
+
+ // Test the connection and seed the server information
+ if !args.SkipGetServer {
+ serverStatus, _, err := server.GetServer()
+ if err != nil {
+ return nil, err
+ }
+
+ // Record the server certificate
+ server.httpCertificate = serverStatus.Environment.Certificate
+ }
+
+ return &server, nil
+}
+
// ConnectLXDUnix lets you connect to a remote LXD daemon over a local unix socket.
//
// If the path argument is empty, then $LXD_SOCKET will be used, if
diff --git a/client/util.go b/client/util.go
index 6e13123e82..60aaec59d2 100644
--- a/client/util.go
+++ b/client/util.go
@@ -11,6 +11,7 @@ import (
"strings"
"github.com/lxc/lxd/shared"
+ "github.com/mdlayher/vsock"
)
func tlsHTTPClient(client *http.Client, tlsClientCert string, tlsClientKey string, tlsCA string, tlsServerCert string, insecureSkipVerify bool, proxy func(req *http.Request) (*url.URL, error)) (*http.Client, error) {
@@ -103,6 +104,15 @@ func tlsHTTPClient(client *http.Client, tlsClientCert string, tlsClientKey strin
return client, nil
}
+func vsockHTTPClient(client *http.Client, vsockID int) (*http.Client, error) {
+ // Setup a VM socket dialer
+ vsockDial := func(network, addr string) (net.Conn, error) {
+ return vsock.Dial(uint32(vsockID), 8443)
+ }
+
+ return socketHTTPClient(vsockDial, client)
+}
+
func unixHTTPClient(client *http.Client, path string) (*http.Client, error) {
// Setup a Unix socket dialer
unixDial := func(network, addr string) (net.Conn, error) {
@@ -114,9 +124,13 @@ func unixHTTPClient(client *http.Client, path string) (*http.Client, error) {
return net.DialUnix("unix", nil, raddr)
}
+ return socketHTTPClient(unixDial, client)
+}
+
+func socketHTTPClient(dial func(network, addr string) (net.Conn, error), client *http.Client) (*http.Client, error) {
// Define the http transport
transport := &http.Transport{
- Dial: unixDial,
+ Dial: dial,
DisableKeepAlives: true,
}
From f14827696d41953a5734f7998b436290ff523a1a Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Wed, 25 Sep 2019 20:44:32 +0200
Subject: [PATCH 3/3] lxd-agent: Add basic structure
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
lxd-agent/api.go | 25 ++
lxd-agent/api_1.0.go | 57 +++++
lxd-agent/exec.go | 14 ++
lxd-agent/file.go | 16 ++
lxd-agent/main.go | 103 ++++++++
lxd-agent/operations.go | 539 ++++++++++++++++++++++++++++++++++++++++
lxd-agent/response.go | 478 +++++++++++++++++++++++++++++++++++
lxd-agent/state.go | 19 ++
8 files changed, 1251 insertions(+)
create mode 100644 lxd-agent/api.go
create mode 100644 lxd-agent/api_1.0.go
create mode 100644 lxd-agent/exec.go
create mode 100644 lxd-agent/file.go
create mode 100644 lxd-agent/main.go
create mode 100644 lxd-agent/operations.go
create mode 100644 lxd-agent/response.go
create mode 100644 lxd-agent/state.go
diff --git a/lxd-agent/api.go b/lxd-agent/api.go
new file mode 100644
index 0000000000..166c8f0532
--- /dev/null
+++ b/lxd-agent/api.go
@@ -0,0 +1,25 @@
+package main
+
+import "net/http"
+
+// APIEndpoint represents a URL in our API.
+type APIEndpoint struct {
+ Name string // Name for this endpoint.
+ Path string // Path pattern for this endpoint.
+ Get APIEndpointAction
+ Put APIEndpointAction
+ Post APIEndpointAction
+ Delete APIEndpointAction
+ Patch APIEndpointAction
+}
+
+// APIEndpointAlias represents an alias URL of and APIEndpoint in our API.
+type APIEndpointAlias struct {
+ Name string // Name for this alias.
+ Path string // Path pattern for this alias.
+}
+
+// APIEndpointAction represents an action on an API endpoint.
+type APIEndpointAction struct {
+ Handler func(r *http.Request) Response
+}
diff --git a/lxd-agent/api_1.0.go b/lxd-agent/api_1.0.go
new file mode 100644
index 0000000000..633bc5060f
--- /dev/null
+++ b/lxd-agent/api_1.0.go
@@ -0,0 +1,57 @@
+package main
+
+import (
+ "net/http"
+ "os"
+
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/version"
+)
+
+var api10Cmd = APIEndpoint{
+ Get: APIEndpointAction{Handler: api10Get},
+}
+
+var api10 = []APIEndpoint{
+ execCmd,
+ fileCmd,
+ operationsCmd,
+ stateCmd,
+}
+
+func api10Get(r *http.Request) Response {
+ srv := api.ServerUntrusted{
+ APIExtensions: version.APIExtensions, // FIXME: use own API extensions
+ APIStatus: "stable",
+ APIVersion: version.APIVersion, // FIXME: use own API version
+ Public: false,
+ Auth: "trusted",
+ AuthMethods: []string{"tls"},
+ }
+
+ uname, err := shared.Uname()
+ if err != nil {
+ return InternalError(err)
+ }
+
+ serverName, err := os.Hostname()
+ if err != nil {
+ return SmartError(err)
+ }
+
+ env := api.ServerEnvironment{
+ Kernel: uname.Sysname,
+ KernelArchitecture: uname.Machine,
+ KernelVersion: uname.Release,
+ Server: "lxd-agent",
+ ServerPid: os.Getpid(),
+ ServerVersion: version.Version,
+ ServerName: serverName,
+ }
+
+ fullSrv := api.Server{ServerUntrusted: srv}
+ fullSrv.Environment = env
+
+ return SyncResponseETag(true, fullSrv, fullSrv)
+}
diff --git a/lxd-agent/exec.go b/lxd-agent/exec.go
new file mode 100644
index 0000000000..e3968dce0e
--- /dev/null
+++ b/lxd-agent/exec.go
@@ -0,0 +1,14 @@
+package main
+
+import "net/http"
+
+var execCmd = APIEndpoint{
+ Name: "exec",
+ Path: "exec",
+
+ Post: APIEndpointAction{Handler: execPost},
+}
+
+func execPost(r *http.Request) Response {
+ return NotImplemented(nil)
+}
diff --git a/lxd-agent/file.go b/lxd-agent/file.go
new file mode 100644
index 0000000000..bb819d00ff
--- /dev/null
+++ b/lxd-agent/file.go
@@ -0,0 +1,16 @@
+package main
+
+import "net/http"
+
+var fileCmd = APIEndpoint{
+ Name: "file",
+ Path: "files",
+
+ Get: APIEndpointAction{Handler: fileHandler},
+ Post: APIEndpointAction{Handler: fileHandler},
+ Delete: APIEndpointAction{Handler: fileHandler},
+}
+
+func fileHandler(r *http.Request) Response {
+ return NotImplemented(nil)
+}
diff --git a/lxd-agent/main.go b/lxd-agent/main.go
new file mode 100644
index 0000000000..2d99decd1c
--- /dev/null
+++ b/lxd-agent/main.go
@@ -0,0 +1,103 @@
+package main
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+
+ "github.com/gorilla/mux"
+ "github.com/lxc/lxd/lxd/util"
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/logger"
+)
+
+// FIXME: Make this settable
+var debug bool
+
+func main() {
+ mux := mux.NewRouter()
+ mux.StrictSlash(false)
+
+ mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ SyncResponse(true, []string{"/1.0"}).Render(w)
+ })
+
+ for _, c := range api10 {
+ createCmd(mux, "1.0", c)
+ }
+
+ // FIXME: Use ListenAndServeTLS once we know the location of the cert and keyfile
+ log.Println(http.ListenAndServe(":8443", mux))
+}
+
+func createCmd(restAPI *mux.Router, version string, c APIEndpoint) {
+ var uri string
+ if c.Path == "" {
+ uri = fmt.Sprintf("/%s", version)
+ } else {
+ uri = fmt.Sprintf("/%s/%s", version, c.Path)
+ }
+
+ route := restAPI.HandleFunc(uri, func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+
+ // Dump full request JSON when in debug mode
+ if debug && r.Method != "GET" && util.IsJSONRequest(r) {
+ newBody := &bytes.Buffer{}
+ captured := &bytes.Buffer{}
+ multiW := io.MultiWriter(newBody, captured)
+ if _, err := io.Copy(multiW, r.Body); err != nil {
+ InternalError(err).Render(w)
+ return
+ }
+
+ r.Body = shared.BytesReadCloser{Buf: newBody}
+ shared.DebugJson(captured)
+ }
+
+ // Actually process the request
+ var resp Response
+ resp = NotImplemented(nil)
+
+ handleRequest := func(action APIEndpointAction) Response {
+ if action.Handler == nil {
+ return NotImplemented(nil)
+ }
+
+ return action.Handler(r)
+ }
+
+ switch r.Method {
+ case "GET":
+ resp = handleRequest(c.Get)
+ case "PUT":
+ resp = handleRequest(c.Put)
+ case "POST":
+ resp = handleRequest(c.Post)
+ case "DELETE":
+ resp = handleRequest(c.Delete)
+ case "PATCH":
+ resp = handleRequest(c.Patch)
+ default:
+ resp = NotFound(fmt.Errorf("Method '%s' not found", r.Method))
+ }
+
+ // Handle errors
+ err := resp.Render(w)
+ if err != nil {
+ err := InternalError(err).Render(w)
+ if err != nil {
+ logger.Errorf("Failed writing error for error, giving up")
+ }
+ }
+ })
+
+ // If the endpoint has a canonical name then record it so it can be used to build URLS
+ // and accessed in the context of the request by the handler function.
+ if c.Name != "" {
+ route.Name(c.Name)
+ }
+}
diff --git a/lxd-agent/operations.go b/lxd-agent/operations.go
new file mode 100644
index 0000000000..0723067bac
--- /dev/null
+++ b/lxd-agent/operations.go
@@ -0,0 +1,539 @@
+package main
+
+import (
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/gorilla/websocket"
+ "github.com/pborman/uuid"
+ "github.com/pkg/errors"
+
+ "github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/events"
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/cancel"
+ "github.com/lxc/lxd/shared/logger"
+ "github.com/lxc/lxd/shared/version"
+)
+
+var operationCmd = APIEndpoint{
+ Path: "operations/{id}",
+
+ Delete: APIEndpointAction{Handler: operationDelete},
+ Get: APIEndpointAction{Handler: operationGet},
+}
+
+var operationsCmd = APIEndpoint{
+ Path: "operations",
+
+ Get: APIEndpointAction{Handler: operationsGet},
+}
+
+var operationWebsocket = APIEndpoint{
+ Path: "operations/{id}/websocket",
+
+ Get: APIEndpointAction{Handler: operationWebsocketGet},
+}
+
+func operationDelete(r *http.Request) Response {
+ return NotImplemented(nil)
+}
+
+func operationGet(r *http.Request) Response {
+ return NotImplemented(nil)
+}
+
+func operationsGet(r *http.Request) Response {
+ return NotImplemented(nil)
+}
+
+func operationWebsocketGet(r *http.Request) Response {
+ return NotImplemented(nil)
+}
+
+var operationsLock sync.Mutex
+var operations map[string]*operation = make(map[string]*operation)
+
+type operationClass int
+
+const (
+ operationClassTask operationClass = 1
+ operationClassWebsocket operationClass = 2
+ operationClassToken operationClass = 3
+)
+
+func (t operationClass) String() string {
+ return map[operationClass]string{
+ operationClassTask: "task",
+ operationClassWebsocket: "websocket",
+ operationClassToken: "token",
+ }[t]
+}
+
+type operation struct {
+ project string
+ id string
+ class operationClass
+ createdAt time.Time
+ updatedAt time.Time
+ status api.StatusCode
+ url string
+ resources map[string][]string
+ metadata map[string]interface{}
+ err string
+ readonly bool
+ canceler *cancel.Canceler
+ description string
+ permission string
+
+ // Those functions are called at various points in the operation lifecycle
+ onRun func(*operation) error
+ onCancel func(*operation) error
+ onConnect func(*operation, *http.Request, http.ResponseWriter) error
+
+ // Channels used for error reporting and state tracking of background actions
+ chanDone chan error
+
+ // Locking for concurent access to the operation
+ lock sync.Mutex
+
+ cluster *db.Cluster
+}
+
+func (op *operation) done() {
+ if op.readonly {
+ return
+ }
+
+ op.lock.Lock()
+ op.readonly = true
+ op.onRun = nil
+ op.onCancel = nil
+ op.onConnect = nil
+ close(op.chanDone)
+ op.lock.Unlock()
+
+ time.AfterFunc(time.Second*5, func() {
+ operationsLock.Lock()
+ _, ok := operations[op.id]
+ if !ok {
+ operationsLock.Unlock()
+ return
+ }
+
+ delete(operations, op.id)
+ operationsLock.Unlock()
+
+ err := op.cluster.Transaction(func(tx *db.ClusterTx) error {
+ return tx.OperationRemove(op.id)
+ })
+ if err != nil {
+ logger.Warnf("Failed to delete operation %s: %s", op.id, err)
+ }
+ })
+}
+
+func (op *operation) Run() (chan error, error) {
+ if op.status != api.Pending {
+ return nil, fmt.Errorf("Only pending operations can be started")
+ }
+
+ chanRun := make(chan error, 1)
+
+ op.lock.Lock()
+ op.status = api.Running
+
+ if op.onRun != nil {
+ go func(op *operation, chanRun chan error) {
+ err := op.onRun(op)
+ if err != nil {
+ op.lock.Lock()
+ op.status = api.Failure
+ op.err = SmartError(err).String()
+ op.lock.Unlock()
+ op.done()
+ chanRun <- err
+
+ logger.Debugf("Failure for %s operation: %s: %s", op.class.String(), op.id, err)
+
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+ return
+ }
+
+ op.lock.Lock()
+ op.status = api.Success
+ op.lock.Unlock()
+ op.done()
+ chanRun <- nil
+
+ op.lock.Lock()
+ logger.Debugf("Success for %s operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+ op.lock.Unlock()
+ }(op, chanRun)
+ }
+ op.lock.Unlock()
+
+ logger.Debugf("Started %s operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+
+ return chanRun, nil
+}
+
+func (op *operation) Cancel() (chan error, error) {
+ if op.status != api.Running {
+ return nil, fmt.Errorf("Only running operations can be cancelled")
+ }
+
+ if !op.mayCancel() {
+ return nil, fmt.Errorf("This operation can't be cancelled")
+ }
+
+ chanCancel := make(chan error, 1)
+
+ op.lock.Lock()
+ oldStatus := op.status
+ op.status = api.Cancelling
+ op.lock.Unlock()
+
+ if op.onCancel != nil {
+ go func(op *operation, oldStatus api.StatusCode, chanCancel chan error) {
+ err := op.onCancel(op)
+ if err != nil {
+ op.lock.Lock()
+ op.status = oldStatus
+ op.lock.Unlock()
+ chanCancel <- err
+
+ logger.Debugf("Failed to cancel %s operation: %s: %s", op.class.String(), op.id, err)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+ return
+ }
+
+ op.lock.Lock()
+ op.status = api.Cancelled
+ op.lock.Unlock()
+ op.done()
+ chanCancel <- nil
+
+ logger.Debugf("Cancelled %s operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+ }(op, oldStatus, chanCancel)
+ }
+
+ logger.Debugf("Cancelling %s operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+
+ if op.canceler != nil {
+ err := op.canceler.Cancel()
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if op.onCancel == nil {
+ op.lock.Lock()
+ op.status = api.Cancelled
+ op.lock.Unlock()
+ op.done()
+ chanCancel <- nil
+ }
+
+ logger.Debugf("Cancelled %s operation: %s", op.class.String(), op.id)
+ _, md, _ = op.Render()
+ events.Send(op.project, "operation", md)
+
+ return chanCancel, nil
+}
+
+func (op *operation) Connect(r *http.Request, w http.ResponseWriter) (chan error, error) {
+ if op.class != operationClassWebsocket {
+ return nil, fmt.Errorf("Only websocket operations can be connected")
+ }
+
+ if op.status != api.Running {
+ return nil, fmt.Errorf("Only running operations can be connected")
+ }
+
+ chanConnect := make(chan error, 1)
+
+ op.lock.Lock()
+
+ go func(op *operation, chanConnect chan error) {
+ err := op.onConnect(op, r, w)
+ if err != nil {
+ chanConnect <- err
+
+ logger.Debugf("Failed to handle %s operation: %s: %s", op.class.String(), op.id, err)
+ return
+ }
+
+ chanConnect <- nil
+
+ logger.Debugf("Handled %s operation: %s", op.class.String(), op.id)
+ }(op, chanConnect)
+ op.lock.Unlock()
+
+ logger.Debugf("Connected %s operation: %s", op.class.String(), op.id)
+
+ return chanConnect, nil
+}
+
+func (op *operation) mayCancel() bool {
+ if op.class == operationClassToken {
+ return true
+ }
+
+ if op.onCancel != nil {
+ return true
+ }
+
+ if op.canceler != nil && op.canceler.Cancelable() {
+ return true
+ }
+
+ return false
+}
+
+func (op *operation) Render() (string, *api.Operation, error) {
+ // Setup the resource URLs
+ resources := op.resources
+ if resources != nil {
+ tmpResources := make(map[string][]string)
+ for key, value := range resources {
+ var values []string
+ for _, c := range value {
+ values = append(values, fmt.Sprintf("/%s/%s/%s", version.APIVersion, key, c))
+ }
+ tmpResources[key] = values
+ }
+ resources = tmpResources
+ }
+
+ // Local server name
+ var err error
+ var serverName string
+ err = op.cluster.Transaction(func(tx *db.ClusterTx) error {
+ serverName, err = tx.NodeName()
+ return err
+ })
+ if err != nil {
+ return "", nil, err
+ }
+
+ return op.url, &api.Operation{
+ ID: op.id,
+ Class: op.class.String(),
+ Description: op.description,
+ CreatedAt: op.createdAt,
+ UpdatedAt: op.updatedAt,
+ Status: op.status.String(),
+ StatusCode: op.status,
+ Resources: resources,
+ Metadata: op.metadata,
+ MayCancel: op.mayCancel(),
+ Err: op.err,
+ Location: serverName,
+ }, nil
+}
+
+func (op *operation) WaitFinal(timeout int) (bool, error) {
+ // Check current state
+ if op.status.IsFinal() {
+ return true, nil
+ }
+
+ // Wait indefinitely
+ if timeout == -1 {
+ <-op.chanDone
+ return true, nil
+ }
+
+ // Wait until timeout
+ if timeout > 0 {
+ timer := time.NewTimer(time.Duration(timeout) * time.Second)
+ select {
+ case <-op.chanDone:
+ return true, nil
+
+ case <-timer.C:
+ return false, nil
+ }
+ }
+
+ return false, nil
+}
+
+func (op *operation) UpdateResources(opResources map[string][]string) error {
+ if op.status != api.Pending && op.status != api.Running {
+ return fmt.Errorf("Only pending or running operations can be updated")
+ }
+
+ if op.readonly {
+ return fmt.Errorf("Read-only operations can't be updated")
+ }
+
+ op.lock.Lock()
+ op.updatedAt = time.Now()
+ op.resources = opResources
+ op.lock.Unlock()
+
+ logger.Debugf("Updated resources for %s operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+
+ return nil
+}
+
+func (op *operation) UpdateMetadata(opMetadata interface{}) error {
+ if op.status != api.Pending && op.status != api.Running {
+ return fmt.Errorf("Only pending or running operations can be updated")
+ }
+
+ if op.readonly {
+ return fmt.Errorf("Read-only operations can't be updated")
+ }
+
+ newMetadata, err := shared.ParseMetadata(opMetadata)
+ if err != nil {
+ return err
+ }
+
+ op.lock.Lock()
+ op.updatedAt = time.Now()
+ op.metadata = newMetadata
+ op.lock.Unlock()
+
+ logger.Debugf("Updated metadata for %s operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+
+ return nil
+}
+
+func operationCreate(cluster *db.Cluster, project string, opClass operationClass, opType db.OperationType, opResources map[string][]string, opMetadata interface{}, onRun func(*operation) error, onCancel func(*operation) error, onConnect func(*operation, *http.Request, http.ResponseWriter) error) (*operation, error) {
+ // Main attributes
+ op := operation{}
+ op.project = project
+ op.id = uuid.NewRandom().String()
+ op.description = opType.Description()
+ op.permission = opType.Permission()
+ op.class = opClass
+ op.createdAt = time.Now()
+ op.updatedAt = op.createdAt
+ op.status = api.Pending
+ op.url = fmt.Sprintf("/%s/operations/%s", version.APIVersion, op.id)
+ op.resources = opResources
+ op.chanDone = make(chan error)
+ op.cluster = cluster
+
+ newMetadata, err := shared.ParseMetadata(opMetadata)
+ if err != nil {
+ return nil, err
+ }
+ op.metadata = newMetadata
+
+ // Callback functions
+ op.onRun = onRun
+ op.onCancel = onCancel
+ op.onConnect = onConnect
+
+ // Sanity check
+ if op.class != operationClassWebsocket && op.onConnect != nil {
+ return nil, fmt.Errorf("Only websocket operations can have a Connect hook")
+ }
+
+ if op.class == operationClassWebsocket && op.onConnect == nil {
+ return nil, fmt.Errorf("Websocket operations must have a Connect hook")
+ }
+
+ if op.class == operationClassToken && op.onRun != nil {
+ return nil, fmt.Errorf("Token operations can't have a Run hook")
+ }
+
+ if op.class == operationClassToken && op.onCancel != nil {
+ return nil, fmt.Errorf("Token operations can't have a Cancel hook")
+ }
+
+ operationsLock.Lock()
+ operations[op.id] = &op
+ operationsLock.Unlock()
+
+ err = op.cluster.Transaction(func(tx *db.ClusterTx) error {
+ _, err := tx.OperationAdd(project, op.id, opType)
+ return err
+ })
+ if err != nil {
+ return nil, errors.Wrapf(err, "failed to add operation %s to database", op.id)
+ }
+
+ logger.Debugf("New %s operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+
+ return &op, nil
+}
+
+func operationGetInternal(id string) (*operation, error) {
+ operationsLock.Lock()
+ op, ok := operations[id]
+ operationsLock.Unlock()
+
+ if !ok {
+ return nil, fmt.Errorf("Operation '%s' doesn't exist", id)
+ }
+
+ return op, nil
+}
+
+type operationWebSocket struct {
+ req *http.Request
+ op *operation
+}
+
+func (r *operationWebSocket) Render(w http.ResponseWriter) error {
+ chanErr, err := r.op.Connect(r.req, w)
+ if err != nil {
+ return err
+ }
+
+ err = <-chanErr
+ return err
+}
+
+func (r *operationWebSocket) String() string {
+ _, md, err := r.op.Render()
+ if err != nil {
+ return fmt.Sprintf("error: %s", err)
+ }
+
+ return md.ID
+}
+
+type forwardedOperationWebSocket struct {
+ req *http.Request
+ id string
+ source *websocket.Conn // Connection to the node were the operation is running
+}
+
+func (r *forwardedOperationWebSocket) Render(w http.ResponseWriter) error {
+ target, err := shared.WebsocketUpgrader.Upgrade(w, r.req, nil)
+ if err != nil {
+ return err
+ }
+ <-shared.WebsocketProxy(r.source, target)
+ return nil
+}
+
+func (r *forwardedOperationWebSocket) String() string {
+ return r.id
+}
diff --git a/lxd-agent/response.go b/lxd-agent/response.go
new file mode 100644
index 0000000000..ebd964b040
--- /dev/null
+++ b/lxd-agent/response.go
@@ -0,0 +1,478 @@
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "mime/multipart"
+ "net/http"
+ "os"
+ "time"
+
+ "github.com/pkg/errors"
+
+ lxd "github.com/lxc/lxd/client"
+ "github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/util"
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/version"
+)
+
+type Response interface {
+ Render(w http.ResponseWriter) error
+ String() string
+}
+
+// Sync response
+type syncResponse struct {
+ success bool
+ etag interface{}
+ metadata interface{}
+ location string
+ code int
+ headers map[string]string
+}
+
+func (r *syncResponse) Render(w http.ResponseWriter) error {
+ // Set an appropriate ETag header
+ if r.etag != nil {
+ etag, err := util.EtagHash(r.etag)
+ if err == nil {
+ w.Header().Set("ETag", etag)
+ }
+ }
+
+ // Prepare the JSON response
+ status := api.Success
+ if !r.success {
+ status = api.Failure
+ }
+
+ if r.headers != nil {
+ for h, v := range r.headers {
+ w.Header().Set(h, v)
+ }
+ }
+
+ if r.location != "" {
+ w.Header().Set("Location", r.location)
+ code := r.code
+ if code == 0 {
+ code = 201
+ }
+ w.WriteHeader(code)
+ }
+
+ resp := api.ResponseRaw{
+ Type: api.SyncResponse,
+ Status: status.String(),
+ StatusCode: int(status),
+ Metadata: r.metadata,
+ }
+
+ return util.WriteJSON(w, resp, debug)
+}
+
+func (r *syncResponse) String() string {
+ if r.success {
+ return "success"
+ }
+
+ return "failure"
+}
+
+func SyncResponse(success bool, metadata interface{}) Response {
+ return &syncResponse{success: success, metadata: metadata}
+}
+
+func SyncResponseETag(success bool, metadata interface{}, etag interface{}) Response {
+ return &syncResponse{success: success, metadata: metadata, etag: etag}
+}
+
+func SyncResponseLocation(success bool, metadata interface{}, location string) Response {
+ return &syncResponse{success: success, metadata: metadata, location: location}
+}
+
+func SyncResponseRedirect(address string) Response {
+ return &syncResponse{success: true, location: address, code: http.StatusPermanentRedirect}
+}
+
+func SyncResponseHeaders(success bool, metadata interface{}, headers map[string]string) Response {
+ return &syncResponse{success: success, metadata: metadata, headers: headers}
+}
+
+var EmptySyncResponse = &syncResponse{success: true, metadata: make(map[string]interface{})}
+
+type forwardedResponse struct {
+ client lxd.InstanceServer
+ request *http.Request
+}
+
+func (r *forwardedResponse) Render(w http.ResponseWriter) error {
+ info, err := r.client.GetConnectionInfo()
+ if err != nil {
+ return err
+ }
+
+ url := fmt.Sprintf("%s%s", info.Addresses[0], r.request.URL.RequestURI())
+ forwarded, err := http.NewRequest(r.request.Method, url, r.request.Body)
+ if err != nil {
+ return err
+ }
+ for key := range r.request.Header {
+ forwarded.Header.Set(key, r.request.Header.Get(key))
+ }
+
+ httpClient, err := r.client.GetHTTPClient()
+ if err != nil {
+ return err
+ }
+ response, err := httpClient.Do(forwarded)
+ if err != nil {
+ return err
+ }
+
+ for key := range response.Header {
+ w.Header().Set(key, response.Header.Get(key))
+ }
+
+ w.WriteHeader(response.StatusCode)
+ _, err = io.Copy(w, response.Body)
+ return err
+}
+
+func (r *forwardedResponse) String() string {
+ return fmt.Sprintf("request to %s", r.request.URL)
+}
+
+// ForwardedResponse takes a request directed to a node and forwards it to
+// another node, writing back the response it gegs.
+func ForwardedResponse(client lxd.InstanceServer, request *http.Request) Response {
+ return &forwardedResponse{
+ client: client,
+ request: request,
+ }
+}
+
+// File transfer response
+type fileResponseEntry struct {
+ identifier string
+ path string
+ filename string
+ buffer []byte /* either a path or a buffer must be provided */
+}
+
+type fileResponse struct {
+ req *http.Request
+ files []fileResponseEntry
+ headers map[string]string
+ removeAfterServe bool
+}
+
+func (r *fileResponse) Render(w http.ResponseWriter) error {
+ if r.headers != nil {
+ for k, v := range r.headers {
+ w.Header().Set(k, v)
+ }
+ }
+
+ // No file, well, it's easy then
+ if len(r.files) == 0 {
+ return nil
+ }
+
+ // For a single file, return it inline
+ if len(r.files) == 1 {
+ var rs io.ReadSeeker
+ var mt time.Time
+ var sz int64
+
+ if r.files[0].path == "" {
+ rs = bytes.NewReader(r.files[0].buffer)
+ mt = time.Now()
+ sz = int64(len(r.files[0].buffer))
+ } else {
+ f, err := os.Open(r.files[0].path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ fi, err := f.Stat()
+ if err != nil {
+ return err
+ }
+
+ mt = fi.ModTime()
+ sz = fi.Size()
+ rs = f
+ }
+
+ w.Header().Set("Content-Type", "application/octet-stream")
+ w.Header().Set("Content-Length", fmt.Sprintf("%d", sz))
+ w.Header().Set("Content-Disposition", fmt.Sprintf("inline;filename=%s", r.files[0].filename))
+
+ http.ServeContent(w, r.req, r.files[0].filename, mt, rs)
+ if r.files[0].path != "" && r.removeAfterServe {
+ err := os.Remove(r.files[0].path)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+ }
+
+ // Now the complex multipart answer
+ body := &bytes.Buffer{}
+ mw := multipart.NewWriter(body)
+
+ for _, entry := range r.files {
+ var rd io.Reader
+ if entry.path != "" {
+ fd, err := os.Open(entry.path)
+ if err != nil {
+ return err
+ }
+ defer fd.Close()
+ rd = fd
+ } else {
+ rd = bytes.NewReader(entry.buffer)
+ }
+
+ fw, err := mw.CreateFormFile(entry.identifier, entry.filename)
+ if err != nil {
+ return err
+ }
+
+ _, err = io.Copy(fw, rd)
+ if err != nil {
+ return err
+ }
+ }
+ mw.Close()
+
+ w.Header().Set("Content-Type", mw.FormDataContentType())
+ w.Header().Set("Content-Length", fmt.Sprintf("%d", body.Len()))
+
+ _, err := io.Copy(w, body)
+ return err
+}
+
+func (r *fileResponse) String() string {
+ return fmt.Sprintf("%d files", len(r.files))
+}
+
+func FileResponse(r *http.Request, files []fileResponseEntry, headers map[string]string, removeAfterServe bool) Response {
+ return &fileResponse{r, files, headers, removeAfterServe}
+}
+
+// Operation response
+type operationResponse struct {
+ op *operation
+}
+
+func (r *operationResponse) Render(w http.ResponseWriter) error {
+ _, err := r.op.Run()
+ if err != nil {
+ return err
+ }
+
+ url, md, err := r.op.Render()
+ if err != nil {
+ return err
+ }
+
+ body := api.ResponseRaw{
+ Type: api.AsyncResponse,
+ Status: api.OperationCreated.String(),
+ StatusCode: int(api.OperationCreated),
+ Operation: url,
+ Metadata: md,
+ }
+
+ w.Header().Set("Location", url)
+ w.WriteHeader(202)
+
+ return util.WriteJSON(w, body, debug)
+}
+
+func (r *operationResponse) String() string {
+ _, md, err := r.op.Render()
+ if err != nil {
+ return fmt.Sprintf("error: %s", err)
+ }
+
+ return md.ID
+}
+
+func OperationResponse(op *operation) Response {
+ return &operationResponse{op}
+}
+
+// Forwarded operation response.
+//
+// Returned when the operation has been created on another node
+type forwardedOperationResponse struct {
+ op *api.Operation
+ project string
+}
+
+func (r *forwardedOperationResponse) Render(w http.ResponseWriter) error {
+ url := fmt.Sprintf("/%s/operations/%s", version.APIVersion, r.op.ID)
+ if r.project != "" {
+ url += fmt.Sprintf("?project=%s", r.project)
+ }
+
+ body := api.ResponseRaw{
+ Type: api.AsyncResponse,
+ Status: api.OperationCreated.String(),
+ StatusCode: int(api.OperationCreated),
+ Operation: url,
+ Metadata: r.op,
+ }
+
+ w.Header().Set("Location", url)
+ w.WriteHeader(202)
+
+ return util.WriteJSON(w, body, debug)
+}
+
+func (r *forwardedOperationResponse) String() string {
+ return r.op.ID
+}
+
+// ForwardedOperationResponse creates a response that forwards the metadata of
+// an operation created on another node.
+func ForwardedOperationResponse(project string, op *api.Operation) Response {
+ return &forwardedOperationResponse{
+ op: op,
+ project: project,
+ }
+}
+
+// Error response
+type errorResponse struct {
+ code int
+ msg string
+}
+
+func (r *errorResponse) String() string {
+ return r.msg
+}
+
+func (r *errorResponse) Render(w http.ResponseWriter) error {
+ var output io.Writer
+
+ buf := &bytes.Buffer{}
+ output = buf
+ var captured *bytes.Buffer
+ if debug {
+ captured = &bytes.Buffer{}
+ output = io.MultiWriter(buf, captured)
+ }
+
+ err := json.NewEncoder(output).Encode(shared.Jmap{"type": api.ErrorResponse, "error": r.msg, "error_code": r.code})
+
+ if err != nil {
+ return err
+ }
+
+ if debug {
+ shared.DebugJson(captured)
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.Header().Set("X-Content-Type-Options", "nosniff")
+ w.WriteHeader(r.code)
+ fmt.Fprintln(w, buf.String())
+
+ return nil
+}
+
+func NotImplemented(err error) Response {
+ message := "not implemented"
+ if err != nil {
+ message = err.Error()
+ }
+ return &errorResponse{http.StatusNotImplemented, message}
+}
+
+func NotFound(err error) Response {
+ message := "not found"
+ if err != nil {
+ message = err.Error()
+ }
+ return &errorResponse{http.StatusNotFound, message}
+}
+
+func Forbidden(err error) Response {
+ message := "not authorized"
+ if err != nil {
+ message = err.Error()
+ }
+ return &errorResponse{http.StatusForbidden, message}
+}
+
+func Conflict(err error) Response {
+ message := "already exists"
+ if err != nil {
+ message = err.Error()
+ }
+ return &errorResponse{http.StatusConflict, message}
+}
+
+func Unavailable(err error) Response {
+ message := "unavailable"
+ if err != nil {
+ message = err.Error()
+ }
+ return &errorResponse{http.StatusServiceUnavailable, message}
+}
+
+func BadRequest(err error) Response {
+ return &errorResponse{http.StatusBadRequest, err.Error()}
+}
+
+func InternalError(err error) Response {
+ return &errorResponse{http.StatusInternalServerError, err.Error()}
+}
+
+func PreconditionFailed(err error) Response {
+ return &errorResponse{http.StatusPreconditionFailed, err.Error()}
+}
+
+/*
+ * SmartError returns the right error message based on err.
+ */
+func SmartError(err error) Response {
+ if err == nil {
+ return EmptySyncResponse
+ }
+
+ switch errors.Cause(err) {
+ case os.ErrNotExist, db.ErrNoSuchObject:
+ if errors.Cause(err) != err {
+ return NotFound(err)
+ }
+
+ return NotFound(nil)
+ case os.ErrPermission:
+ if errors.Cause(err) != err {
+ return Forbidden(err)
+ }
+
+ return Forbidden(nil)
+ case db.ErrAlreadyDefined:
+ if errors.Cause(err) != err {
+ return Conflict(err)
+ }
+
+ return Conflict(nil)
+
+ default:
+ return InternalError(err)
+ }
+}
diff --git a/lxd-agent/state.go b/lxd-agent/state.go
new file mode 100644
index 0000000000..e0d1806b08
--- /dev/null
+++ b/lxd-agent/state.go
@@ -0,0 +1,19 @@
+package main
+
+import "net/http"
+
+var stateCmd = APIEndpoint{
+ Name: "state",
+ Path: "state",
+
+ Get: APIEndpointAction{Handler: stateGet},
+ Put: APIEndpointAction{Handler: statePut},
+}
+
+func stateGet(r *http.Request) Response {
+ return NotImplemented(nil)
+}
+
+func statePut(r *http.Request) Response {
+ return NotImplemented(nil)
+}
More information about the lxc-devel
mailing list