[lxc-devel] [lxd/master] Container scheduling honors pending operations
freeekanayaka on Github
lxc-bot at linuxcontainers.org
Thu Aug 16 11:06:23 UTC 2018
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 942 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180816/f3f04c9c/attachment.bin>
-------------- next part --------------
From 814a5fc031aabcf4d45fd40db2f2e7a7915eef33 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 16 Aug 2018 09:43:52 +0200
Subject: [PATCH 1/3] Add type column to operations table
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/cluster/schema.go | 3 ++-
lxd/db/cluster/update.go | 29 ++++++++++++++++++++---------
lxd/db/cluster/update_test.go | 25 +++++++++++++++++++++++++
3 files changed, 47 insertions(+), 10 deletions(-)
diff --git a/lxd/db/cluster/schema.go b/lxd/db/cluster/schema.go
index 5dc7a7272..cd1566898 100644
--- a/lxd/db/cluster/schema.go
+++ b/lxd/db/cluster/schema.go
@@ -167,6 +167,7 @@ CREATE TABLE operations (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
uuid TEXT NOT NULL,
node_id TEXT NOT NULL,
+ type INTEGER NOT NULL DEFAULT 0,
UNIQUE (uuid),
FOREIGN KEY (node_id) REFERENCES nodes (id) ON DELETE CASCADE
);
@@ -246,5 +247,5 @@ CREATE TABLE storage_volumes_config (
FOREIGN KEY (storage_volume_id) REFERENCES storage_volumes (id) ON DELETE CASCADE
);
-INSERT INTO schema (version, updated_at) VALUES (9, strftime("%s"))
+INSERT INTO schema (version, updated_at) VALUES (10, strftime("%s"))
`
diff --git a/lxd/db/cluster/update.go b/lxd/db/cluster/update.go
index 5d09b8280..c4fbc0a32 100644
--- a/lxd/db/cluster/update.go
+++ b/lxd/db/cluster/update.go
@@ -30,15 +30,26 @@ func SchemaDotGo() error {
var SchemaVersion = len(updates)
var updates = map[int]schema.Update{
- 1: updateFromV0,
- 2: updateFromV1,
- 3: updateFromV2,
- 4: updateFromV3,
- 5: updateFromV4,
- 6: updateFromV5,
- 7: updateFromV6,
- 8: updateFromV7,
- 9: updateFromV8,
+ 1: updateFromV0,
+ 2: updateFromV1,
+ 3: updateFromV2,
+ 4: updateFromV3,
+ 5: updateFromV4,
+ 6: updateFromV5,
+ 7: updateFromV6,
+ 8: updateFromV7,
+ 9: updateFromV8,
+ 10: updateFromV9,
+}
+
+// updateFromV9 adds a new 'type' column to the operations table, set to 0 for existing rows.
+func updateFromV9(tx *sql.Tx) error {
+ stmts := `
+ ALTER TABLE operations ADD COLUMN type INTEGER NOT NULL DEFAULT 0;
+ UPDATE operations SET type = 0;
+`
+ _, err := tx.Exec(stmts)
+ return err
}
// The lvm.thinpool_name and lvm.vg_name config keys are node-specific and need
diff --git a/lxd/db/cluster/update_test.go b/lxd/db/cluster/update_test.go
index cc8717b3e..310be112f 100644
--- a/lxd/db/cluster/update_test.go
+++ b/lxd/db/cluster/update_test.go
@@ -346,3 +346,28 @@ INSERT INTO storage_pools_config(storage_pool_id, node_id, key, value)
require.NoError(t, err)
assert.Equal(t, map[string]string{"zfs.clone_copy": "true"}, config)
}
+
+// TestUpdateFromV9 checks that updating to schema version 10 adds operations.type, set to 0 on existing rows.
+func TestUpdateFromV9(t *testing.T) {
+ schema := cluster.Schema()
+ db, err := schema.ExerciseUpdate(10, func(db *sql.DB) {
+ // Create a node; check the error here, before the next Exec overwrites it.
+ _, err := db.Exec(
+ "INSERT INTO nodes VALUES (1, 'n1', '', '1.2.3.4:666', 1, 32, ?, 0)",
+ time.Now())
+ require.NoError(t, err)
+
+ // Create an operation.
+ _, err = db.Exec("INSERT INTO operations VALUES (1, 'op1', 1)")
+ require.NoError(t, err)
+ })
+ require.NoError(t, err)
+
+ // Check that a type column has been added and that existing rows get type 0.
+ tx, err := db.Begin()
+ require.NoError(t, err)
+ defer tx.Rollback()
+ types, err := query.SelectIntegers(tx, `SELECT type FROM operations`)
+ require.NoError(t, err)
+ require.Equal(t, []int{0}, types)
+}
From 8785cfbb9d818f71bbacdc827e6a136e37444e89 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 16 Aug 2018 12:31:51 +0200
Subject: [PATCH 2/3] Fill the operations.type column when creating a new
operation
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/api_cluster.go | 4 +-
lxd/cluster/membership.go | 10 +--
lxd/container_backup.go | 6 +-
lxd/container_console.go | 3 +-
lxd/container_delete.go | 3 +-
lxd/container_exec.go | 5 +-
lxd/container_post.go | 10 +--
lxd/container_put.go | 8 +--
lxd/container_snapshot.go | 10 +--
lxd/container_state.go | 14 ++--
lxd/containers_post.go | 12 ++--
lxd/daemon_images_test.go | 3 +-
lxd/db/operations.go | 142 ++++++++++++++++++++++++++++++++++++--
lxd/db/operations_test.go | 8 ++-
lxd/images.go | 8 +--
lxd/migrate_container.go | 3 +-
lxd/operations.go | 6 +-
lxd/storage_volumes.go | 12 ++--
18 files changed, 203 insertions(+), 64 deletions(-)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 2750ade9d..1c5da1b6a 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -189,7 +189,7 @@ func clusterPutBootstrap(d *Daemon, req api.ClusterPut) Response {
resources := map[string][]string{}
resources["cluster"] = []string{}
- op, err := operationCreate(d.cluster, operationClassTask, "Creating bootstrap node", resources, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationClusterBootstrap, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -426,7 +426,7 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) Response {
resources := map[string][]string{}
resources["cluster"] = []string{}
- op, err := operationCreate(d.cluster, operationClassTask, "Joining cluster", resources, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationClusterJoin, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 564a374c0..f9e56897c 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -253,7 +253,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
// /cluster/nodes request which triggered this code.
var pools map[string]map[string]string
var networks map[string]map[string]string
- var operations []string
+ var operations []db.Operation
err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
pools, err = tx.StoragePoolsNodeConfig()
if err != nil {
@@ -263,7 +263,7 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
if err != nil {
return err
}
- operations, err = tx.OperationsUUIDs()
+ operations, err = tx.Operations()
if err != nil {
return err
}
@@ -396,10 +396,10 @@ func Join(state *state.State, gateway *Gateway, cert *shared.CertInfo, name stri
}
// Migrate outstanding operations.
- for _, uuid := range operations {
- _, err := tx.OperationAdd(uuid)
+ for _, operation := range operations {
+ _, err := tx.OperationAdd(operation.UUID, operation.Type)
if err != nil {
- return errors.Wrapf(err, "failed to migrate operation %s", uuid)
+ return errors.Wrapf(err, "failed to migrate operation %s", operation.UUID)
}
}
diff --git a/lxd/container_backup.go b/lxd/container_backup.go
index cd24c89a7..5eb612408 100644
--- a/lxd/container_backup.go
+++ b/lxd/container_backup.go
@@ -153,7 +153,7 @@ func containerBackupsPost(d *Daemon, r *http.Request) Response {
resources["backups"] = []string{req.Name}
op, err := operationCreate(d.cluster, operationClassTask,
- "Backing up container", resources, nil, backup, nil, nil)
+ db.OperationBackupCreate, resources, nil, backup, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -228,7 +228,7 @@ func containerBackupPost(d *Daemon, r *http.Request) Response {
resources["containers"] = []string{name}
op, err := operationCreate(d.cluster, operationClassTask,
- "Renaming container backup", resources, nil, rename, nil, nil)
+ db.OperationBackupRename, resources, nil, rename, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -268,7 +268,7 @@ func containerBackupDelete(d *Daemon, r *http.Request) Response {
resources["container"] = []string{name}
op, err := operationCreate(d.cluster, operationClassTask,
- "Removing container backup", resources, nil, remove, nil, nil)
+ db.OperationBackupRemove, resources, nil, remove, nil, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/container_console.go b/lxd/container_console.go
index 1cf708893..65d83c4be 100644
--- a/lxd/container_console.go
+++ b/lxd/container_console.go
@@ -16,6 +16,7 @@ import (
"gopkg.in/lxc/go-lxc.v2"
"github.com/lxc/lxd/lxd/cluster"
+ "github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
@@ -330,7 +331,7 @@ func containerConsolePost(d *Daemon, r *http.Request) Response {
resources := map[string][]string{}
resources["containers"] = []string{ws.container.Name()}
- op, err := operationCreate(d.cluster, operationClassWebsocket, "Showing console",
+ op, err := operationCreate(d.cluster, operationClassWebsocket, db.OperationConsoleShow,
resources, ws.Metadata(), ws.Do, nil, ws.Connect)
if err != nil {
return InternalError(err)
diff --git a/lxd/container_delete.go b/lxd/container_delete.go
index b39ada97f..f0be30f82 100644
--- a/lxd/container_delete.go
+++ b/lxd/container_delete.go
@@ -5,6 +5,7 @@ import (
"net/http"
"github.com/gorilla/mux"
+ "github.com/lxc/lxd/lxd/db"
)
func containerDelete(d *Daemon, r *http.Request) Response {
@@ -35,7 +36,7 @@ func containerDelete(d *Daemon, r *http.Request) Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, operationClassTask, "Deleting container", resources, nil, rmct, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationContainerDelete, resources, nil, rmct, nil, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index 78fee39b5..adc0c489c 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -17,6 +17,7 @@ import (
"github.com/gorilla/websocket"
"github.com/lxc/lxd/lxd/cluster"
+ "github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
"github.com/lxc/lxd/shared/logger"
@@ -455,7 +456,7 @@ func containerExecPost(d *Daemon, r *http.Request) Response {
resources := map[string][]string{}
resources["containers"] = []string{ws.container.Name()}
- op, err := operationCreate(d.cluster, operationClassWebsocket, "Executing command", resources, ws.Metadata(), ws.Do, nil, ws.Connect)
+ op, err := operationCreate(d.cluster, operationClassWebsocket, db.OperationCommandExec, resources, ws.Metadata(), ws.Do, nil, ws.Connect)
if err != nil {
return InternalError(err)
}
@@ -507,7 +508,7 @@ func containerExecPost(d *Daemon, r *http.Request) Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, operationClassTask, "Executing command", resources, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationCommandExec, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/container_post.go b/lxd/container_post.go
index 3a8082585..d0f9717e5 100644
--- a/lxd/container_post.go
+++ b/lxd/container_post.go
@@ -207,7 +207,7 @@ func containerPost(d *Daemon, r *http.Request) Response {
return InternalError(err)
}
- op, err := operationCreate(d.cluster, operationClassTask, "Migrating container", resources, nil, ws.Do, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationContainerMigrate, resources, nil, ws.Do, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -216,7 +216,7 @@ func containerPost(d *Daemon, r *http.Request) Response {
}
// Pull mode
- op, err := operationCreate(d.cluster, operationClassWebsocket, "Migrating container", resources, ws.Metadata(), ws.Do, nil, ws.Connect)
+ op, err := operationCreate(d.cluster, operationClassWebsocket, db.OperationContainerMigrate, resources, ws.Metadata(), ws.Do, nil, ws.Connect)
if err != nil {
return InternalError(err)
}
@@ -237,7 +237,7 @@ func containerPost(d *Daemon, r *http.Request) Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, operationClassTask, "Renaming container", resources, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationContainerRename, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -380,7 +380,7 @@ func containerPostClusteringMigrate(d *Daemon, c container, oldName, newName, ne
resources := map[string][]string{}
resources["containers"] = []string{oldName}
- op, err := operationCreate(d.cluster, operationClassTask, "Moving container", resources, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationContainerMigrate, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -487,7 +487,7 @@ func containerPostClusteringMigrateWithCeph(d *Daemon, c container, oldName, new
resources := map[string][]string{}
resources["containers"] = []string{oldName}
- op, err := operationCreate(d.cluster, operationClassTask, "Moving container", resources, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationContainerMigrate, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/container_put.go b/lxd/container_put.go
index e92469e96..f13d5cb7b 100644
--- a/lxd/container_put.go
+++ b/lxd/container_put.go
@@ -56,7 +56,7 @@ func containerPut(d *Daemon, r *http.Request) Response {
}
var do func(*operation) error
- var opDescription string
+ var opType db.OperationType
if configRaw.Restore == "" {
// Update container configuration
do = func(op *operation) error {
@@ -78,20 +78,20 @@ func containerPut(d *Daemon, r *http.Request) Response {
return nil
}
- opDescription = "Updating container"
+ opType = db.OperationContainerUpdate
} else {
// Snapshot Restore
do = func(op *operation) error {
return containerSnapRestore(d.State(), name, configRaw.Restore, configRaw.Stateful)
}
- opDescription = "Restoring snapshot"
+ opType = db.OperationSnapshotRestore
}
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, operationClassTask, opDescription, resources, nil, do, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, opType, resources, nil, do, nil, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/container_snapshot.go b/lxd/container_snapshot.go
index fcb6eb696..171b9cd34 100644
--- a/lxd/container_snapshot.go
+++ b/lxd/container_snapshot.go
@@ -141,7 +141,7 @@ func containerSnapshotsPost(d *Daemon, r *http.Request) Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, operationClassTask, "Snapshotting container", resources, nil, snapshot, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationSnapshotCreate, resources, nil, snapshot, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -255,7 +255,7 @@ func snapshotPost(d *Daemon, r *http.Request, sc container, containerName string
return InternalError(err)
}
- op, err := operationCreate(d.cluster, operationClassTask, "Transferring snapshot", resources, nil, ws.Do, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationSnapshotTransfer, resources, nil, ws.Do, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -264,7 +264,7 @@ func snapshotPost(d *Daemon, r *http.Request, sc container, containerName string
}
// Pull mode
- op, err := operationCreate(d.cluster, operationClassWebsocket, "Transferring snapshot", resources, ws.Metadata(), ws.Do, nil, ws.Connect)
+ op, err := operationCreate(d.cluster, operationClassWebsocket, db.OperationSnapshotTransfer, resources, ws.Metadata(), ws.Do, nil, ws.Connect)
if err != nil {
return InternalError(err)
}
@@ -297,7 +297,7 @@ func snapshotPost(d *Daemon, r *http.Request, sc container, containerName string
resources := map[string][]string{}
resources["containers"] = []string{containerName}
- op, err := operationCreate(d.cluster, operationClassTask, "Renaming snapshot", resources, nil, rename, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationSnapshotRename, resources, nil, rename, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -313,7 +313,7 @@ func snapshotDelete(sc container, name string) Response {
resources := map[string][]string{}
resources["containers"] = []string{sc.Name()}
- op, err := operationCreate(sc.DaemonState().Cluster, operationClassTask, "Deleting snapshot", resources, nil, remove, nil, nil)
+ op, err := operationCreate(sc.DaemonState().Cluster, operationClassTask, db.OperationSnapshotDelete, resources, nil, remove, nil, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/container_state.go b/lxd/container_state.go
index e75aa68f7..50dfd5a19 100644
--- a/lxd/container_state.go
+++ b/lxd/container_state.go
@@ -67,11 +67,11 @@ func containerStatePut(d *Daemon, r *http.Request) Response {
return SmartError(err)
}
- var opDescription string
+ var opType db.OperationType
var do func(*operation) error
switch shared.ContainerAction(raw.Action) {
case shared.Start:
- opDescription = "Starting container"
+ opType = db.OperationContainerStart
do = func(op *operation) error {
c.SetOperation(op)
if err = c.Start(raw.Stateful); err != nil {
@@ -80,7 +80,7 @@ func containerStatePut(d *Daemon, r *http.Request) Response {
return nil
}
case shared.Stop:
- opDescription = "Stopping container"
+ opType = db.OperationContainerStop
if raw.Stateful {
do = func(op *operation) error {
c.SetOperation(op)
@@ -120,7 +120,7 @@ func containerStatePut(d *Daemon, r *http.Request) Response {
}
}
case shared.Restart:
- opDescription = "Restarting container"
+ opType = db.OperationContainerRestart
do = func(op *operation) error {
c.SetOperation(op)
ephemeral := c.IsEphemeral()
@@ -172,13 +172,13 @@ func containerStatePut(d *Daemon, r *http.Request) Response {
return nil
}
case shared.Freeze:
- opDescription = "Freezing container"
+ opType = db.OperationContainerFreeze
do = func(op *operation) error {
c.SetOperation(op)
return c.Freeze()
}
case shared.Unfreeze:
- opDescription = "Unfreezing container"
+ opType = db.OperationContainerUnfreeze
do = func(op *operation) error {
c.SetOperation(op)
return c.Unfreeze()
@@ -190,7 +190,7 @@ func containerStatePut(d *Daemon, r *http.Request) Response {
resources := map[string][]string{}
resources["containers"] = []string{name}
- op, err := operationCreate(d.cluster, operationClassTask, opDescription, resources, nil, do, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, opType, resources, nil, do, nil, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index 202a4eb4e..22f57d3a2 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -133,7 +133,7 @@ func createFromImage(d *Daemon, req *api.ContainersPost) Response {
resources := map[string][]string{}
resources["containers"] = []string{req.Name}
- op, err := operationCreate(d.cluster, operationClassTask, "Creating container", resources, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -168,7 +168,7 @@ func createFromNone(d *Daemon, req *api.ContainersPost) Response {
resources := map[string][]string{}
resources["containers"] = []string{req.Name}
- op, err := operationCreate(d.cluster, operationClassTask, "Creating container", resources, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -428,12 +428,12 @@ func createFromMigration(d *Daemon, req *api.ContainersPost) Response {
var op *operation
if push {
- op, err = operationCreate(d.cluster, operationClassWebsocket, "Creating container", resources, sink.Metadata(), run, nil, sink.Connect)
+ op, err = operationCreate(d.cluster, operationClassWebsocket, db.OperationContainerCreate, resources, sink.Metadata(), run, nil, sink.Connect)
if err != nil {
return InternalError(err)
}
} else {
- op, err = operationCreate(d.cluster, operationClassTask, "Creating container", resources, nil, run, nil, nil)
+ op, err = operationCreate(d.cluster, operationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -529,7 +529,7 @@ func createFromCopy(d *Daemon, req *api.ContainersPost) Response {
resources := map[string][]string{}
resources["containers"] = []string{req.Name, req.Source.Source}
- op, err := operationCreate(d.cluster, operationClassTask, "Creating container", resources, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationContainerCreate, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -597,7 +597,7 @@ func createFromBackup(d *Daemon, data io.Reader) Response {
resources := map[string][]string{}
resources["containers"] = []string{bInfo.Name}
- op, err := operationCreate(d.cluster, operationClassTask, "Restoring backup",
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationBackupRestore,
resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
diff --git a/lxd/daemon_images_test.go b/lxd/daemon_images_test.go
index ba84669d8..3c3283afd 100644
--- a/lxd/daemon_images_test.go
+++ b/lxd/daemon_images_test.go
@@ -5,6 +5,7 @@ import (
"time"
"github.com/lxc/lxd/client"
+ "github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/shared/api"
"github.com/stretchr/testify/suite"
)
@@ -39,7 +40,7 @@ func (suite *daemonImagesTestSuite) TestUseCachedImagesIfAvailable() {
// Request an image with alias "test" and check that it's the
// one we created above.
- op, err := operationCreate(suite.d.cluster, operationClassTask, "Downloading image", map[string][]string{}, nil, nil, nil, nil)
+ op, err := operationCreate(suite.d.cluster, operationClassTask, db.OperationImageDownload, map[string][]string{}, nil, nil, nil, nil)
suite.Req.Nil(err)
image, err := suite.d.ImageDownload(op, "img.srv", "simplestreams", "", "", "test", false, false, "", true)
suite.Req.Nil(err)
diff --git a/lxd/db/operations.go b/lxd/db/operations.go
index 01ce13cce..3d9225e8c 100644
--- a/lxd/db/operations.go
+++ b/lxd/db/operations.go
@@ -7,12 +7,139 @@ import (
"github.com/pkg/errors"
)
+// OperationType is a numeric code identifying the type of an Operation.
+type OperationType int64
+
+// Possible values for OperationType.
+//
+// WARNING: The type codes are stored in the database, so this list of
+// definitions should normally be append-only. Any other change
+// requires a database update.
+const (
+ OperationUnknown OperationType = iota
+ OperationClusterBootstrap
+ OperationClusterJoin
+ OperationBackupCreate
+ OperationBackupRename
+ OperationBackupRestore
+ OperationBackupRemove
+ OperationConsoleShow
+ OperationContainerCreate
+ OperationContainerUpdate
+ OperationContainerRename
+ OperationContainerMigrate
+ OperationContainerLiveMigrate
+ OperationContainerFreeze
+ OperationContainerUnfreeze
+ OperationContainerDelete
+ OperationContainerStart
+ OperationContainerStop
+ OperationContainerRestart
+ OperationCommandExec
+ OperationSnapshotCreate
+ OperationSnapshotRename
+ OperationSnapshotRestore
+ OperationSnapshotTransfer
+ OperationSnapshotUpdate
+ OperationSnapshotDelete
+ OperationImageDownload
+ OperationImageDelete
+ OperationImageToken
+ OperationImageRefresh
+ OperationVolumeCopy
+ OperationVolumeCreate
+ OperationVolumeMigrate
+ OperationVolumeMove
+)
+
+// Description returns a human-readable description of the operation type, or "Executing operation" for types without a dedicated case.
+func (t OperationType) Description() string {
+ switch t {
+ case OperationClusterBootstrap:
+ return "Creating bootstrap node"
+ case OperationClusterJoin:
+ return "Joining cluster"
+ case OperationBackupCreate:
+ return "Backing up container"
+ case OperationBackupRename:
+ return "Renaming container backup"
+ case OperationBackupRestore:
+ return "Restoring backup"
+ case OperationBackupRemove:
+ return "Removing container backup"
+ case OperationConsoleShow:
+ return "Showing console"
+ case OperationContainerCreate:
+ return "Creating container"
+ case OperationContainerUpdate:
+ return "Updating container"
+ case OperationContainerRename:
+ return "Renaming container"
+ case OperationContainerMigrate:
+ return "Migrating container"
+ case OperationContainerLiveMigrate:
+ return "Live-migrating container"
+ case OperationContainerFreeze:
+ return "Freezing container"
+ case OperationContainerUnfreeze:
+ return "Unfreezing container"
+ case OperationContainerDelete:
+ return "Deleting container"
+ case OperationContainerStart:
+ return "Starting container"
+ case OperationContainerStop:
+ return "Stopping container"
+ case OperationContainerRestart:
+ return "Restarting container"
+ case OperationCommandExec:
+ return "Executing command"
+ case OperationSnapshotCreate:
+ return "Snapshotting container"
+ case OperationSnapshotRename:
+ return "Renaming snapshot"
+ case OperationSnapshotRestore:
+ return "Restoring snapshot"
+ case OperationSnapshotTransfer:
+ return "Transferring snapshot"
+ case OperationSnapshotUpdate:
+ return "Updating snapshot"
+ case OperationSnapshotDelete:
+ return "Deleting snapshot"
+ case OperationImageDownload:
+ return "Downloading image"
+ case OperationImageDelete:
+ return "Deleting image"
+ case OperationImageToken:
+ return "Image download token"
+ case OperationImageRefresh:
+ return "Refreshing image"
+ case OperationVolumeCopy:
+ return "Copying storage volume"
+ case OperationVolumeCreate:
+ return "Creating storage volume"
+ case OperationVolumeMigrate:
+ return "Migrating storage volume"
+ case OperationVolumeMove:
+ return "Moving storage volume"
+ default:
+ return "Executing operation"
+
+ }
+
+}
+
// Operation holds information about a single LXD operation running on a node
// in the cluster.
type Operation struct {
- ID int64 // Stable database identifier
- UUID string // User-visible identifier
- NodeAddress string // Address of the node the operation is running on
+ ID int64 // Stable database identifier
+ UUID string // User-visible identifier
+ NodeAddress string // Address of the node the operation is running on
+ Type OperationType // Type code of the operation, as stored in the operations.type column
+}
+
+// Operations returns all operations whose node_id matches the node ID of this transaction.
+func (c *ClusterTx) Operations() ([]Operation, error) {
+ return c.operations("node_id=?", c.nodeID)
}
// OperationsUUIDs returns the UUIDs of all operations associated with this
@@ -46,9 +173,9 @@ func (c *ClusterTx) OperationByUUID(uuid string) (Operation, error) {
}
// OperationAdd adds a new operations to the table.
-func (c *ClusterTx) OperationAdd(uuid string) (int64, error) {
- columns := []string{"uuid", "node_id"}
- values := []interface{}{uuid, c.nodeID}
+func (c *ClusterTx) OperationAdd(uuid string, typ OperationType) (int64, error) {
+ columns := []string{"uuid", "node_id", "type"}
+ values := []interface{}{uuid, c.nodeID, typ}
return query.UpsertObject(c.tx, "operations", columns, values)
}
@@ -77,10 +204,11 @@ func (c *ClusterTx) operations(where string, args ...interface{}) ([]Operation,
&operations[i].ID,
&operations[i].UUID,
&operations[i].NodeAddress,
+ &operations[i].Type,
}
}
stmt := `
-SELECT operations.id, uuid, nodes.address FROM operations JOIN nodes ON nodes.id = node_id `
+SELECT operations.id, uuid, nodes.address, type FROM operations JOIN nodes ON nodes.id = node_id `
if where != "" {
stmt += fmt.Sprintf("WHERE %s ", where)
}
diff --git a/lxd/db/operations_test.go b/lxd/db/operations_test.go
index 60ca96a75..267e121fd 100644
--- a/lxd/db/operations_test.go
+++ b/lxd/db/operations_test.go
@@ -13,13 +13,19 @@ func TestOperation(t *testing.T) {
tx, cleanup := db.NewTestClusterTx(t)
defer cleanup()
- id, err := tx.OperationAdd("abcd")
+ id, err := tx.OperationAdd("abcd", db.OperationContainerCreate)
require.NoError(t, err)
assert.Equal(t, int64(1), id)
+ operations, err := tx.Operations()
+ require.NoError(t, err)
+ assert.Len(t, operations, 1)
+ assert.Equal(t, operations[0].UUID, "abcd")
+
operation, err := tx.OperationByUUID("abcd")
require.NoError(t, err)
assert.Equal(t, id, operation.ID)
+ assert.Equal(t, db.OperationContainerCreate, operation.Type)
uuids, err := tx.OperationsUUIDs()
require.NoError(t, err)
diff --git a/lxd/images.go b/lxd/images.go
index f5d0d95b4..749cde9f5 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -726,7 +726,7 @@ func imagesPost(d *Daemon, r *http.Request) Response {
return nil
}
- op, err := operationCreate(d.cluster, operationClassTask, "Downloading image", nil, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationImageDownload, nil, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -1263,7 +1263,7 @@ func imageDelete(d *Daemon, r *http.Request) Response {
resources := map[string][]string{}
resources["images"] = []string{fingerprint}
- op, err := operationCreate(d.cluster, operationClassTask, "Deleting image", resources, nil, rmimg, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationImageDelete, resources, nil, rmimg, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -1745,7 +1745,7 @@ func imageSecret(d *Daemon, r *http.Request) Response {
resources := map[string][]string{}
resources["images"] = []string{imgInfo.Fingerprint}
- op, err := operationCreate(d.cluster, operationClassToken, "Image download token", resources, meta, nil, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassToken, db.OperationImageToken, resources, meta, nil, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -1821,7 +1821,7 @@ func imageRefresh(d *Daemon, r *http.Request) Response {
return autoUpdateImage(d, op, imageId, imageInfo)
}
- op, err := operationCreate(d.cluster, operationClassTask, "Refreshing image", nil, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationImageRefresh, nil, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
index 2ec04d072..9cd8c6dcb 100644
--- a/lxd/migrate_container.go
+++ b/lxd/migrate_container.go
@@ -15,6 +15,7 @@ import (
"github.com/gorilla/websocket"
"gopkg.in/lxc/go-lxc.v2"
+ "github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/lxd/migration"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
@@ -488,7 +489,7 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
actionScriptOp, err := operationCreate(
state.Cluster,
operationClassWebsocket,
- "Live-migrating container",
+ db.OperationContainerLiveMigrate,
nil,
nil,
func(op *operation) error {
diff --git a/lxd/operations.go b/lxd/operations.go
index 9ed586451..793df4fdd 100644
--- a/lxd/operations.go
+++ b/lxd/operations.go
@@ -397,11 +397,11 @@ func (op *operation) UpdateMetadata(opMetadata interface{}) error {
return nil
}
-func operationCreate(cluster *db.Cluster, opClass operationClass, description string, opResources map[string][]string, opMetadata interface{}, onRun func(*operation) error, onCancel func(*operation) error, onConnect func(*operation, *http.Request, http.ResponseWriter) error) (*operation, error) {
+func operationCreate(cluster *db.Cluster, opClass operationClass, opType db.OperationType, opResources map[string][]string, opMetadata interface{}, onRun func(*operation) error, onCancel func(*operation) error, onConnect func(*operation, *http.Request, http.ResponseWriter) error) (*operation, error) {
// Main attributes
op := operation{}
op.id = uuid.NewRandom().String()
- op.description = description
+ op.description = opType.Description()
op.class = opClass
op.createdAt = time.Now()
op.updatedAt = op.createdAt
@@ -444,7 +444,7 @@ func operationCreate(cluster *db.Cluster, opClass operationClass, description st
operationsLock.Unlock()
err = op.cluster.Transaction(func(tx *db.ClusterTx) error {
- _, err := tx.OperationAdd(op.id)
+ _, err := tx.OperationAdd(op.id, opType)
return err
})
if err != nil {
diff --git a/lxd/storage_volumes.go b/lxd/storage_volumes.go
index 5f6e28420..9ee0392fd 100644
--- a/lxd/storage_volumes.go
+++ b/lxd/storage_volumes.go
@@ -227,7 +227,7 @@ func doVolumeCreateOrCopy(d *Daemon, poolName string, req *api.StorageVolumesPos
return doWork()
}
- op, err := operationCreate(d.cluster, operationClassTask, "Copying storage volume", nil, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationVolumeCopy, nil, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -355,12 +355,12 @@ func doVolumeMigration(d *Daemon, poolName string, req *api.StorageVolumesPost)
var op *operation
if push {
- op, err = operationCreate(d.cluster, operationClassWebsocket, "Creating storage volume", resources, sink.Metadata(), run, nil, sink.Connect)
+ op, err = operationCreate(d.cluster, operationClassWebsocket, db.OperationVolumeCreate, resources, sink.Metadata(), run, nil, sink.Connect)
if err != nil {
return InternalError(err)
}
} else {
- op, err = operationCreate(d.cluster, operationClassTask, "Copying storage volume", resources, nil, run, nil, nil)
+ op, err = operationCreate(d.cluster, operationClassTask, db.OperationVolumeCopy, resources, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -465,7 +465,7 @@ func storagePoolVolumeTypePost(d *Daemon, r *http.Request) Response {
return InternalError(err)
}
- op, err := operationCreate(d.cluster, operationClassTask, "Migrating storage volume", resources, nil, ws.DoStorage, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationVolumeMigrate, resources, nil, ws.DoStorage, nil, nil)
if err != nil {
return InternalError(err)
}
@@ -474,7 +474,7 @@ func storagePoolVolumeTypePost(d *Daemon, r *http.Request) Response {
}
// Pull mode
- op, err := operationCreate(d.cluster, operationClassWebsocket, "Migrating storage volume", resources, ws.Metadata(), ws.DoStorage, nil, ws.Connect)
+ op, err := operationCreate(d.cluster, operationClassWebsocket, db.OperationVolumeMigrate, resources, ws.Metadata(), ws.DoStorage, nil, ws.Connect)
if err != nil {
return InternalError(err)
}
@@ -544,7 +544,7 @@ func storagePoolVolumeTypePost(d *Daemon, r *http.Request) Response {
return doWork()
}
- op, err := operationCreate(d.cluster, operationClassTask, "Moving storage volume", nil, nil, run, nil, nil)
+ op, err := operationCreate(d.cluster, operationClassTask, db.OperationVolumeMove, nil, nil, run, nil, nil)
if err != nil {
return InternalError(err)
}
From b2d8888084a7d078dc0e0def1df442ed00af3fdd Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 16 Aug 2018 13:01:05 +0200
Subject: [PATCH 3/3] Consider pending containers when placing a new container
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/node.go | 20 ++++++++++++++++----
lxd/db/node_test.go | 20 ++++++++++++++++++++
2 files changed, 36 insertions(+), 4 deletions(-)
diff --git a/lxd/db/node.go b/lxd/db/node.go
index 356f5beb9..645d43bdc 100644
--- a/lxd/db/node.go
+++ b/lxd/db/node.go
@@ -391,8 +391,9 @@ func (c *ClusterTx) NodeOfflineThreshold() (time.Duration, error) {
return threshold, nil
}
-// NodeWithLeastContainers returns the name of the non-offline node with
-// with the least number of containers.
+// NodeWithLeastContainers returns the name of the non-offline node with the
+// least number of containers (either already created or being created with
+// an operation).
func (c *ClusterTx) NodeWithLeastContainers() (string, error) {
threshold, err := c.NodeOfflineThreshold()
if err != nil {
@@ -409,10 +410,21 @@ func (c *ClusterTx) NodeWithLeastContainers() (string, error) {
if node.IsOffline(threshold) {
continue
}
- count, err := query.Count(c.tx, "containers", "node_id=?", node.ID)
+
+ // Fetch the number of containers already created on this node.
+ created, err := query.Count(c.tx, "containers", "node_id=?", node.ID)
if err != nil {
- return "", errors.Wrap(err, "failed to get containers count")
+ return "", errors.Wrap(err, "Failed to get containers count")
}
+
+ // Fetch the number of containers currently being created on this node.
+ pending, err := query.Count(
+ c.tx, "operations", "node_id=? AND type=?", node.ID, OperationContainerCreate)
+ if err != nil {
+ return "", errors.Wrap(err, "Failed to get pending containers count")
+ }
+
+ count := created + pending
if containers == -1 || count < containers {
containers = count
name = node.Name
diff --git a/lxd/db/node_test.go b/lxd/db/node_test.go
index d9afde80c..8d8c17dd2 100644
--- a/lxd/db/node_test.go
+++ b/lxd/db/node_test.go
@@ -260,3 +260,23 @@ INSERT INTO containers (id, node_id, name, architecture, type) VALUES (1, ?, 'fo
require.NoError(t, err)
assert.Equal(t, "buzz", name)
}
+
+// If there are 2 online nodes, and a container is pending on one of them,
+// return the name of the other one.
+func TestNodeWithLeastContainers_Pending(t *testing.T) {
+ tx, cleanup := db.NewTestClusterTx(t)
+ defer cleanup()
+
+ _, err := tx.NodeAdd("buzz", "1.2.3.4:666")
+ require.NoError(t, err)
+
+ // Add a pending container to the default node (ID 1)
+ _, err = tx.Tx().Exec(`
+INSERT INTO operations (id, uuid, node_id, type) VALUES (1, 'abc', 1, ?)
+`, db.OperationContainerCreate)
+ require.NoError(t, err)
+
+ name, err := tx.NodeWithLeastContainers()
+ require.NoError(t, err)
+ assert.Equal(t, "buzz", name)
+}
More information about the lxc-devel
mailing list