[lxc-devel] [lxd/master] Move operations to its own package
monstermunchkin on Github
lxc-bot at linuxcontainers.org
Fri Sep 27 09:22:01 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190927/ae999fed/attachment-0001.bin>
-------------- next part --------------
From ff5b7ed6886a70fd0e2349c5bd08cce08ef50939 Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Fri, 27 Sep 2019 11:20:06 +0200
Subject: [PATCH 1/2] lxd: Move operations to its own package
This moves the operations code to its own operations package.
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
lxd/api_cluster.go | 13 +-
lxd/api_project.go | 7 +-
lxd/backup.go | 6 +-
lxd/container.go | 9 +-
lxd/container_backup.go | 19 +-
lxd/container_console.go | 11 +-
lxd/container_delete.go | 7 +-
lxd/container_exec.go | 21 +-
lxd/container_instance_types.go | 5 +-
lxd/container_lxc.go | 7 +-
lxd/container_post.go | 27 +-
lxd/container_put.go | 11 +-
lxd/container_snapshot.go | 37 +--
lxd/container_state.go | 21 +-
lxd/containers_post.go | 37 +--
lxd/daemon_images.go | 9 +-
lxd/images.go | 55 ++--
lxd/instance_interface.go | 3 +-
lxd/logging.go | 5 +-
lxd/main.go | 4 +
lxd/migrate.go | 5 +-
lxd/migrate_container.go | 15 +-
lxd/migrate_storage_volumes.go | 5 +-
lxd/operations.go | 493 ++---------------------------
lxd/operations/operations.go | 543 ++++++++++++++++++++++++++++++++
lxd/operations/response.go | 96 ++++++
lxd/operations/websocket.go | 63 ++++
lxd/response.go | 89 ------
lxd/storage.go | 13 +-
lxd/storage_btrfs.go | 5 +-
lxd/storage_ceph.go | 5 +-
lxd/storage_cephfs.go | 5 +-
lxd/storage_dir.go | 5 +-
lxd/storage_lvm.go | 5 +-
lxd/storage_migration.go | 13 +-
lxd/storage_migration_btrfs.go | 5 +-
lxd/storage_migration_ceph.go | 5 +-
lxd/storage_migration_zfs.go | 5 +-
lxd/storage_mock.go | 5 +-
lxd/storage_volumes.go | 31 +-
lxd/storage_volumes_snapshot.go | 27 +-
lxd/storage_zfs.go | 5 +-
42 files changed, 982 insertions(+), 775 deletions(-)
create mode 100644 lxd/operations/operations.go
create mode 100644 lxd/operations/response.go
create mode 100644 lxd/operations/websocket.go
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 927558caab..335e47a7a3 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -20,6 +20,7 @@ import (
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/node"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
storagedriver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/lxd/util"
@@ -203,7 +204,7 @@ func clusterPut(d *Daemon, r *http.Request) response.Response {
}
func clusterPutBootstrap(d *Daemon, req api.ClusterPut) response.Response {
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
// The default timeout when non-clustered is one minute, let's
// lower it down now that we'll likely have to make requests
// over the network.
@@ -251,7 +252,7 @@ func clusterPutBootstrap(d *Daemon, req api.ClusterPut) response.Response {
return nil
})
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationClusterBootstrap, resources, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationClusterBootstrap, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
@@ -259,7 +260,7 @@ func clusterPutBootstrap(d *Daemon, req api.ClusterPut) response.Response {
// Add the cluster flag from the agent
version.UserAgentFeatures([]string{"cluster"})
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
@@ -355,7 +356,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
fingerprint := cert.Fingerprint()
// Asynchronously join the cluster.
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
logger.Debug("Running cluster join operation")
// If the user has provided a cluster password, setup the trust
@@ -609,12 +610,12 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) response.Response {
resources := map[string][]string{}
resources["cluster"] = []string{}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationClusterJoin, resources, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationClusterJoin, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Disable clustering on a node.
diff --git a/lxd/api_project.go b/lxd/api_project.go
index 473ddefcdb..a0b1eecab2 100644
--- a/lxd/api_project.go
+++ b/lxd/api_project.go
@@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -381,7 +382,7 @@ func projectPost(d *Daemon, r *http.Request) response.Response {
}
// Perform the rename
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
var id int64
err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
project, err := tx.ProjectGet(req.Name)
@@ -423,12 +424,12 @@ func projectPost(d *Daemon, r *http.Request) response.Response {
return nil
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationProjectRename, nil, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationProjectRename, nil, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func projectDelete(d *Daemon, r *http.Request) response.Response {
diff --git a/lxd/backup.go b/lxd/backup.go
index 16f6b7c9f6..e536b07361 100644
--- a/lxd/backup.go
+++ b/lxd/backup.go
@@ -11,10 +11,12 @@ import (
"time"
"context"
+
"gopkg.in/yaml.v2"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/lxd/task"
"github.com/lxc/lxd/shared"
@@ -445,11 +447,11 @@ func backupCreateTarball(s *state.State, path string, backup backup) error {
func pruneExpiredContainerBackupsTask(d *Daemon) (task.Func, task.Schedule) {
f := func(ctx context.Context) {
- opRun := func(op *operation) error {
+ opRun := func(op *operations.Operation) error {
return pruneExpiredContainerBackups(ctx, d)
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationBackupsExpire, nil, nil, opRun, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationBackupsExpire, nil, nil, opRun, nil, nil)
if err != nil {
logger.Error("Failed to start expired backups operation", log.Ctx{"err": err})
return
diff --git a/lxd/container.go b/lxd/container.go
index 8489f31152..32afff77bf 100644
--- a/lxd/container.go
+++ b/lxd/container.go
@@ -22,6 +22,7 @@ import (
"github.com/lxc/lxd/lxd/device/config"
"github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/lxd/instance/instancetype"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/lxd/sys"
"github.com/lxc/lxd/lxd/task"
@@ -1249,11 +1250,11 @@ func autoCreateContainerSnapshotsTask(d *Daemon) (task.Func, task.Schedule) {
return
}
- opRun := func(op *operation) error {
+ opRun := func(op *operations.Operation) error {
return autoCreateContainerSnapshots(ctx, d, instances)
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationSnapshotCreate, nil, nil, opRun, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationSnapshotCreate, nil, nil, opRun, nil, nil)
if err != nil {
logger.Error("Failed to start create snapshot operation", log.Ctx{"err": err})
return
@@ -1370,11 +1371,11 @@ func pruneExpiredContainerSnapshotsTask(d *Daemon) (task.Func, task.Schedule) {
return
}
- opRun := func(op *operation) error {
+ opRun := func(op *operations.Operation) error {
return pruneExpiredContainerSnapshots(ctx, d, expiredSnapshots)
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationSnapshotsExpire, nil, nil, opRun, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationSnapshotsExpire, nil, nil, opRun, nil, nil)
if err != nil {
logger.Error("Failed to start expired snapshots operation", log.Ctx{"err": err})
return
diff --git a/lxd/container_backup.go b/lxd/container_backup.go
index 01a3e8f7b2..4f8b8fdd30 100644
--- a/lxd/container_backup.go
+++ b/lxd/container_backup.go
@@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -156,7 +157,7 @@ func containerBackupsPost(d *Daemon, r *http.Request) response.Response {
fullName := name + shared.SnapshotDelimiter + req.Name
instanceOnly := req.InstanceOnly || req.ContainerOnly
- backup := func(op *operation) error {
+ backup := func(op *operations.Operation) error {
args := db.ContainerBackupArgs{
Name: fullName,
ContainerID: c.Id(),
@@ -178,13 +179,13 @@ func containerBackupsPost(d *Daemon, r *http.Request) response.Response {
resources["containers"] = []string{name}
resources["backups"] = []string{req.Name}
- op, err := operationCreate(d.cluster, project, operationClassTask,
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask,
db.OperationBackupCreate, resources, nil, backup, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func containerBackupGet(d *Daemon, r *http.Request) response.Response {
@@ -253,7 +254,7 @@ func containerBackupPost(d *Daemon, r *http.Request) response.Response {
newName := name + shared.SnapshotDelimiter + req.Name
- rename := func(op *operation) error {
+ rename := func(op *operations.Operation) error {
err := backup.Rename(newName)
if err != nil {
return err
@@ -265,13 +266,13 @@ func containerBackupPost(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, project, operationClassTask,
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask,
db.OperationBackupRename, resources, nil, rename, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func containerBackupDelete(d *Daemon, r *http.Request) response.Response {
@@ -299,7 +300,7 @@ func containerBackupDelete(d *Daemon, r *http.Request) response.Response {
return response.SmartError(err)
}
- remove := func(op *operation) error {
+ remove := func(op *operations.Operation) error {
err := backup.Delete()
if err != nil {
return err
@@ -311,13 +312,13 @@ func containerBackupDelete(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["container"] = []string{name}
- op, err := operationCreate(d.cluster, project, operationClassTask,
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask,
db.OperationBackupRemove, resources, nil, remove, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func containerBackupExportGet(d *Daemon, r *http.Request) response.Response {
diff --git a/lxd/container_console.go b/lxd/container_console.go
index 6e7c0c8a46..c43f219642 100644
--- a/lxd/container_console.go
+++ b/lxd/container_console.go
@@ -19,6 +19,7 @@ import (
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/instance/instancetype"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -71,7 +72,7 @@ func (s *consoleWs) Metadata() interface{} {
return shared.Jmap{"fds": fds}
}
-func (s *consoleWs) Connect(op *operation, r *http.Request, w http.ResponseWriter) error {
+func (s *consoleWs) Connect(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
return fmt.Errorf("missing secret")
@@ -112,7 +113,7 @@ func (s *consoleWs) Connect(op *operation, r *http.Request, w http.ResponseWrite
return os.ErrPermission
}
-func (s *consoleWs) Do(op *operation) error {
+func (s *consoleWs) Do(op *operations.Operation) error {
<-s.allConnected
var err error
@@ -290,7 +291,7 @@ func containerConsolePost(d *Daemon, r *http.Request) response.Response {
}
opAPI := op.Get()
- return ForwardedOperationResponse(project, &opAPI)
+ return operations.ForwardedOperationResponse(project, &opAPI)
}
inst, err := instanceLoadByProjectAndName(d.State(), project, name)
@@ -343,13 +344,13 @@ func containerConsolePost(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["containers"] = []string{ws.instance.Name()}
- op, err := operationCreate(d.cluster, project, operationClassWebsocket, db.OperationConsoleShow,
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassWebsocket, db.OperationConsoleShow,
resources, ws.Metadata(), ws.Do, nil, ws.Connect)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func containerConsoleLogGet(d *Daemon, r *http.Request) response.Response {
diff --git a/lxd/container_delete.go b/lxd/container_delete.go
index 4b8626c0b7..cc0a380e5f 100644
--- a/lxd/container_delete.go
+++ b/lxd/container_delete.go
@@ -6,6 +6,7 @@ import (
"github.com/gorilla/mux"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
)
@@ -35,17 +36,17 @@ func containerDelete(d *Daemon, r *http.Request) response.Response {
return response.BadRequest(fmt.Errorf("container is running"))
}
- rmct := func(op *operation) error {
+ rmct := func(op *operations.Operation) error {
return c.Delete()
}
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationContainerDelete, resources, nil, rmct, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationContainerDelete, resources, nil, rmct, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index c9ac5c662b..b426355e26 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -20,6 +20,7 @@ import (
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/instance/instancetype"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
@@ -67,7 +68,7 @@ func (s *execWs) Metadata() interface{} {
}
}
-func (s *execWs) Connect(op *operation, r *http.Request, w http.ResponseWriter) error {
+func (s *execWs) Connect(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
return fmt.Errorf("missing secret")
@@ -108,7 +109,7 @@ func (s *execWs) Connect(op *operation, r *http.Request, w http.ResponseWriter)
return os.ErrPermission
}
-func (s *execWs) Do(op *operation) error {
+func (s *execWs) Do(op *operations.Operation) error {
<-s.allConnected
var err error
@@ -376,7 +377,7 @@ func containerExecPost(d *Daemon, r *http.Request) response.Response {
}
opAPI := op.Get()
- return ForwardedOperationResponse(project, &opAPI)
+ return operations.ForwardedOperationResponse(project, &opAPI)
}
inst, err := instanceLoadByProjectAndName(d.State(), project, name)
@@ -483,28 +484,28 @@ func containerExecPost(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["containers"] = []string{ws.instance.Name()}
- op, err := operationCreate(d.cluster, project, operationClassWebsocket, db.OperationCommandExec, resources, ws.Metadata(), ws.Do, nil, ws.Connect)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassWebsocket, db.OperationCommandExec, resources, ws.Metadata(), ws.Do, nil, ws.Connect)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
var cmdErr error
var cmdResult int
metadata := shared.Jmap{}
if post.RecordOutput {
// Prepare stdout and stderr recording
- stdout, err := os.OpenFile(filepath.Join(inst.LogPath(), fmt.Sprintf("exec_%s.stdout", op.id)), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
+ stdout, err := os.OpenFile(filepath.Join(inst.LogPath(), fmt.Sprintf("exec_%s.stdout", op.ID())), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return err
}
defer stdout.Close()
- stderr, err := os.OpenFile(filepath.Join(inst.LogPath(), fmt.Sprintf("exec_%s.stderr", op.id)), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
+ stderr, err := os.OpenFile(filepath.Join(inst.LogPath(), fmt.Sprintf("exec_%s.stderr", op.ID())), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return err
}
@@ -535,10 +536,10 @@ func containerExecPost(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationCommandExec, resources, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationCommandExec, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
diff --git a/lxd/container_instance_types.go b/lxd/container_instance_types.go
index 93beb809e0..085e844779 100644
--- a/lxd/container_instance_types.go
+++ b/lxd/container_instance_types.go
@@ -11,6 +11,7 @@ import (
"gopkg.in/yaml.v2"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/task"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -74,7 +75,7 @@ func instanceRefreshTypesTask(d *Daemon) (task.Func, task.Schedule) {
// returning in case the context expires.
_, hasCancellationSupport := interface{}(&http.Request{}).(util.ContextAwareRequest)
f := func(ctx context.Context) {
- opRun := func(op *operation) error {
+ opRun := func(op *operations.Operation) error {
if hasCancellationSupport {
return instanceRefreshTypes(ctx, d)
}
@@ -91,7 +92,7 @@ func instanceRefreshTypesTask(d *Daemon) (task.Func, task.Schedule) {
}
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationInstanceTypesUpdate, nil, nil, opRun, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationInstanceTypesUpdate, nil, nil, opRun, nil, nil)
if err != nil {
logger.Error("Failed to start instance types update operation", log.Ctx{"err": err})
return
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index 0f8fb885a3..975d30307e 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -33,6 +33,7 @@ import (
"github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/lxd/instance/instancetype"
"github.com/lxc/lxd/lxd/maas"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/project"
"github.com/lxc/lxd/lxd/state"
driver "github.com/lxc/lxd/lxd/storage"
@@ -629,7 +630,7 @@ type containerLXC struct {
node string
// Progress tracking
- op *operation
+ op *operations.Operation
expiryDate time.Time
}
@@ -6906,7 +6907,7 @@ func (c *containerLXC) StoragePool() (string, error) {
}
// Progress tracking
-func (c *containerLXC) SetOperation(op *operation) {
+func (c *containerLXC) SetOperation(op *operations.Operation) {
c.op = op
}
@@ -6924,7 +6925,7 @@ func (c *containerLXC) updateProgress(progress string) {
return
}
- meta := c.op.metadata
+ meta := c.op.Metadata()
if meta == nil {
meta = make(map[string]interface{})
}
diff --git a/lxd/container_post.go b/lxd/container_post.go
index 938ca06865..e9ead927bb 100644
--- a/lxd/container_post.go
+++ b/lxd/container_post.go
@@ -15,6 +15,7 @@ import (
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/instance/instancetype"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/shared"
@@ -225,21 +226,21 @@ func containerPost(d *Daemon, r *http.Request) response.Response {
return response.InternalError(err)
}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationContainerMigrate, resources, nil, ws.Do, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationContainerMigrate, resources, nil, ws.Do, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Pull mode
- op, err := operationCreate(d.cluster, project, operationClassWebsocket, db.OperationContainerMigrate, resources, ws.Metadata(), ws.Do, nil, ws.Connect)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassWebsocket, db.OperationContainerMigrate, resources, ws.Metadata(), ws.Do, nil, ws.Connect)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Check that the name isn't already in use
@@ -248,19 +249,19 @@ func containerPost(d *Daemon, r *http.Request) response.Response {
return response.Conflict(fmt.Errorf("Name '%s' already in use", req.Name))
}
- run := func(*operation) error {
+ run := func(*operations.Operation) error {
return inst.Rename(req.Name)
}
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationContainerRename, resources, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationContainerRename, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Move a non-ceph container to another cluster node.
@@ -294,7 +295,7 @@ func containerPostClusteringMigrate(d *Daemon, c Instance, oldName, newName, new
return response.SmartError(err)
}
- run := func(*operation) error {
+ run := func(*operations.Operation) error {
// Connect to the source host, i.e. ourselves (the node the container is running on).
source, err := cluster.Connect(sourceAddress, cert, false)
if err != nil {
@@ -398,17 +399,17 @@ func containerPostClusteringMigrate(d *Daemon, c Instance, oldName, newName, new
resources := map[string][]string{}
resources["containers"] = []string{oldName}
- op, err := operationCreate(d.cluster, c.Project(), operationClassTask, db.OperationContainerMigrate, resources, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, c.Project(), operations.OperationClassTask, db.OperationContainerMigrate, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Special case migrating a container backed by ceph across two cluster nodes.
func containerPostClusteringMigrateWithCeph(d *Daemon, c Instance, project, oldName, newName, newNode string, instanceType instancetype.Type) response.Response {
- run := func(*operation) error {
+ run := func(*operations.Operation) error {
// If source node is online (i.e. we're serving the request on
// it, and c != nil), let's unmap the RBD volume locally
if c != nil {
@@ -506,12 +507,12 @@ func containerPostClusteringMigrateWithCeph(d *Daemon, c Instance, project, oldN
resources := map[string][]string{}
resources["containers"] = []string{oldName}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationContainerMigrate, resources, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationContainerMigrate, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Notification that a container was moved.
diff --git a/lxd/container_put.go b/lxd/container_put.go
index 18edacba56..efc0c27a3b 100644
--- a/lxd/container_put.go
+++ b/lxd/container_put.go
@@ -9,6 +9,7 @@ import (
"github.com/lxc/lxd/lxd/db"
deviceConfig "github.com/lxc/lxd/lxd/device/config"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/lxd/util"
@@ -63,11 +64,11 @@ func containerPut(d *Daemon, r *http.Request) response.Response {
architecture = 0
}
- var do func(*operation) error
+ var do func(*operations.Operation) error
var opType db.OperationType
if configRaw.Restore == "" {
// Update container configuration
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
args := db.ContainerArgs{
Architecture: architecture,
Config: configRaw.Config,
@@ -90,7 +91,7 @@ func containerPut(d *Daemon, r *http.Request) response.Response {
opType = db.OperationContainerUpdate
} else {
// Snapshot Restore
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
return containerSnapRestore(d.State(), project, name, configRaw.Restore, configRaw.Stateful)
}
@@ -100,12 +101,12 @@ func containerPut(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, project, operationClassTask, opType, resources, nil, do, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, opType, resources, nil, do, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func containerSnapRestore(s *state.State, project, name, snap string, stateful bool) error {
diff --git a/lxd/container_snapshot.go b/lxd/container_snapshot.go
index cada540241..99841487db 100644
--- a/lxd/container_snapshot.go
+++ b/lxd/container_snapshot.go
@@ -14,6 +14,7 @@ import (
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/instance/instancetype"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -147,7 +148,7 @@ func containerSnapshotsPost(d *Daemon, r *http.Request) response.Response {
}
}
- snapshot := func(op *operation) error {
+ snapshot := func(op *operations.Operation) error {
args := db.ContainerArgs{
Project: inst.Project(),
Architecture: inst.Architecture(),
@@ -179,12 +180,12 @@ func containerSnapshotsPost(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationSnapshotCreate, resources, nil, snapshot, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationSnapshotCreate, resources, nil, snapshot, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func containerSnapshotHandler(d *Daemon, r *http.Request) response.Response {
@@ -253,12 +254,12 @@ func snapshotPut(d *Daemon, r *http.Request, sc container, name string) response
return response.InternalError(err)
}
- var do func(op *operation) error
+ var do func(op *operations.Operation) error
_, err = rj.GetString("expires_at")
if err != nil {
// Skip updating the snapshot since the requested key wasn't provided
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
return nil
}
} else {
@@ -275,7 +276,7 @@ func snapshotPut(d *Daemon, r *http.Request, sc container, name string) response
}
// Update container configuration
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
args := db.ContainerArgs{
Architecture: sc.Architecture(),
Config: sc.LocalConfig(),
@@ -303,13 +304,13 @@ func snapshotPut(d *Daemon, r *http.Request, sc container, name string) response
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, sc.Project(), operationClassTask, opType, resources, nil,
+ op, err := operations.OperationCreate(d.cluster, sc.Project(), operations.OperationClassTask, opType, resources, nil,
do, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func snapshotGet(sc container, name string) response.Response {
@@ -381,21 +382,21 @@ func snapshotPost(d *Daemon, r *http.Request, sc container, containerName string
return response.InternalError(err)
}
- op, err := operationCreate(d.cluster, sc.Project(), operationClassTask, db.OperationSnapshotTransfer, resources, nil, ws.Do, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, sc.Project(), operations.OperationClassTask, db.OperationSnapshotTransfer, resources, nil, ws.Do, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Pull mode
- op, err := operationCreate(d.cluster, sc.Project(), operationClassWebsocket, db.OperationSnapshotTransfer, resources, ws.Metadata(), ws.Do, nil, ws.Connect)
+ op, err := operations.OperationCreate(d.cluster, sc.Project(), operations.OperationClassWebsocket, db.OperationSnapshotTransfer, resources, ws.Metadata(), ws.Do, nil, ws.Connect)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
newName, err := raw.GetString("name")
@@ -416,33 +417,33 @@ func snapshotPost(d *Daemon, r *http.Request, sc container, containerName string
return response.Conflict(fmt.Errorf("Name '%s' already in use", fullName))
}
- rename := func(op *operation) error {
+ rename := func(op *operations.Operation) error {
return sc.Rename(fullName)
}
resources := map[string][]string{}
resources["containers"] = []string{containerName}
- op, err := operationCreate(d.cluster, sc.Project(), operationClassTask, db.OperationSnapshotRename, resources, nil, rename, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, sc.Project(), operations.OperationClassTask, db.OperationSnapshotRename, resources, nil, rename, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func snapshotDelete(sc container, name string) response.Response {
- remove := func(op *operation) error {
+ remove := func(op *operations.Operation) error {
return sc.Delete()
}
resources := map[string][]string{}
resources["containers"] = []string{sc.Name()}
- op, err := operationCreate(sc.DaemonState().Cluster, sc.Project(), operationClassTask, db.OperationSnapshotDelete, resources, nil, remove, nil, nil)
+ op, err := operations.OperationCreate(sc.DaemonState().Cluster, sc.Project(), operations.OperationClassTask, db.OperationSnapshotDelete, resources, nil, remove, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
diff --git a/lxd/container_state.go b/lxd/container_state.go
index 5dd64ddf0c..9995803153 100644
--- a/lxd/container_state.go
+++ b/lxd/container_state.go
@@ -9,6 +9,7 @@ import (
"github.com/gorilla/mux"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
@@ -81,11 +82,11 @@ func containerStatePut(d *Daemon, r *http.Request) response.Response {
}
var opType db.OperationType
- var do func(*operation) error
+ var do func(*operations.Operation) error
switch shared.ContainerAction(raw.Action) {
case shared.Start:
opType = db.OperationContainerStart
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
c.SetOperation(op)
if err = c.Start(raw.Stateful); err != nil {
return err
@@ -95,7 +96,7 @@ func containerStatePut(d *Daemon, r *http.Request) response.Response {
case shared.Stop:
opType = db.OperationContainerStop
if raw.Stateful {
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
c.SetOperation(op)
err := c.Stop(raw.Stateful)
if err != nil {
@@ -105,7 +106,7 @@ func containerStatePut(d *Daemon, r *http.Request) response.Response {
return nil
}
} else if raw.Timeout == 0 || raw.Force {
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
c.SetOperation(op)
err = c.Stop(false)
if err != nil {
@@ -115,7 +116,7 @@ func containerStatePut(d *Daemon, r *http.Request) response.Response {
return nil
}
} else {
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
c.SetOperation(op)
if c.IsFrozen() {
err := c.Unfreeze()
@@ -134,7 +135,7 @@ func containerStatePut(d *Daemon, r *http.Request) response.Response {
}
case shared.Restart:
opType = db.OperationContainerRestart
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
c.SetOperation(op)
ephemeral := c.IsEphemeral()
@@ -193,7 +194,7 @@ func containerStatePut(d *Daemon, r *http.Request) response.Response {
}
opType = db.OperationContainerFreeze
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
c.SetOperation(op)
return c.Freeze()
}
@@ -203,7 +204,7 @@ func containerStatePut(d *Daemon, r *http.Request) response.Response {
}
opType = db.OperationContainerUnfreeze
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
c.SetOperation(op)
return c.Unfreeze()
}
@@ -214,10 +215,10 @@ func containerStatePut(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, project, operationClassTask, opType, resources, nil, do, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, opType, resources, nil, do, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index ed46731abf..59dfb9ef51 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -22,6 +22,7 @@ import (
"github.com/lxc/lxd/lxd/device/config"
"github.com/lxc/lxd/lxd/instance/instancetype"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
@@ -99,7 +100,7 @@ func createFromImage(d *Daemon, project string, req *api.InstancesPost) response
return response.BadRequest(err)
}
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
args := db.ContainerArgs{
Project: project,
Config: req.Config,
@@ -147,12 +148,12 @@ func createFromImage(d *Daemon, project string, req *api.InstancesPost) response
resources := map[string][]string{}
resources["containers"] = []string{req.Name}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func createFromNone(d *Daemon, project string, req *api.InstancesPost) response.Response {
@@ -180,7 +181,7 @@ func createFromNone(d *Daemon, project string, req *api.InstancesPost) response.
args.Architecture = architecture
}
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
_, err := containerCreateAsEmpty(d, args)
return err
}
@@ -188,12 +189,12 @@ func createFromNone(d *Daemon, project string, req *api.InstancesPost) response.
resources := map[string][]string{}
resources["containers"] = []string{req.Name}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func createFromMigration(d *Daemon, project string, req *api.InstancesPost) response.Response {
@@ -409,7 +410,7 @@ func createFromMigration(d *Daemon, project string, req *api.InstancesPost) resp
return response.InternalError(err)
}
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
// And finally run the migration.
err = sink.Do(op)
if err != nil {
@@ -434,20 +435,20 @@ func createFromMigration(d *Daemon, project string, req *api.InstancesPost) resp
resources := map[string][]string{}
resources["containers"] = []string{req.Name}
- var op *operation
+ var op *operations.Operation
if push {
- op, err = operationCreate(d.cluster, project, operationClassWebsocket, db.OperationContainerCreate, resources, sink.Metadata(), run, nil, sink.Connect)
+ op, err = operations.OperationCreate(d.cluster, project, operations.OperationClassWebsocket, db.OperationContainerCreate, resources, sink.Metadata(), run, nil, sink.Connect)
if err != nil {
return response.InternalError(err)
}
} else {
- op, err = operationCreate(d.cluster, project, operationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
+ op, err = operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func createFromCopy(d *Daemon, project string, req *api.InstancesPost) response.Response {
@@ -597,7 +598,7 @@ func createFromCopy(d *Daemon, project string, req *api.InstancesPost) response.
Stateful: req.Stateful,
}
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
instanceOnly := req.Source.InstanceOnly || req.Source.ContainerOnly
_, err := containerCreateAsCopy(d.State(), args, source, instanceOnly, req.Source.Refresh)
if err != nil {
@@ -609,12 +610,12 @@ func createFromCopy(d *Daemon, project string, req *api.InstancesPost) response.
resources := map[string][]string{}
resources["containers"] = []string{req.Name, req.Source.Source}
- op, err := operationCreate(d.cluster, targetProject, operationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, targetProject, operations.OperationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func createFromBackup(d *Daemon, project string, data io.Reader, pool string) response.Response {
@@ -645,7 +646,7 @@ func createFromBackup(d *Daemon, project string, data io.Reader, pool string) re
bInfo.Pool = pool
}
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
defer f.Close()
// Dump tarball to storage
@@ -693,13 +694,13 @@ func createFromBackup(d *Daemon, project string, data io.Reader, pool string) re
resources := map[string][]string{}
resources["containers"] = []string{bInfo.Name}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationBackupRestore,
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationBackupRestore,
resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func containersPost(d *Daemon, r *http.Request) response.Response {
@@ -756,7 +757,7 @@ func containersPost(d *Daemon, r *http.Request) response.Response {
}
opAPI := op.Get()
- return ForwardedOperationResponse(project, &opAPI)
+ return operations.ForwardedOperationResponse(project, &opAPI)
}
}
diff --git a/lxd/daemon_images.go b/lxd/daemon_images.go
index d5a0f1e9a0..51e126d4d1 100644
--- a/lxd/daemon_images.go
+++ b/lxd/daemon_images.go
@@ -17,6 +17,7 @@ import (
"github.com/lxc/lxd/client"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/sys"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -79,7 +80,7 @@ func imageGetStreamCache(d *Daemon) (map[string]*imageStreamCacheEntry, error) {
}
// ImageDownload resolves the image fingerprint and if not in the database, downloads it
-func (d *Daemon) ImageDownload(op *operation, server string, protocol string, certificate string, secret string, alias string, imageType string, forContainer bool, autoUpdate bool, storagePool string, preferCached bool, project string) (*api.Image, error) {
+func (d *Daemon) ImageDownload(op *operations.Operation, server string, protocol string, certificate string, secret string, alias string, imageType string, forContainer bool, autoUpdate bool, storagePool string, preferCached bool, project string) (*api.Image, error) {
var err error
var ctxMap log.Ctx
@@ -356,7 +357,7 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
if op == nil {
ctxMap = log.Ctx{"alias": alias, "server": server}
} else {
- ctxMap = log.Ctx{"trigger": op.url, "image": fp, "operation": op.id, "alias": alias, "server": server}
+ ctxMap = log.Ctx{"trigger": op.URL(), "image": fp, "operation": op.ID(), "alias": alias, "server": server}
}
logger.Info("Downloading image", ctxMap)
@@ -379,7 +380,7 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
return
}
- meta := op.metadata
+ meta := op.Metadata()
if meta == nil {
meta = make(map[string]interface{})
}
@@ -393,7 +394,7 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
var canceler *cancel.Canceler
if op != nil {
canceler = cancel.NewCanceler()
- op.canceler = canceler
+ op.SetCanceler(canceler)
}
if protocol == "lxd" || protocol == "simplestreams" {
diff --git a/lxd/images.go b/lxd/images.go
index 6b29a5216b..c48c7b65dc 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -30,6 +30,7 @@ import (
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/instance/instancetype"
"github.com/lxc/lxd/lxd/node"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/lxd/task"
@@ -155,7 +156,7 @@ func compressFile(compress string, infile io.Reader, outfile io.Writer) error {
* This function takes a container or snapshot from the local image server and
* exports it as an image.
*/
-func imgPostContInfo(d *Daemon, r *http.Request, req api.ImagesPost, op *operation, builddir string) (*api.Image, error) {
+func imgPostContInfo(d *Daemon, r *http.Request, req api.ImagesPost, op *operations.Operation, builddir string) (*api.Image, error) {
info := api.Image{}
info.Type = "container"
info.Properties = map[string]string{}
@@ -316,7 +317,7 @@ func imgPostContInfo(d *Daemon, r *http.Request, req api.ImagesPost, op *operati
return &info, nil
}
-func imgPostRemoteInfo(d *Daemon, req api.ImagesPost, op *operation, project string) (*api.Image, error) {
+func imgPostRemoteInfo(d *Daemon, req api.ImagesPost, op *operations.Operation, project string) (*api.Image, error) {
var err error
var hash string
@@ -354,7 +355,7 @@ func imgPostRemoteInfo(d *Daemon, req api.ImagesPost, op *operation, project str
return info, nil
}
-func imgPostURLInfo(d *Daemon, req api.ImagesPost, op *operation, project string) (*api.Image, error) {
+func imgPostURLInfo(d *Daemon, req api.ImagesPost, op *operations.Operation, project string) (*api.Image, error) {
var err error
if req.Source.URL == "" {
@@ -720,7 +721,7 @@ func imagesPost(d *Daemon, r *http.Request) response.Response {
}
// Begin background operation
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
var err error
var info *api.Image
@@ -786,13 +787,13 @@ func imagesPost(d *Daemon, r *http.Request) response.Response {
return nil
}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationImageDownload, nil, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationImageDownload, nil, nil, run, nil, nil)
if err != nil {
cleanup(builddir, post)
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func getImageMetadata(fname string) (*api.ImageMetadata, string, error) {
@@ -940,11 +941,11 @@ func imagesGet(d *Daemon, r *http.Request) response.Response {
func autoUpdateImagesTask(d *Daemon) (task.Func, task.Schedule) {
f := func(ctx context.Context) {
- opRun := func(op *operation) error {
+ opRun := func(op *operations.Operation) error {
return autoUpdateImages(ctx, d)
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationImagesUpdate, nil, nil, opRun, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationImagesUpdate, nil, nil, opRun, nil, nil)
if err != nil {
logger.Error("Failed to start image update operation", log.Ctx{"err": err})
return
@@ -1041,7 +1042,7 @@ func autoUpdateImagesInProject(ctx context.Context, d *Daemon, project string) e
// Update a single image. The operation can be nil, if no progress tracking is needed.
// Returns whether the image has been updated.
-func autoUpdateImage(d *Daemon, op *operation, id int, info *api.Image, project string) error {
+func autoUpdateImage(d *Daemon, op *operations.Operation, id int, info *api.Image, project string) error {
fingerprint := info.Fingerprint
_, source, err := d.cluster.ImageSourceGet(id)
if err != nil {
@@ -1168,11 +1169,11 @@ func autoUpdateImage(d *Daemon, op *operation, id int, info *api.Image, project
func pruneExpiredImagesTask(d *Daemon) (task.Func, task.Schedule) {
f := func(ctx context.Context) {
- opRun := func(op *operation) error {
+ opRun := func(op *operations.Operation) error {
return pruneExpiredImages(ctx, d)
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationImagesExpire, nil, nil, opRun, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationImagesExpire, nil, nil, opRun, nil, nil)
if err != nil {
logger.Error("Failed to start expired image operation", log.Ctx{"err": err})
return
@@ -1221,7 +1222,7 @@ func pruneExpiredImagesTask(d *Daemon) (task.Func, task.Schedule) {
}
func pruneLeftoverImages(d *Daemon) {
- opRun := func(op *operation) error {
+ opRun := func(op *operations.Operation) error {
// Get all images
images, err := d.cluster.ImagesGet("default", false)
if err != nil {
@@ -1250,7 +1251,7 @@ func pruneLeftoverImages(d *Daemon) {
return nil
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationImagesPruneLeftover, nil, nil, opRun, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationImagesPruneLeftover, nil, nil, opRun, nil, nil)
if err != nil {
logger.Error("Failed to start image leftover cleanup operation", log.Ctx{"err": err})
return
@@ -1452,7 +1453,7 @@ func imageDelete(d *Daemon, r *http.Request) response.Response {
return nil
}
- rmimg := func(op *operation) error {
+ rmimg := func(op *operations.Operation) error {
if isClusterNotification(r) {
return deleteFromDisk()
}
@@ -1463,12 +1464,12 @@ func imageDelete(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["images"] = []string{fingerprint}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationImageDelete, resources, nil, rmimg, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationImageDelete, resources, nil, rmimg, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Helper to delete an image file from the local images directory.
@@ -1502,12 +1503,12 @@ func doImageGet(db *db.Cluster, project, fingerprint string, public bool) (*api.
}
func imageValidSecret(fingerprint string, secret string) bool {
- for _, op := range operations {
- if op.resources == nil {
+ for _, op := range operations.Operations() {
+ if op.Resources() == nil {
continue
}
- opImages, ok := op.resources["images"]
+ opImages, ok := op.Resources()["images"]
if !ok {
continue
}
@@ -1516,7 +1517,7 @@ func imageValidSecret(fingerprint string, secret string) bool {
continue
}
- opSecret, ok := op.metadata["secret"]
+ opSecret, ok := op.Metadata()["secret"]
if !ok {
continue
}
@@ -1962,12 +1963,12 @@ func imageSecret(d *Daemon, r *http.Request) response.Response {
resources := map[string][]string{}
resources["images"] = []string{imgInfo.Fingerprint}
- op, err := operationCreate(d.cluster, project, operationClassToken, db.OperationImageToken, resources, meta, nil, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassToken, db.OperationImageToken, resources, meta, nil, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func imageImportFromNode(imagesDir string, client lxd.InstanceServer, fingerprint string) error {
@@ -2035,16 +2036,16 @@ func imageRefresh(d *Daemon, r *http.Request) response.Response {
}
// Begin background operation
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
return autoUpdateImage(d, op, imageId, imageInfo, project)
}
- op, err := operationCreate(d.cluster, project, operationClassTask, db.OperationImageRefresh, nil, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, project, operations.OperationClassTask, db.OperationImageRefresh, nil, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func autoSyncImagesTask(d *Daemon) (task.Func, task.Schedule) {
@@ -2068,11 +2069,11 @@ func autoSyncImagesTask(d *Daemon) (task.Func, task.Schedule) {
return
}
- opRun := func(op *operation) error {
+ opRun := func(op *operations.Operation) error {
return autoSyncImages(ctx, d)
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationImagesSynchronize, nil, nil, opRun, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationImagesSynchronize, nil, nil, opRun, nil, nil)
if err != nil {
logger.Error("Failed to start image synchronization operation", log.Ctx{"err": err})
return
diff --git a/lxd/instance_interface.go b/lxd/instance_interface.go
index e228d56e30..8f1dadc8cd 100644
--- a/lxd/instance_interface.go
+++ b/lxd/instance_interface.go
@@ -10,6 +10,7 @@ import (
"github.com/lxc/lxd/lxd/device"
"github.com/lxc/lxd/lxd/device/config"
"github.com/lxc/lxd/lxd/instance/instancetype"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/shared/api"
)
@@ -106,7 +107,7 @@ type Instance interface {
// Progress reporting
- SetOperation(op *operation)
+ SetOperation(op *operations.Operation)
// FIXME: Those should be internal functions
// Needed for migration for now.
diff --git a/lxd/logging.go b/lxd/logging.go
index 9644984d86..0b9dc40fec 100644
--- a/lxd/logging.go
+++ b/lxd/logging.go
@@ -8,6 +8,7 @@ import (
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/instance/instancetype"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/lxd/task"
"github.com/lxc/lxd/shared"
@@ -20,11 +21,11 @@ import (
// and will run once every 24h.
func expireLogsTask(state *state.State) (task.Func, task.Schedule) {
f := func(ctx context.Context) {
- opRun := func(op *operation) error {
+ opRun := func(op *operations.Operation) error {
return expireLogs(ctx, state)
}
- op, err := operationCreate(state.Cluster, "", operationClassTask, db.OperationLogsExpire, nil, nil, opRun, nil, nil)
+ op, err := operations.OperationCreate(state.Cluster, "", operations.OperationClassTask, db.OperationLogsExpire, nil, nil, opRun, nil, nil)
if err != nil {
logger.Error("Failed to start log expiry operation", log.Ctx{"err": err})
return
diff --git a/lxd/main.go b/lxd/main.go
index 743a43e31b..ce49cd5dc6 100644
--- a/lxd/main.go
+++ b/lxd/main.go
@@ -8,6 +8,7 @@ import (
"github.com/spf13/cobra"
"github.com/lxc/lxd/lxd/events"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/shared/logger"
"github.com/lxc/lxd/shared/logging"
@@ -44,6 +45,9 @@ func (c *cmdGlobal) Run(cmd *cobra.Command, args []string) error {
// Set debug and verbose for the events package
events.Init(debug, verbose)
+ // Set debug for the operations package
+ operations.Init(debug)
+
// Set debug for the response package
response.Init(debug)
diff --git a/lxd/migrate.go b/lxd/migrate.go
index 6f5ead8459..92963b05f5 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -19,6 +19,7 @@ import (
"github.com/gorilla/websocket"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/idmap"
"github.com/lxc/lxd/shared/logger"
@@ -145,7 +146,7 @@ func (s *migrationSourceWs) Metadata() interface{} {
return secrets
}
-func (s *migrationSourceWs) Connect(op *operation, r *http.Request, w http.ResponseWriter) error {
+func (s *migrationSourceWs) Connect(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
return fmt.Errorf("missing secret")
@@ -317,7 +318,7 @@ func (s *migrationSink) Metadata() interface{} {
return secrets
}
-func (s *migrationSink) Connect(op *operation, r *http.Request, w http.ResponseWriter) error {
+func (s *migrationSink) Connect(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
return fmt.Errorf("missing secret")
diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
index c1ba4d8c20..575229393f 100644
--- a/lxd/migrate_container.go
+++ b/lxd/migrate_container.go
@@ -18,6 +18,7 @@ import (
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/instance/instancetype"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
@@ -328,7 +329,7 @@ func (s *migrationSourceWs) preDumpLoop(args *preDumpLoopArgs) (bool, error) {
return final, nil
}
-func (s *migrationSourceWs) Do(migrateOp *operation) error {
+func (s *migrationSourceWs) Do(migrateOp *operations.Operation) error {
<-s.allConnected
criuType := migration.CRIUType_CRIU_RSYNC.Enum()
@@ -543,14 +544,14 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
}
state := s.instance.DaemonState()
- actionScriptOp, err := operationCreate(
+ actionScriptOp, err := operations.OperationCreate(
state.Cluster,
s.instance.Project(),
- operationClassWebsocket,
+ operations.OperationClassWebsocket,
db.OperationContainerLiveMigrate,
nil,
nil,
- func(op *operation) error {
+ func(op *operations.Operation) error {
result := <-restoreSuccess
if !result {
return fmt.Errorf("restore failed, failing CRIU")
@@ -558,7 +559,7 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
return nil
},
nil,
- func(op *operation, r *http.Request, w http.ResponseWriter) error {
+ func(op *operations.Operation, r *http.Request, w http.ResponseWriter) error {
secret := r.FormValue("secret")
if secret == "" {
return fmt.Errorf("missing secret")
@@ -584,7 +585,7 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
return abort(err)
}
- err = writeActionScript(checkpointDir, actionScriptOp.url, actionScriptOpSecret, state.OS.ExecPath)
+ err = writeActionScript(checkpointDir, actionScriptOp.URL(), actionScriptOpSecret, state.OS.ExecPath)
if err != nil {
os.RemoveAll(checkpointDir)
return abort(err)
@@ -775,7 +776,7 @@ func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
return &sink, nil
}
-func (c *migrationSink) Do(migrateOp *operation) error {
+func (c *migrationSink) Do(migrateOp *operations.Operation) error {
if c.src.instance.Type() != instancetype.Container {
return fmt.Errorf("Instance not container type")
}
diff --git a/lxd/migrate_storage_volumes.go b/lxd/migrate_storage_volumes.go
index 027fe08367..05ba8e7186 100644
--- a/lxd/migrate_storage_volumes.go
+++ b/lxd/migrate_storage_volumes.go
@@ -7,6 +7,7 @@ import (
"github.com/gorilla/websocket"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
"github.com/lxc/lxd/shared/logger"
@@ -32,7 +33,7 @@ func NewStorageMigrationSource(storage storage, volumeOnly bool) (*migrationSour
return &ret, nil
}
-func (s *migrationSourceWs) DoStorage(migrateOp *operation) error {
+func (s *migrationSourceWs) DoStorage(migrateOp *operations.Operation) error {
<-s.allConnected
// Storage needs to start unconditionally now, since we need to
@@ -223,7 +224,7 @@ func NewStorageMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
return &sink, nil
}
-func (c *migrationSink) DoStorage(migrateOp *operation) error {
+func (c *migrationSink) DoStorage(migrateOp *operations.Operation) error {
var err error
if c.push {
diff --git a/lxd/operations.go b/lxd/operations.go
index 5837ec740b..d5a83fb4b3 100644
--- a/lxd/operations.go
+++ b/lxd/operations.go
@@ -4,25 +4,19 @@ import (
"fmt"
"net/http"
"strings"
- "sync"
- "time"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
- "github.com/pborman/uuid"
- "github.com/pkg/errors"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
- "github.com/lxc/lxd/lxd/events"
"github.com/lxc/lxd/lxd/node"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
- "github.com/lxc/lxd/shared/cancel"
"github.com/lxc/lxd/shared/logger"
- "github.com/lxc/lxd/shared/version"
)
var operationCmd = APIEndpoint{
@@ -50,447 +44,6 @@ var operationWebsocket = APIEndpoint{
Get: APIEndpointAction{Handler: operationWebsocketGet, AllowUntrusted: true},
}
-var operationsLock sync.Mutex
-var operations map[string]*operation = make(map[string]*operation)
-
-type operationClass int
-
-const (
- operationClassTask operationClass = 1
- operationClassWebsocket operationClass = 2
- operationClassToken operationClass = 3
-)
-
-func (t operationClass) String() string {
- return map[operationClass]string{
- operationClassTask: "task",
- operationClassWebsocket: "websocket",
- operationClassToken: "token",
- }[t]
-}
-
-type operation struct {
- project string
- id string
- class operationClass
- createdAt time.Time
- updatedAt time.Time
- status api.StatusCode
- url string
- resources map[string][]string
- metadata map[string]interface{}
- err string
- readonly bool
- canceler *cancel.Canceler
- description string
- permission string
-
- // Those functions are called at various points in the operation lifecycle
- onRun func(*operation) error
- onCancel func(*operation) error
- onConnect func(*operation, *http.Request, http.ResponseWriter) error
-
- // Channels used for error reporting and state tracking of background actions
- chanDone chan error
-
- // Locking for concurent access to the operation
- lock sync.Mutex
-
- cluster *db.Cluster
-}
-
-func (op *operation) done() {
- if op.readonly {
- return
- }
-
- op.lock.Lock()
- op.readonly = true
- op.onRun = nil
- op.onCancel = nil
- op.onConnect = nil
- close(op.chanDone)
- op.lock.Unlock()
-
- time.AfterFunc(time.Second*5, func() {
- operationsLock.Lock()
- _, ok := operations[op.id]
- if !ok {
- operationsLock.Unlock()
- return
- }
-
- delete(operations, op.id)
- operationsLock.Unlock()
-
- err := op.cluster.Transaction(func(tx *db.ClusterTx) error {
- return tx.OperationRemove(op.id)
- })
- if err != nil {
- logger.Warnf("Failed to delete operation %s: %s", op.id, err)
- }
- })
-}
-
-func (op *operation) Run() (chan error, error) {
- if op.status != api.Pending {
- return nil, fmt.Errorf("Only pending operations can be started")
- }
-
- chanRun := make(chan error, 1)
-
- op.lock.Lock()
- op.status = api.Running
-
- if op.onRun != nil {
- go func(op *operation, chanRun chan error) {
- err := op.onRun(op)
- if err != nil {
- op.lock.Lock()
- op.status = api.Failure
- op.err = response.SmartError(err).String()
- op.lock.Unlock()
- op.done()
- chanRun <- err
-
- logger.Debugf("Failure for %s operation: %s: %s", op.class.String(), op.id, err)
-
- _, md, _ := op.Render()
- events.Send(op.project, "operation", md)
- return
- }
-
- op.lock.Lock()
- op.status = api.Success
- op.lock.Unlock()
- op.done()
- chanRun <- nil
-
- op.lock.Lock()
- logger.Debugf("Success for %s operation: %s", op.class.String(), op.id)
- _, md, _ := op.Render()
- events.Send(op.project, "operation", md)
- op.lock.Unlock()
- }(op, chanRun)
- }
- op.lock.Unlock()
-
- logger.Debugf("Started %s operation: %s", op.class.String(), op.id)
- _, md, _ := op.Render()
- events.Send(op.project, "operation", md)
-
- return chanRun, nil
-}
-
-func (op *operation) Cancel() (chan error, error) {
- if op.status != api.Running {
- return nil, fmt.Errorf("Only running operations can be cancelled")
- }
-
- if !op.mayCancel() {
- return nil, fmt.Errorf("This operation can't be cancelled")
- }
-
- chanCancel := make(chan error, 1)
-
- op.lock.Lock()
- oldStatus := op.status
- op.status = api.Cancelling
- op.lock.Unlock()
-
- if op.onCancel != nil {
- go func(op *operation, oldStatus api.StatusCode, chanCancel chan error) {
- err := op.onCancel(op)
- if err != nil {
- op.lock.Lock()
- op.status = oldStatus
- op.lock.Unlock()
- chanCancel <- err
-
- logger.Debugf("Failed to cancel %s operation: %s: %s", op.class.String(), op.id, err)
- _, md, _ := op.Render()
- events.Send(op.project, "operation", md)
- return
- }
-
- op.lock.Lock()
- op.status = api.Cancelled
- op.lock.Unlock()
- op.done()
- chanCancel <- nil
-
- logger.Debugf("Cancelled %s operation: %s", op.class.String(), op.id)
- _, md, _ := op.Render()
- events.Send(op.project, "operation", md)
- }(op, oldStatus, chanCancel)
- }
-
- logger.Debugf("Cancelling %s operation: %s", op.class.String(), op.id)
- _, md, _ := op.Render()
- events.Send(op.project, "operation", md)
-
- if op.canceler != nil {
- err := op.canceler.Cancel()
- if err != nil {
- return nil, err
- }
- }
-
- if op.onCancel == nil {
- op.lock.Lock()
- op.status = api.Cancelled
- op.lock.Unlock()
- op.done()
- chanCancel <- nil
- }
-
- logger.Debugf("Cancelled %s operation: %s", op.class.String(), op.id)
- _, md, _ = op.Render()
- events.Send(op.project, "operation", md)
-
- return chanCancel, nil
-}
-
-func (op *operation) Connect(r *http.Request, w http.ResponseWriter) (chan error, error) {
- if op.class != operationClassWebsocket {
- return nil, fmt.Errorf("Only websocket operations can be connected")
- }
-
- if op.status != api.Running {
- return nil, fmt.Errorf("Only running operations can be connected")
- }
-
- chanConnect := make(chan error, 1)
-
- op.lock.Lock()
-
- go func(op *operation, chanConnect chan error) {
- err := op.onConnect(op, r, w)
- if err != nil {
- chanConnect <- err
-
- logger.Debugf("Failed to handle %s operation: %s: %s", op.class.String(), op.id, err)
- return
- }
-
- chanConnect <- nil
-
- logger.Debugf("Handled %s operation: %s", op.class.String(), op.id)
- }(op, chanConnect)
- op.lock.Unlock()
-
- logger.Debugf("Connected %s operation: %s", op.class.String(), op.id)
-
- return chanConnect, nil
-}
-
-func (op *operation) mayCancel() bool {
- if op.class == operationClassToken {
- return true
- }
-
- if op.onCancel != nil {
- return true
- }
-
- if op.canceler != nil && op.canceler.Cancelable() {
- return true
- }
-
- return false
-}
-
-func (op *operation) Render() (string, *api.Operation, error) {
- // Setup the resource URLs
- resources := op.resources
- if resources != nil {
- tmpResources := make(map[string][]string)
- for key, value := range resources {
- var values []string
- for _, c := range value {
- values = append(values, fmt.Sprintf("/%s/%s/%s", version.APIVersion, key, c))
- }
- tmpResources[key] = values
- }
- resources = tmpResources
- }
-
- // Local server name
- var err error
- var serverName string
- err = op.cluster.Transaction(func(tx *db.ClusterTx) error {
- serverName, err = tx.NodeName()
- return err
- })
- if err != nil {
- return "", nil, err
- }
-
- return op.url, &api.Operation{
- ID: op.id,
- Class: op.class.String(),
- Description: op.description,
- CreatedAt: op.createdAt,
- UpdatedAt: op.updatedAt,
- Status: op.status.String(),
- StatusCode: op.status,
- Resources: resources,
- Metadata: op.metadata,
- MayCancel: op.mayCancel(),
- Err: op.err,
- Location: serverName,
- }, nil
-}
-
-func (op *operation) WaitFinal(timeout int) (bool, error) {
- // Check current state
- if op.status.IsFinal() {
- return true, nil
- }
-
- // Wait indefinitely
- if timeout == -1 {
- <-op.chanDone
- return true, nil
- }
-
- // Wait until timeout
- if timeout > 0 {
- timer := time.NewTimer(time.Duration(timeout) * time.Second)
- select {
- case <-op.chanDone:
- return true, nil
-
- case <-timer.C:
- return false, nil
- }
- }
-
- return false, nil
-}
-
-func (op *operation) UpdateResources(opResources map[string][]string) error {
- if op.status != api.Pending && op.status != api.Running {
- return fmt.Errorf("Only pending or running operations can be updated")
- }
-
- if op.readonly {
- return fmt.Errorf("Read-only operations can't be updated")
- }
-
- op.lock.Lock()
- op.updatedAt = time.Now()
- op.resources = opResources
- op.lock.Unlock()
-
- logger.Debugf("Updated resources for %s operation: %s", op.class.String(), op.id)
- _, md, _ := op.Render()
- events.Send(op.project, "operation", md)
-
- return nil
-}
-
-func (op *operation) UpdateMetadata(opMetadata interface{}) error {
- if op.status != api.Pending && op.status != api.Running {
- return fmt.Errorf("Only pending or running operations can be updated")
- }
-
- if op.readonly {
- return fmt.Errorf("Read-only operations can't be updated")
- }
-
- newMetadata, err := shared.ParseMetadata(opMetadata)
- if err != nil {
- return err
- }
-
- op.lock.Lock()
- op.updatedAt = time.Now()
- op.metadata = newMetadata
- op.lock.Unlock()
-
- logger.Debugf("Updated metadata for %s operation: %s", op.class.String(), op.id)
- _, md, _ := op.Render()
- events.Send(op.project, "operation", md)
-
- return nil
-}
-
-func operationCreate(cluster *db.Cluster, project string, opClass operationClass, opType db.OperationType, opResources map[string][]string, opMetadata interface{}, onRun func(*operation) error, onCancel func(*operation) error, onConnect func(*operation, *http.Request, http.ResponseWriter) error) (*operation, error) {
- // Main attributes
- op := operation{}
- op.project = project
- op.id = uuid.NewRandom().String()
- op.description = opType.Description()
- op.permission = opType.Permission()
- op.class = opClass
- op.createdAt = time.Now()
- op.updatedAt = op.createdAt
- op.status = api.Pending
- op.url = fmt.Sprintf("/%s/operations/%s", version.APIVersion, op.id)
- op.resources = opResources
- op.chanDone = make(chan error)
- op.cluster = cluster
-
- newMetadata, err := shared.ParseMetadata(opMetadata)
- if err != nil {
- return nil, err
- }
- op.metadata = newMetadata
-
- // Callback functions
- op.onRun = onRun
- op.onCancel = onCancel
- op.onConnect = onConnect
-
- // Sanity check
- if op.class != operationClassWebsocket && op.onConnect != nil {
- return nil, fmt.Errorf("Only websocket operations can have a Connect hook")
- }
-
- if op.class == operationClassWebsocket && op.onConnect == nil {
- return nil, fmt.Errorf("Websocket operations must have a Connect hook")
- }
-
- if op.class == operationClassToken && op.onRun != nil {
- return nil, fmt.Errorf("Token operations can't have a Run hook")
- }
-
- if op.class == operationClassToken && op.onCancel != nil {
- return nil, fmt.Errorf("Token operations can't have a Cancel hook")
- }
-
- operationsLock.Lock()
- operations[op.id] = &op
- operationsLock.Unlock()
-
- err = op.cluster.Transaction(func(tx *db.ClusterTx) error {
- _, err := tx.OperationAdd(project, op.id, opType)
- return err
- })
- if err != nil {
- return nil, errors.Wrapf(err, "failed to add operation %s to database", op.id)
- }
-
- logger.Debugf("New %s operation: %s", op.class.String(), op.id)
- _, md, _ := op.Render()
- events.Send(op.project, "operation", md)
-
- return &op, nil
-}
-
-func operationGetInternal(id string) (*operation, error) {
- operationsLock.Lock()
- op, ok := operations[id]
- operationsLock.Unlock()
-
- if !ok {
- return nil, fmt.Errorf("Operation '%s' doesn't exist", id)
- }
-
- return op, nil
-}
-
// API functions
func operationGet(d *Daemon, r *http.Request) response.Response {
id := mux.Vars(r)["id"]
@@ -498,7 +51,7 @@ func operationGet(d *Daemon, r *http.Request) response.Response {
var body *api.Operation
// First check if the query is for a local operation from this node
- op, err := operationGetInternal(id)
+ op, err := operations.OperationGetInternal(id)
if err == nil {
_, body, err = op.Render()
if err != nil {
@@ -541,15 +94,15 @@ func operationDelete(d *Daemon, r *http.Request) response.Response {
id := mux.Vars(r)["id"]
// First check if the query is for a local operation from this node
- op, err := operationGetInternal(id)
+ op, err := operations.OperationGetInternal(id)
if err == nil {
- if op.permission != "" {
- project := op.project
+ if op.Permission() != "" {
+ project := op.Project()
if project == "" {
project = "default"
}
- if !d.userHasPermission(r, project, op.permission) {
+ if !d.userHasPermission(r, project, op.Permission()) {
return response.Forbidden(nil)
}
}
@@ -597,24 +150,24 @@ func operationsGet(d *Daemon, r *http.Request) response.Response {
localOperationURLs := func() (shared.Jmap, error) {
// Get all the operations
- operationsLock.Lock()
- ops := operations
- operationsLock.Unlock()
+ operations.Lock()
+ localOps := operations.Operations()
+ operations.Unlock()
// Build a list of URLs
body := shared.Jmap{}
- for _, v := range ops {
- if v.project != "" && v.project != project {
+ for _, v := range localOps {
+ if v.Project() != "" && v.Project() != project {
continue
}
- status := strings.ToLower(v.status.String())
+ status := strings.ToLower(v.Status().String())
_, ok := body[status]
if !ok {
body[status] = make([]string, 0)
}
- body[status] = append(body[status].([]string), v.url)
+ body[status] = append(body[status].([]string), v.URL())
}
return body, nil
@@ -622,18 +175,18 @@ func operationsGet(d *Daemon, r *http.Request) response.Response {
localOperations := func() (shared.Jmap, error) {
// Get all the operations
- operationsLock.Lock()
- ops := operations
- operationsLock.Unlock()
+ operations.Lock()
+ localOps := operations.Operations()
+ operations.Unlock()
// Build a list of operations
body := shared.Jmap{}
- for _, v := range ops {
- if v.project != "" && v.project != project {
+ for _, v := range localOps {
+ if v.Project() != "" && v.Project() != project {
continue
}
- status := strings.ToLower(v.status.String())
+ status := strings.ToLower(v.Status().String())
_, ok := body[status]
if !ok {
body[status] = make([]*api.Operation, 0)
@@ -772,7 +325,7 @@ func operationWaitGet(d *Daemon, r *http.Request) response.Response {
}
// First check if the query is for a local operation from this node
- op, err := operationGetInternal(id)
+ op, err := operations.OperationGetInternal(id)
if err == nil {
_, err = op.WaitFinal(timeout)
if err != nil {
@@ -818,7 +371,7 @@ func operationWaitGet(d *Daemon, r *http.Request) response.Response {
type operationWebSocket struct {
req *http.Request
- op *operation
+ op *operations.Operation
}
func (r *operationWebSocket) Render(w http.ResponseWriter) error {
@@ -863,7 +416,7 @@ func operationWebsocketGet(d *Daemon, r *http.Request) response.Response {
id := mux.Vars(r)["id"]
// First check if the query is for a local operation from this node
- op, err := operationGetInternal(id)
+ op, err := operations.OperationGetInternal(id)
if err == nil {
return &operationWebSocket{r, op}
}
@@ -900,5 +453,5 @@ func operationWebsocketGet(d *Daemon, r *http.Request) response.Response {
return response.SmartError(err)
}
- return &forwardedOperationWebSocket{req: r, id: id, source: source}
+ return &forwardedOperationWebSocket{r, id, source}
}
diff --git a/lxd/operations/operations.go b/lxd/operations/operations.go
new file mode 100644
index 0000000000..da98f7e9aa
--- /dev/null
+++ b/lxd/operations/operations.go
@@ -0,0 +1,543 @@
+package operations
+
+import (
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/events"
+ "github.com/lxc/lxd/lxd/response"
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/cancel"
+ "github.com/lxc/lxd/shared/logger"
+ "github.com/lxc/lxd/shared/version"
+ "github.com/pborman/uuid"
+ "github.com/pkg/errors"
+)
+
+var debug bool
+
+var operationsLock sync.Mutex
+var operations map[string]*Operation = make(map[string]*Operation)
+
+type operationClass int
+
+const (
+ // OperationClassTask represents the Task OperationClass
+ OperationClassTask operationClass = 1
+ // OperationClassWebsocket represents the Websocket OperationClass
+ OperationClassWebsocket operationClass = 2
+ // OperationClassToken represents the Token OperationClass
+ OperationClassToken operationClass = 3
+)
+
+func (t operationClass) String() string {
+ return map[operationClass]string{
+ OperationClassTask: "task",
+ OperationClassWebsocket: "websocket",
+ OperationClassToken: "token",
+ }[t]
+}
+
+// Init sets the debug value for the operations package.
+func Init(d bool) {
+ debug = d
+}
+
+// Lock locks the operations mutex.
+func Lock() {
+ operationsLock.Lock()
+}
+
+// Unlock unlocks the operations mutex.
+func Unlock() {
+ operationsLock.Unlock()
+}
+
+// Operations returns a map of operations.
+func Operations() map[string]*Operation {
+ return operations
+}
+
+// OperationGetInternal returns the operation with the given id. It returns an
+// error if it doesn't exist.
+func OperationGetInternal(id string) (*Operation, error) {
+ operationsLock.Lock()
+ op, ok := operations[id]
+ operationsLock.Unlock()
+
+ if !ok {
+ return nil, fmt.Errorf("Operation '%s' doesn't exist", id)
+ }
+
+ return op, nil
+}
+
+// Operation represents an operation.
+type Operation struct {
+ project string
+ id string
+ class operationClass
+ createdAt time.Time
+ updatedAt time.Time
+ status api.StatusCode
+ url string
+ resources map[string][]string
+ metadata map[string]interface{}
+ err string
+ readonly bool
+ canceler *cancel.Canceler
+ description string
+ permission string
+
+ // Those functions are called at various points in the Operation lifecycle
+ onRun func(*Operation) error
+ onCancel func(*Operation) error
+ onConnect func(*Operation, *http.Request, http.ResponseWriter) error
+
+ // Channels used for error reporting and state tracking of background actions
+ chanDone chan error
+
+ // Locking for concurent access to the Operation
+ lock sync.Mutex
+
+ cluster *db.Cluster
+}
+
+// OperationCreate creates a new operation and returns it. If it cannot be
+// created, it returns an error.
+func OperationCreate(cluster *db.Cluster, project string, opClass operationClass, opType db.OperationType, opResources map[string][]string, opMetadata interface{}, onRun func(*Operation) error, onCancel func(*Operation) error, onConnect func(*Operation, *http.Request, http.ResponseWriter) error) (*Operation, error) {
+ // Main attributes
+ op := Operation{}
+ op.project = project
+ op.id = uuid.NewRandom().String()
+ op.description = opType.Description()
+ op.permission = opType.Permission()
+ op.class = opClass
+ op.createdAt = time.Now()
+ op.updatedAt = op.createdAt
+ op.status = api.Pending
+ op.url = fmt.Sprintf("/%s/operations/%s", version.APIVersion, op.id)
+ op.resources = opResources
+ op.chanDone = make(chan error)
+ op.cluster = cluster
+
+ newMetadata, err := shared.ParseMetadata(opMetadata)
+ if err != nil {
+ return nil, err
+ }
+ op.metadata = newMetadata
+
+ // Callback functions
+ op.onRun = onRun
+ op.onCancel = onCancel
+ op.onConnect = onConnect
+
+ // Sanity check
+ if op.class != OperationClassWebsocket && op.onConnect != nil {
+ return nil, fmt.Errorf("Only websocket operations can have a Connect hook")
+ }
+
+ if op.class == OperationClassWebsocket && op.onConnect == nil {
+ return nil, fmt.Errorf("Websocket operations must have a Connect hook")
+ }
+
+ if op.class == OperationClassToken && op.onRun != nil {
+ return nil, fmt.Errorf("Token operations can't have a Run hook")
+ }
+
+ if op.class == OperationClassToken && op.onCancel != nil {
+ return nil, fmt.Errorf("Token operations can't have a Cancel hook")
+ }
+
+ operationsLock.Lock()
+ operations[op.id] = &op
+ operationsLock.Unlock()
+
+ err = op.cluster.Transaction(func(tx *db.ClusterTx) error {
+ _, err := tx.OperationAdd(project, op.id, opType)
+ return err
+ })
+ if err != nil {
+ return nil, errors.Wrapf(err, "failed to add Operation %s to database", op.id)
+ }
+
+ logger.Debugf("New %s Operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "Operation", md)
+
+ return &op, nil
+}
+
+func (op *Operation) done() {
+ if op.readonly {
+ return
+ }
+
+ op.lock.Lock()
+ op.readonly = true
+ op.onRun = nil
+ op.onCancel = nil
+ op.onConnect = nil
+ close(op.chanDone)
+ op.lock.Unlock()
+
+ time.AfterFunc(time.Second*5, func() {
+ operationsLock.Lock()
+ _, ok := operations[op.id]
+ if !ok {
+ operationsLock.Unlock()
+ return
+ }
+
+ delete(operations, op.id)
+ operationsLock.Unlock()
+
+ err := op.cluster.Transaction(func(tx *db.ClusterTx) error {
+ return tx.OperationRemove(op.id)
+ })
+ if err != nil {
+ logger.Warnf("Failed to delete operation %s: %s", op.id, err)
+ }
+ })
+}
+
+// Run runs a pending operation. It returns an error if the operation cannot
+// be started.
+func (op *Operation) Run() (chan error, error) {
+ if op.status != api.Pending {
+ return nil, fmt.Errorf("Only pending operations can be started")
+ }
+
+ chanRun := make(chan error, 1)
+
+ op.lock.Lock()
+ op.status = api.Running
+
+ if op.onRun != nil {
+ go func(op *Operation, chanRun chan error) {
+ err := op.onRun(op)
+ if err != nil {
+ op.lock.Lock()
+ op.status = api.Failure
+ op.err = response.SmartError(err).String()
+ op.lock.Unlock()
+ op.done()
+ chanRun <- err
+
+ logger.Debugf("Failure for %s operation: %s: %s", op.class.String(), op.id, err)
+
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+ return
+ }
+
+ op.lock.Lock()
+ op.status = api.Success
+ op.lock.Unlock()
+ op.done()
+ chanRun <- nil
+
+ op.lock.Lock()
+ logger.Debugf("Success for %s operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+ op.lock.Unlock()
+ }(op, chanRun)
+ }
+ op.lock.Unlock()
+
+ logger.Debugf("Started %s operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "operation", md)
+
+ return chanRun, nil
+}
+
+// Cancel cancels a running operation. If the operation cannot be cancelled, it
+// returns an error.
+func (op *Operation) Cancel() (chan error, error) {
+ if op.status != api.Running {
+ return nil, fmt.Errorf("Only running operations can be cancelled")
+ }
+
+ if !op.mayCancel() {
+ return nil, fmt.Errorf("This Operation can't be cancelled")
+ }
+
+ chanCancel := make(chan error, 1)
+
+ op.lock.Lock()
+ oldStatus := op.status
+ op.status = api.Cancelling
+ op.lock.Unlock()
+
+ if op.onCancel != nil {
+ go func(op *Operation, oldStatus api.StatusCode, chanCancel chan error) {
+ err := op.onCancel(op)
+ if err != nil {
+ op.lock.Lock()
+ op.status = oldStatus
+ op.lock.Unlock()
+ chanCancel <- err
+
+ logger.Debugf("Failed to cancel %s Operation: %s: %s", op.class.String(), op.id, err)
+ _, md, _ := op.Render()
+ events.Send(op.project, "Operation", md)
+ return
+ }
+
+ op.lock.Lock()
+ op.status = api.Cancelled
+ op.lock.Unlock()
+ op.done()
+ chanCancel <- nil
+
+ logger.Debugf("Cancelled %s Operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "Operation", md)
+ }(op, oldStatus, chanCancel)
+ }
+
+ logger.Debugf("Cancelling %s Operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "Operation", md)
+
+ if op.canceler != nil {
+ err := op.canceler.Cancel()
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if op.onCancel == nil {
+ op.lock.Lock()
+ op.status = api.Cancelled
+ op.lock.Unlock()
+ op.done()
+ chanCancel <- nil
+ }
+
+ logger.Debugf("Cancelled %s Operation: %s", op.class.String(), op.id)
+ _, md, _ = op.Render()
+ events.Send(op.project, "Operation", md)
+
+ return chanCancel, nil
+}
+
+// Connect connects a websocket operation. If the operation is not a websocket
+// operation or the operation is not running, it returns an error.
+func (op *Operation) Connect(r *http.Request, w http.ResponseWriter) (chan error, error) {
+ if op.class != OperationClassWebsocket {
+ return nil, fmt.Errorf("Only websocket operations can be connected")
+ }
+
+ if op.status != api.Running {
+ return nil, fmt.Errorf("Only running operations can be connected")
+ }
+
+ chanConnect := make(chan error, 1)
+
+ op.lock.Lock()
+
+ go func(op *Operation, chanConnect chan error) {
+ err := op.onConnect(op, r, w)
+ if err != nil {
+ chanConnect <- err
+
+ logger.Debugf("Failed to handle %s Operation: %s: %s", op.class.String(), op.id, err)
+ return
+ }
+
+ chanConnect <- nil
+
+ logger.Debugf("Handled %s Operation: %s", op.class.String(), op.id)
+ }(op, chanConnect)
+ op.lock.Unlock()
+
+ logger.Debugf("Connected %s Operation: %s", op.class.String(), op.id)
+
+ return chanConnect, nil
+}
+
+func (op *Operation) mayCancel() bool {
+ if op.class == OperationClassToken {
+ return true
+ }
+
+ if op.onCancel != nil {
+ return true
+ }
+
+ if op.canceler != nil && op.canceler.Cancelable() {
+ return true
+ }
+
+ return false
+}
+
+// Render renders the operation structure.
+func (op *Operation) Render() (string, *api.Operation, error) {
+ // Setup the resource URLs
+ resources := op.resources
+ if resources != nil {
+ tmpResources := make(map[string][]string)
+ for key, value := range resources {
+ var values []string
+ for _, c := range value {
+ values = append(values, fmt.Sprintf("/%s/%s/%s", version.APIVersion, key, c))
+ }
+ tmpResources[key] = values
+ }
+ resources = tmpResources
+ }
+
+ // Local server name
+ var err error
+ var serverName string
+ err = op.cluster.Transaction(func(tx *db.ClusterTx) error {
+ serverName, err = tx.NodeName()
+ return err
+ })
+ if err != nil {
+ return "", nil, err
+ }
+
+ return op.url, &api.Operation{
+ ID: op.id,
+ Class: op.class.String(),
+ Description: op.description,
+ CreatedAt: op.createdAt,
+ UpdatedAt: op.updatedAt,
+ Status: op.status.String(),
+ StatusCode: op.status,
+ Resources: resources,
+ Metadata: op.metadata,
+ MayCancel: op.mayCancel(),
+ Err: op.err,
+ Location: serverName,
+ }, nil
+}
+
+// WaitFinal waits for the operation to be done. If timeout is -1, it will wait
+// indefinitely otherwise it will timeout after {timeout} seconds.
+func (op *Operation) WaitFinal(timeout int) (bool, error) {
+ // Check current state
+ if op.status.IsFinal() {
+ return true, nil
+ }
+
+ // Wait indefinitely
+ if timeout == -1 {
+ <-op.chanDone
+ return true, nil
+ }
+
+ // Wait until timeout
+ if timeout > 0 {
+ timer := time.NewTimer(time.Duration(timeout) * time.Second)
+ select {
+ case <-op.chanDone:
+ return true, nil
+
+ case <-timer.C:
+ return false, nil
+ }
+ }
+
+ return false, nil
+}
+
+// UpdateResources updates the resources of the operation. It returns an error
+// if the operation is not pending or running, or the operation is read-only.
+func (op *Operation) UpdateResources(opResources map[string][]string) error {
+ if op.status != api.Pending && op.status != api.Running {
+ return fmt.Errorf("Only pending or running operations can be updated")
+ }
+
+ if op.readonly {
+ return fmt.Errorf("Read-only operations can't be updated")
+ }
+
+ op.lock.Lock()
+ op.updatedAt = time.Now()
+ op.resources = opResources
+ op.lock.Unlock()
+
+ logger.Debugf("Updated resources for %s Operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "Operation", md)
+
+ return nil
+}
+
+// UpdateMetadata updates the metadata of the operation. It returns an error
+// if the operation is not pending or running, or the operation is read-only.
+func (op *Operation) UpdateMetadata(opMetadata interface{}) error {
+ if op.status != api.Pending && op.status != api.Running {
+ return fmt.Errorf("Only pending or running operations can be updated")
+ }
+
+ if op.readonly {
+ return fmt.Errorf("Read-only operations can't be updated")
+ }
+
+ newMetadata, err := shared.ParseMetadata(opMetadata)
+ if err != nil {
+ return err
+ }
+
+ op.lock.Lock()
+ op.updatedAt = time.Now()
+ op.metadata = newMetadata
+ op.lock.Unlock()
+
+ logger.Debugf("Updated metadata for %s Operation: %s", op.class.String(), op.id)
+ _, md, _ := op.Render()
+ events.Send(op.project, "Operation", md)
+
+ return nil
+}
+
+// ID returns the operation ID.
+func (op *Operation) ID() string {
+ return op.id
+}
+
+// Metadata returns the operation Metadata.
+func (op *Operation) Metadata() map[string]interface{} {
+ return op.metadata
+}
+
+// URL returns the operation URL.
+func (op *Operation) URL() string {
+ return op.url
+}
+
+// Resources returns the operation resources.
+func (op *Operation) Resources() map[string][]string {
+ return op.resources
+}
+
+// SetCanceler sets a canceler.
+func (op *Operation) SetCanceler(canceler *cancel.Canceler) {
+ op.canceler = canceler
+}
+
+// Permission returns the operation permission.
+func (op *Operation) Permission() string {
+ return op.permission
+}
+
+// Project returns the operation project.
+func (op *Operation) Project() string {
+ return op.project
+}
+
+// Status returns the operation status.
+func (op *Operation) Status() api.StatusCode {
+ return op.status
+}
diff --git a/lxd/operations/response.go b/lxd/operations/response.go
new file mode 100644
index 0000000000..d22c5b9f6a
--- /dev/null
+++ b/lxd/operations/response.go
@@ -0,0 +1,96 @@
+package operations
+
+import (
+ "fmt"
+ "net/http"
+
+ "github.com/lxc/lxd/lxd/response"
+ "github.com/lxc/lxd/lxd/util"
+ "github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/version"
+)
+
+// Operation response
+type operationResponse struct {
+ op *Operation
+}
+
+// OperationResponse returns an operation response.
+func OperationResponse(op *Operation) response.Response {
+ return &operationResponse{op}
+}
+
+func (r *operationResponse) Render(w http.ResponseWriter) error {
+ _, err := r.op.Run()
+ if err != nil {
+ return err
+ }
+
+ url, md, err := r.op.Render()
+ if err != nil {
+ return err
+ }
+
+ body := api.ResponseRaw{
+ Type: api.AsyncResponse,
+ Status: api.OperationCreated.String(),
+ StatusCode: int(api.OperationCreated),
+ Operation: url,
+ Metadata: md,
+ }
+
+ w.Header().Set("Location", url)
+ w.WriteHeader(202)
+
+ return util.WriteJSON(w, body, debug)
+}
+
+func (r *operationResponse) String() string {
+ _, md, err := r.op.Render()
+ if err != nil {
+ return fmt.Sprintf("error: %s", err)
+ }
+
+ return md.ID
+}
+
+// Forwarded operation response.
+//
+// Returned when the operation has been created on another node
+type forwardedOperationResponse struct {
+ op *api.Operation
+ project string
+}
+
+// ForwardedOperationResponse creates a response that forwards the metadata of
+// an operation created on another node.
+func ForwardedOperationResponse(project string, op *api.Operation) response.Response {
+ return &forwardedOperationResponse{
+ op: op,
+ project: project,
+ }
+}
+
+func (r *forwardedOperationResponse) Render(w http.ResponseWriter) error {
+ url := fmt.Sprintf("/%s/operations/%s", version.APIVersion, r.op.ID)
+ if r.project != "" {
+ url += fmt.Sprintf("?project=%s", r.project)
+ }
+
+ body := api.ResponseRaw{
+ Type: api.AsyncResponse,
+ Status: api.OperationCreated.String(),
+ StatusCode: int(api.OperationCreated),
+ Operation: url,
+ Metadata: r.op,
+ }
+
+ w.Header().Set("Location", url)
+ w.WriteHeader(202)
+
+ return util.WriteJSON(w, body, debug)
+}
+
+func (r *forwardedOperationResponse) String() string {
+ return r.op.ID
+}
diff --git a/lxd/operations/websocket.go b/lxd/operations/websocket.go
new file mode 100644
index 0000000000..f16005da85
--- /dev/null
+++ b/lxd/operations/websocket.go
@@ -0,0 +1,63 @@
+package operations
+
+import (
+ "fmt"
+ "net/http"
+
+ "github.com/gorilla/websocket"
+ "github.com/lxc/lxd/lxd/response"
+ "github.com/lxc/lxd/shared"
+)
+
+type operationWebSocket struct {
+ req *http.Request
+ op *Operation
+}
+
+// OperationWebSocket returns a new websocket operation.
+func OperationWebSocket(req *http.Request, op *Operation) response.Response {
+ return &operationWebSocket{req, op}
+}
+
+func (r *operationWebSocket) Render(w http.ResponseWriter) error {
+ chanErr, err := r.op.Connect(r.req, w)
+ if err != nil {
+ return err
+ }
+
+ err = <-chanErr
+ return err
+}
+
+func (r *operationWebSocket) String() string {
+ _, md, err := r.op.Render()
+ if err != nil {
+ return fmt.Sprintf("error: %s", err)
+ }
+
+ return md.ID
+}
+
+type forwardedOperationWebSocket struct {
+ req *http.Request
+ id string
+ source *websocket.Conn // Connection to the node were the operation is running
+}
+
+// ForwardedOperationWebSocket returns a new forwarted websocket operation.
+func ForwardedOperationWebSocket(req *http.Request, id string, source *websocket.Conn) response.Response {
+ return &forwardedOperationWebSocket{req, id, source}
+}
+
+func (r *forwardedOperationWebSocket) Render(w http.ResponseWriter) error {
+ target, err := shared.WebsocketUpgrader.Upgrade(w, r.req, nil)
+ if err != nil {
+ return err
+ }
+ <-shared.WebsocketProxy(r.source, target)
+ return nil
+}
+
+func (r *forwardedOperationWebSocket) String() string {
+ return r.id
+}
diff --git a/lxd/response.go b/lxd/response.go
index 1f1e26dc6a..1cd459dbcb 100644
--- a/lxd/response.go
+++ b/lxd/response.go
@@ -1,16 +1,12 @@
package main
import (
- "fmt"
"net/http"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/instance/instancetype"
"github.com/lxc/lxd/lxd/response"
- "github.com/lxc/lxd/lxd/util"
- "github.com/lxc/lxd/shared/api"
- "github.com/lxc/lxd/shared/version"
)
// ForwardedResponseIfTargetIsRemote redirects a request to the request has a
@@ -78,88 +74,3 @@ func ForwardedResponseIfVolumeIsRemote(d *Daemon, r *http.Request, poolID int64,
}
return response.ForwardedResponse(client, r)
}
-
-// Operation response
-type operationResponse struct {
- op *operation
-}
-
-// OperationResponse returns an operation response.
-func OperationResponse(op *operation) response.Response {
- return &operationResponse{op}
-}
-
-func (r *operationResponse) Render(w http.ResponseWriter) error {
- _, err := r.op.Run()
- if err != nil {
- return err
- }
-
- url, md, err := r.op.Render()
- if err != nil {
- return err
- }
-
- body := api.ResponseRaw{
- Type: api.AsyncResponse,
- Status: api.OperationCreated.String(),
- StatusCode: int(api.OperationCreated),
- Operation: url,
- Metadata: md,
- }
-
- w.Header().Set("Location", url)
- w.WriteHeader(202)
-
- return util.WriteJSON(w, body, debug)
-}
-
-func (r *operationResponse) String() string {
- _, md, err := r.op.Render()
- if err != nil {
- return fmt.Sprintf("error: %s", err)
- }
-
- return md.ID
-}
-
-// Forwarded operation response.
-//
-// Returned when the operation has been created on another node
-type forwardedOperationResponse struct {
- op *api.Operation
- project string
-}
-
-// ForwardedOperationResponse creates a response that forwards the metadata of
-// an operation created on another node.
-func ForwardedOperationResponse(project string, op *api.Operation) response.Response {
- return &forwardedOperationResponse{
- op: op,
- project: project,
- }
-}
-
-func (r *forwardedOperationResponse) Render(w http.ResponseWriter) error {
- url := fmt.Sprintf("/%s/operations/%s", version.APIVersion, r.op.ID)
- if r.project != "" {
- url += fmt.Sprintf("?project=%s", r.project)
- }
-
- body := api.ResponseRaw{
- Type: api.AsyncResponse,
- Status: api.OperationCreated.String(),
- StatusCode: int(api.OperationCreated),
- Operation: url,
- Metadata: r.op,
- }
-
- w.Header().Set("Location", url)
- w.WriteHeader(202)
-
- return util.WriteJSON(w, body, debug)
-}
-
-func (r *forwardedOperationResponse) String() string {
- return r.op.ID
-}
diff --git a/lxd/storage.go b/lxd/storage.go
index c36778f8f8..d4b9749ca6 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -15,6 +15,7 @@ import (
"github.com/lxc/lxd/lxd/device"
"github.com/lxc/lxd/lxd/instance/instancetype"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/state"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/shared"
@@ -248,10 +249,10 @@ type storage interface {
// already present on the target instance as an exercise for the
// enterprising developer.
MigrationSource(args MigrationSourceArgs) (MigrationStorageSourceDriver, error)
- MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error
+ MigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error
StorageMigrationSource(args MigrationSourceArgs) (MigrationStorageSourceDriver, error)
- StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error
+ StorageMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error
}
func storageCoreInit(driver string) (storage, error) {
@@ -739,8 +740,8 @@ func resetContainerDiskIdmap(container container, srcIdmap *idmap.IdmapSet) erro
return nil
}
-func progressWrapperRender(op *operation, key string, description string, progressInt int64, speedInt int64) {
- meta := op.metadata
+func progressWrapperRender(op *operations.Operation, key string, description string, progressInt int64, speedInt int64) {
+ meta := op.Metadata()
if meta == nil {
meta = make(map[string]interface{})
}
@@ -757,7 +758,7 @@ func progressWrapperRender(op *operation, key string, description string, progre
}
// StorageProgressReader reports the read progress.
-func StorageProgressReader(op *operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
+func StorageProgressReader(op *operations.Operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
return func(reader io.ReadCloser) io.ReadCloser {
if op == nil {
return reader
@@ -779,7 +780,7 @@ func StorageProgressReader(op *operation, key string, description string) func(i
}
// StorageProgressWriter reports the write progress.
-func StorageProgressWriter(op *operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
+func StorageProgressWriter(op *operations.Operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
return func(writer io.WriteCloser) io.WriteCloser {
if op == nil {
return writer
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index f339b710a3..9233d27b60 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -17,6 +17,7 @@ import (
"golang.org/x/sys/unix"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/project"
"github.com/lxc/lxd/lxd/state"
driver "github.com/lxc/lxd/lxd/storage"
@@ -2481,7 +2482,7 @@ func (s *storageBtrfs) MigrationSource(args MigrationSourceArgs) (MigrationStora
return sourceDriver, nil
}
-func (s *storageBtrfs) MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageBtrfs) MigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
if s.s.OS.RunningInUserNS {
return rsyncMigrationSink(conn, op, args)
}
@@ -2961,7 +2962,7 @@ func (s *storageBtrfs) StorageMigrationSource(args MigrationSourceArgs) (Migrati
return rsyncStorageMigrationSource(args)
}
-func (s *storageBtrfs) StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageBtrfs) StorageMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return rsyncStorageMigrationSink(conn, op, args)
}
diff --git a/lxd/storage_ceph.go b/lxd/storage_ceph.go
index 6a15cfd92e..2172d6e8be 100644
--- a/lxd/storage_ceph.go
+++ b/lxd/storage_ceph.go
@@ -16,6 +16,7 @@ import (
"github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/project"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/shared"
@@ -2696,7 +2697,7 @@ func (s *storageCeph) StorageMigrationSource(args MigrationSourceArgs) (Migratio
return rsyncStorageMigrationSource(args)
}
-func (s *storageCeph) StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageCeph) StorageMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return rsyncStorageMigrationSink(conn, op, args)
}
@@ -2881,7 +2882,7 @@ func (s *storageCeph) MigrationSource(args MigrationSourceArgs) (MigrationStorag
return &driver, nil
}
-func (s *storageCeph) MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageCeph) MigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
// Check that we received a valid root disk device with a pool property
// set.
parentStoragePool := ""
diff --git a/lxd/storage_cephfs.go b/lxd/storage_cephfs.go
index 0c3f8b9aa3..ed3f219ece 100644
--- a/lxd/storage_cephfs.go
+++ b/lxd/storage_cephfs.go
@@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/state"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/shared"
@@ -728,7 +729,7 @@ func (s *storageCephFs) MigrationSource(args MigrationSourceArgs) (MigrationStor
return rsyncMigrationSource(args)
}
-func (s *storageCephFs) MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageCephFs) MigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return rsyncMigrationSink(conn, op, args)
}
@@ -824,7 +825,7 @@ func (s *storageCephFs) StorageMigrationSource(args MigrationSourceArgs) (Migrat
return rsyncStorageMigrationSource(args)
}
-func (s *storageCephFs) StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageCephFs) StorageMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return rsyncStorageMigrationSink(conn, op, args)
}
diff --git a/lxd/storage_dir.go b/lxd/storage_dir.go
index ff5973a8f7..c24eecaae4 100644
--- a/lxd/storage_dir.go
+++ b/lxd/storage_dir.go
@@ -13,6 +13,7 @@ import (
"golang.org/x/sys/unix"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/project"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/lxd/storage/quota"
@@ -1306,7 +1307,7 @@ func (s *storageDir) MigrationSource(args MigrationSourceArgs) (MigrationStorage
return rsyncMigrationSource(args)
}
-func (s *storageDir) MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageDir) MigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return rsyncMigrationSink(conn, op, args)
}
@@ -1442,7 +1443,7 @@ func (s *storageDir) StorageMigrationSource(args MigrationSourceArgs) (Migration
return rsyncStorageMigrationSource(args)
}
-func (s *storageDir) StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageDir) StorageMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return rsyncStorageMigrationSink(conn, op, args)
}
diff --git a/lxd/storage_lvm.go b/lxd/storage_lvm.go
index b3ef0b20eb..d41704a8bf 100644
--- a/lxd/storage_lvm.go
+++ b/lxd/storage_lvm.go
@@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/project"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/shared"
@@ -2068,7 +2069,7 @@ func (s *storageLvm) MigrationSource(args MigrationSourceArgs) (MigrationStorage
return rsyncMigrationSource(args)
}
-func (s *storageLvm) MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageLvm) MigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return rsyncMigrationSink(conn, op, args)
}
@@ -2265,7 +2266,7 @@ func (s *storageLvm) StorageMigrationSource(args MigrationSourceArgs) (Migration
return rsyncStorageMigrationSource(args)
}
-func (s *storageLvm) StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageLvm) StorageMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return rsyncStorageMigrationSink(conn, op, args)
}
diff --git a/lxd/storage_migration.go b/lxd/storage_migration.go
index 51b37c68e7..666ec02287 100644
--- a/lxd/storage_migration.go
+++ b/lxd/storage_migration.go
@@ -10,6 +10,7 @@ import (
deviceConfig "github.com/lxc/lxd/lxd/device/config"
"github.com/lxc/lxd/lxd/instance/instancetype"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/project"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/shared"
@@ -23,7 +24,7 @@ type MigrationStorageSourceDriver interface {
/* send any bits of the container/snapshots that are possible while the
* container is still running.
*/
- SendWhileRunning(conn *websocket.Conn, op *operation, bwlimit string, containerOnly bool) error
+ SendWhileRunning(conn *websocket.Conn, op *operations.Operation, bwlimit string, containerOnly bool) error
/* send the final bits (e.g. a final delta snapshot for zfs, btrfs, or
* do a final rsync) of the fs after the container has been
@@ -37,7 +38,7 @@ type MigrationStorageSourceDriver interface {
*/
Cleanup()
- SendStorageVolume(conn *websocket.Conn, op *operation, bwlimit string, storage storage, volumeOnly bool) error
+ SendStorageVolume(conn *websocket.Conn, op *operations.Operation, bwlimit string, storage storage, volumeOnly bool) error
}
type rsyncStorageSourceDriver struct {
@@ -46,7 +47,7 @@ type rsyncStorageSourceDriver struct {
rsyncFeatures []string
}
-func (s rsyncStorageSourceDriver) SendStorageVolume(conn *websocket.Conn, op *operation, bwlimit string, storage storage, volumeOnly bool) error {
+func (s rsyncStorageSourceDriver) SendStorageVolume(conn *websocket.Conn, op *operations.Operation, bwlimit string, storage storage, volumeOnly bool) error {
ourMount, err := storage.StoragePoolVolumeMount()
if err != nil {
return err
@@ -90,7 +91,7 @@ func (s rsyncStorageSourceDriver) SendStorageVolume(conn *websocket.Conn, op *op
return nil
}
-func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation, bwlimit string, containerOnly bool) error {
+func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operations.Operation, bwlimit string, containerOnly bool) error {
ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.container.Name())
if !containerOnly {
@@ -220,7 +221,7 @@ func snapshotProtobufToContainerArgs(project string, containerName string, snap
return args
}
-func rsyncStorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func rsyncStorageMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
err := args.Storage.StoragePoolVolumeCreate()
if err != nil {
return err
@@ -281,7 +282,7 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op *operation, args Migrati
return RsyncRecv(path, conn, wrapper, args.RsyncFeatures)
}
-func rsyncMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
ourStart, err := args.Instance.StorageStart()
if err != nil {
return err
diff --git a/lxd/storage_migration_btrfs.go b/lxd/storage_migration_btrfs.go
index 166fd8599c..15a3c936c3 100644
--- a/lxd/storage_migration_btrfs.go
+++ b/lxd/storage_migration_btrfs.go
@@ -9,6 +9,7 @@ import (
"github.com/gorilla/websocket"
+ "github.com/lxc/lxd/lxd/operations"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/logger"
@@ -67,7 +68,7 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
return err
}
-func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation, bwlimit string, containerOnly bool) error {
+func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operations.Operation, bwlimit string, containerOnly bool) error {
_, containerPool, _ := s.container.Storage().GetContainerPoolInfo()
containerName := s.container.Name()
containersPath := driver.GetContainerMountPoint("default", containerPool, "")
@@ -178,7 +179,7 @@ func (s *btrfsMigrationSourceDriver) Cleanup() {
}
}
-func (s *btrfsMigrationSourceDriver) SendStorageVolume(conn *websocket.Conn, op *operation, bwlimit string, storage storage, volumeOnly bool) error {
+func (s *btrfsMigrationSourceDriver) SendStorageVolume(conn *websocket.Conn, op *operations.Operation, bwlimit string, storage storage, volumeOnly bool) error {
msg := fmt.Sprintf("Function not implemented")
logger.Errorf(msg)
return fmt.Errorf(msg)
diff --git a/lxd/storage_migration_ceph.go b/lxd/storage_migration_ceph.go
index 90fa2b80a5..aa13e326d4 100644
--- a/lxd/storage_migration_ceph.go
+++ b/lxd/storage_migration_ceph.go
@@ -9,6 +9,7 @@ import (
"github.com/gorilla/websocket"
"github.com/pborman/uuid"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/project"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/logger"
@@ -73,7 +74,7 @@ func (s *rbdMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn, bwl
}
func (s *rbdMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn,
- op *operation, bwlimit string, containerOnly bool) error {
+ op *operations.Operation, bwlimit string, containerOnly bool) error {
containerName := s.container.Name()
if s.container.IsSnapshot() {
// ContainerSnapshotStart() will create the clone that is
@@ -149,7 +150,7 @@ func (s *rbdMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn,
return nil
}
-func (s *rbdMigrationSourceDriver) SendStorageVolume(conn *websocket.Conn, op *operation, bwlimit string, storage storage, volumeOnly bool) error {
+func (s *rbdMigrationSourceDriver) SendStorageVolume(conn *websocket.Conn, op *operations.Operation, bwlimit string, storage storage, volumeOnly bool) error {
msg := fmt.Sprintf("Function not implemented")
logger.Errorf(msg)
return fmt.Errorf(msg)
diff --git a/lxd/storage_migration_zfs.go b/lxd/storage_migration_zfs.go
index fe94bf6eab..f3bc582efe 100644
--- a/lxd/storage_migration_zfs.go
+++ b/lxd/storage_migration_zfs.go
@@ -9,6 +9,7 @@ import (
"github.com/gorilla/websocket"
"github.com/pborman/uuid"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/project"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/logger"
@@ -78,7 +79,7 @@ func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zf
return err
}
-func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation, bwlimit string, containerOnly bool) error {
+func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operations.Operation, bwlimit string, containerOnly bool) error {
if s.instance.IsSnapshot() {
_, snapOnlyName, _ := shared.ContainerGetParentAndSnapshotName(s.instance.Name())
snapshotName := fmt.Sprintf("snapshot-%s", snapOnlyName)
@@ -139,7 +140,7 @@ func (s *zfsMigrationSourceDriver) Cleanup() {
}
}
-func (s *zfsMigrationSourceDriver) SendStorageVolume(conn *websocket.Conn, op *operation, bwlimit string, storage storage, volumeOnly bool) error {
+func (s *zfsMigrationSourceDriver) SendStorageVolume(conn *websocket.Conn, op *operations.Operation, bwlimit string, storage storage, volumeOnly bool) error {
msg := fmt.Sprintf("Function not implemented")
logger.Errorf(msg)
return fmt.Errorf(msg)
diff --git a/lxd/storage_mock.go b/lxd/storage_mock.go
index 3b9520928e..d3ab28b093 100644
--- a/lxd/storage_mock.go
+++ b/lxd/storage_mock.go
@@ -6,6 +6,7 @@ import (
"github.com/gorilla/websocket"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/shared/api"
"github.com/lxc/lxd/shared/ioprogress"
@@ -211,7 +212,7 @@ func (s *storageMock) MigrationSource(args MigrationSourceArgs) (MigrationStorag
return nil, nil
}
-func (s *storageMock) MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageMock) MigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return nil
}
@@ -231,7 +232,7 @@ func (s *storageMock) StorageMigrationSource(args MigrationSourceArgs) (Migratio
return nil, nil
}
-func (s *storageMock) StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageMock) StorageMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return nil
}
diff --git a/lxd/storage_volumes.go b/lxd/storage_volumes.go
index 4987d4521f..6b4378bf0d 100644
--- a/lxd/storage_volumes.go
+++ b/lxd/storage_volumes.go
@@ -12,6 +12,7 @@ import (
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -290,16 +291,16 @@ func doVolumeCreateOrCopy(d *Daemon, poolName string, req *api.StorageVolumesPos
return response.EmptySyncResponse
}
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
return doWork()
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationVolumeCopy, nil, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationVolumeCopy, nil, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
@@ -410,7 +411,7 @@ func doVolumeMigration(d *Daemon, poolName string, req *api.StorageVolumesPost)
resources := map[string][]string{}
resources["storage_volumes"] = []string{fmt.Sprintf("%s/volumes/custom/%s", poolName, req.Name)}
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
// And finally run the migration.
err = sink.DoStorage(op)
if err != nil {
@@ -421,20 +422,20 @@ func doVolumeMigration(d *Daemon, poolName string, req *api.StorageVolumesPost)
return nil
}
- var op *operation
+ var op *operations.Operation
if push {
- op, err = operationCreate(d.cluster, "", operationClassWebsocket, db.OperationVolumeCreate, resources, sink.Metadata(), run, nil, sink.Connect)
+ op, err = operations.OperationCreate(d.cluster, "", operations.OperationClassWebsocket, db.OperationVolumeCreate, resources, sink.Metadata(), run, nil, sink.Connect)
if err != nil {
return response.InternalError(err)
}
} else {
- op, err = operationCreate(d.cluster, "", operationClassTask, db.OperationVolumeCopy, resources, nil, run, nil, nil)
+ op, err = operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationVolumeCopy, resources, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// /1.0/storage-pools/{name}/volumes/{type}/{name}
@@ -543,21 +544,21 @@ func storagePoolVolumeTypePost(d *Daemon, r *http.Request, volumeTypeName string
return response.InternalError(err)
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationVolumeMigrate, resources, nil, ws.DoStorage, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationVolumeMigrate, resources, nil, ws.DoStorage, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Pull mode
- op, err := operationCreate(d.cluster, "", operationClassWebsocket, db.OperationVolumeMigrate, resources, ws.Metadata(), ws.DoStorage, nil, ws.Connect)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassWebsocket, db.OperationVolumeMigrate, resources, ws.Metadata(), ws.DoStorage, nil, ws.Connect)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
// Check that the name isn't already in use.
@@ -632,16 +633,16 @@ func storagePoolVolumeTypePost(d *Daemon, r *http.Request, volumeTypeName string
return response.SyncResponseLocation(true, nil, fmt.Sprintf("/%s/storage-pools/%s/volumes/%s", version.APIVersion, poolName, storagePoolVolumeAPIEndpointCustom))
}
- run := func(op *operation) error {
+ run := func(op *operations.Operation) error {
return doWork()
}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationVolumeMove, nil, nil, run, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationVolumeMove, nil, nil, run, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func storagePoolVolumeTypeContainerPost(d *Daemon, r *http.Request) response.Response {
diff --git a/lxd/storage_volumes_snapshot.go b/lxd/storage_volumes_snapshot.go
index 5011f36d4c..db8f50fc81 100644
--- a/lxd/storage_volumes_snapshot.go
+++ b/lxd/storage_volumes_snapshot.go
@@ -9,6 +9,7 @@ import (
"github.com/gorilla/mux"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/response"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/lxd/util"
@@ -128,7 +129,7 @@ func storagePoolVolumeSnapshotsTypePost(d *Daemon, r *http.Request) response.Res
volWritable := storage.GetStoragePoolVolumeWritable()
fullSnapName := fmt.Sprintf("%s%s%s", volumeName, shared.SnapshotDelimiter, req.Name)
req.Name = fullSnapName
- snapshot := func(op *operation) error {
+ snapshot := func(op *operations.Operation) error {
dbArgs := &db.StorageVolumeArgs{
Name: fullSnapName,
PoolName: poolName,
@@ -153,12 +154,12 @@ func storagePoolVolumeSnapshotsTypePost(d *Daemon, r *http.Request) response.Res
resources := map[string][]string{}
resources["storage_volumes"] = []string{volumeName}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationVolumeSnapshotCreate, resources, nil, snapshot, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationVolumeSnapshotCreate, resources, nil, snapshot, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func storagePoolVolumeSnapshotsTypeGet(d *Daemon, r *http.Request) response.Response {
@@ -299,7 +300,7 @@ func storagePoolVolumeSnapshotTypePost(d *Daemon, r *http.Request) response.Resp
return response.NotFound(err)
}
- snapshotRename := func(op *operation) error {
+ snapshotRename := func(op *operations.Operation) error {
err = s.StoragePoolVolumeSnapshotRename(req.Name)
if err != nil {
return err
@@ -311,12 +312,12 @@ func storagePoolVolumeSnapshotTypePost(d *Daemon, r *http.Request) response.Resp
resources := map[string][]string{}
resources["storage_volume_snapshots"] = []string{volumeName}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationVolumeSnapshotDelete, resources, nil, snapshotRename, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationVolumeSnapshotDelete, resources, nil, snapshotRename, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func storagePoolVolumeSnapshotTypeGet(d *Daemon, r *http.Request) response.Response {
@@ -434,9 +435,9 @@ func storagePoolVolumeSnapshotTypePut(d *Daemon, r *http.Request) response.Respo
return response.BadRequest(err)
}
- var do func(*operation) error
+ var do func(*operations.Operation) error
var opDescription db.OperationType
- do = func(op *operation) error {
+ do = func(op *operations.Operation) error {
err = storagePoolVolumeSnapshotUpdate(d.State(), poolName, volume.Name, volumeType, req.Description)
if err != nil {
return err
@@ -449,12 +450,12 @@ func storagePoolVolumeSnapshotTypePut(d *Daemon, r *http.Request) response.Respo
resources := map[string][]string{}
resources["storage_volume_snapshots"] = []string{volumeName}
- op, err := operationCreate(d.cluster, "", operationClassTask, opDescription, resources, nil, do, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, opDescription, resources, nil, do, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
func storagePoolVolumeSnapshotTypeDelete(d *Daemon, r *http.Request) response.Response {
@@ -503,7 +504,7 @@ func storagePoolVolumeSnapshotTypeDelete(d *Daemon, r *http.Request) response.Re
return response.NotFound(err)
}
- snapshotDelete := func(op *operation) error {
+ snapshotDelete := func(op *operations.Operation) error {
err = s.StoragePoolVolumeSnapshotDelete()
if err != nil {
return err
@@ -515,10 +516,10 @@ func storagePoolVolumeSnapshotTypeDelete(d *Daemon, r *http.Request) response.Re
resources := map[string][]string{}
resources["storage_volume_snapshots"] = []string{volumeName}
- op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationVolumeSnapshotDelete, resources, nil, snapshotDelete, nil, nil)
+ op, err := operations.OperationCreate(d.cluster, "", operations.OperationClassTask, db.OperationVolumeSnapshotDelete, resources, nil, snapshotDelete, nil, nil)
if err != nil {
return response.InternalError(err)
}
- return OperationResponse(op)
+ return operations.OperationResponse(op)
}
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index aff449d39a..8476d7a05e 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -15,6 +15,7 @@ import (
"golang.org/x/sys/unix"
"github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/project"
driver "github.com/lxc/lxd/lxd/storage"
"github.com/lxc/lxd/lxd/util"
@@ -2566,7 +2567,7 @@ func (s *storageZfs) MigrationSource(args MigrationSourceArgs) (MigrationStorage
return &driver, nil
}
-func (s *storageZfs) MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageZfs) MigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
poolName := s.getOnDiskPoolName()
zfsName := fmt.Sprintf("containers/%s", project.Prefix(args.Instance.Project(), args.Instance.Name()))
zfsRecv := func(zfsName string, writeWrapper func(io.WriteCloser) io.WriteCloser) error {
@@ -3204,7 +3205,7 @@ func (s *storageZfs) StorageMigrationSource(args MigrationSourceArgs) (Migration
return rsyncStorageMigrationSource(args)
}
-func (s *storageZfs) StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+func (s *storageZfs) StorageMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
return rsyncStorageMigrationSink(conn, op, args)
}
From 966eda70bb58694dc14fb412da9cc07cf6eee085 Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Fri, 27 Sep 2019 11:20:26 +0200
Subject: [PATCH 2/2] test: Add operations package to static analysis
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
test/suites/static_analysis.sh | 1 +
1 file changed, 1 insertion(+)
diff --git a/test/suites/static_analysis.sh b/test/suites/static_analysis.sh
index e7ec6f686e..bc70da67dd 100644
--- a/test/suites/static_analysis.sh
+++ b/test/suites/static_analysis.sh
@@ -82,6 +82,7 @@ test_static_analysis() {
golint -set_exit_status lxd/maas
#golint -set_exit_status lxd/migration
golint -set_exit_status lxd/node
+ golint -set_exit_status lxd/operations
golint -set_exit_status lxd/response
golint -set_exit_status lxd/state
golint -set_exit_status lxd/storage
More information about the lxc-devel
mailing list