[lxc-devel] [lxd/master] Clustering: move ceph-based containers

freeekanayaka on Github lxc-bot at linuxcontainers.org
Thu Mar 15 13:26:11 UTC 2018


From 510992c6c67c5c4475f4d08dd84329578569cf4d Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 14 Mar 2018 11:28:26 +0000
Subject: [PATCH 1/5] Move isRootDiskDevice and containerGetRootDiskDevice to
 lxd/shared

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/container.go              | 34 ++--------------------------------
 lxd/container_lxc.go          | 12 ++++++------
 lxd/containers_post.go        |  6 +++---
 lxd/patches.go                |  6 +++---
 lxd/profiles_utils.go         |  7 ++++---
 lxd/storage_btrfs.go          |  4 ++--
 lxd/storage_ceph_migration.go |  4 ++--
 lxd/storage_migration.go      |  6 +++---
 lxd/storage_zfs.go            |  4 ++--
 shared/container.go           | 34 ++++++++++++++++++++++++++++++++++
 10 files changed, 61 insertions(+), 56 deletions(-)
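
For reference, a minimal sketch of how the relocated helpers can be consumed
from outside the lxd package; the device map below is illustrative rather than
taken from a real container:

    package main

    import (
        "fmt"

        "github.com/lxc/lxd/shared"
    )

    func main() {
        // Hypothetical expanded devices of a container: a root disk plus a NIC.
        devices := map[string]map[string]string{
            "root": {"type": "disk", "path": "/", "pool": "default"},
            "eth0": {"type": "nic", "nictype": "bridged", "parent": "lxdbr0"},
        }

        // GetRootDiskDevice returns the single device with type=disk, path=/
        // and an empty source, or an error if there are zero or several.
        name, dev, err := shared.GetRootDiskDevice(devices)
        if err != nil {
            fmt.Println("no usable root disk device:", err)
            return
        }
        fmt.Printf("root disk %q is on pool %q\n", name, dev["pool"])
    }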

diff --git a/lxd/container.go b/lxd/container.go
index c03a9dc03..d1890e1a2 100644
--- a/lxd/container.go
+++ b/lxd/container.go
@@ -281,36 +281,6 @@ func containerValidConfig(os *sys.OS, config map[string]string, profile bool, ex
 	return nil
 }
 
-func isRootDiskDevice(device types.Device) bool {
-	if device["type"] == "disk" && device["path"] == "/" && device["source"] == "" {
-		return true
-	}
-
-	return false
-}
-
-func containerGetRootDiskDevice(devices types.Devices) (string, types.Device, error) {
-	var devName string
-	var dev types.Device
-
-	for n, d := range devices {
-		if isRootDiskDevice(d) {
-			if devName != "" {
-				return "", types.Device{}, fmt.Errorf("More than one root device found.")
-			}
-
-			devName = n
-			dev = d
-		}
-	}
-
-	if devName != "" {
-		return devName, dev, nil
-	}
-
-	return "", types.Device{}, fmt.Errorf("No root device could be found.")
-}
-
 func containerValidDevices(db *db.Cluster, devices types.Devices, profile bool, expanded bool) error {
 	// Empty device list
 	if devices == nil {
@@ -447,7 +417,7 @@ func containerValidDevices(db *db.Cluster, devices types.Devices, profile bool,
 
 	// Checks on the expanded config
 	if expanded {
-		_, _, err := containerGetRootDiskDevice(devices)
+		_, _, err := shared.GetRootDiskDevice(devices)
 		if err != nil {
 			return err
 		}
@@ -931,7 +901,7 @@ func containerCreateInternal(s *state.State, args db.ContainerArgs) (container,
 
 func containerConfigureInternal(c container) error {
 	// Find the root device
-	_, rootDiskDevice, err := containerGetRootDiskDevice(c.ExpandedDevices())
+	_, rootDiskDevice, err := shared.GetRootDiskDevice(c.ExpandedDevices())
 	if err != nil {
 		return err
 	}
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index 66cb2aa1a..b73b0c9e5 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -315,7 +315,7 @@ func containerLXCCreate(s *state.State, args db.ContainerArgs) (container, error
 	}
 
 	// Retrieve the container's storage pool
-	_, rootDiskDevice, err := containerGetRootDiskDevice(c.expandedDevices)
+	_, rootDiskDevice, err := shared.GetRootDiskDevice(c.expandedDevices)
 	if err != nil {
 		c.Delete()
 		return nil, err
@@ -1500,7 +1500,7 @@ func (c *containerLXC) initLXC(config bool) error {
 			// bump network index
 			networkidx++
 		} else if m["type"] == "disk" {
-			isRootfs := isRootDiskDevice(m)
+			isRootfs := shared.IsRootDiskDevice(m)
 
 			// source paths
 			srcPath := shared.HostPath(m["source"])
@@ -3719,19 +3719,19 @@ func (c *containerLXC) Update(args db.ContainerArgs, userRequested bool) error {
 	}
 
 	// Retrieve old root disk devices.
-	oldLocalRootDiskDeviceKey, oldLocalRootDiskDevice, _ := containerGetRootDiskDevice(oldLocalDevices)
+	oldLocalRootDiskDeviceKey, oldLocalRootDiskDevice, _ := shared.GetRootDiskDevice(oldLocalDevices)
 	var oldProfileRootDiskDevices []string
 	for k, v := range oldExpandedDevices {
-		if isRootDiskDevice(v) && k != oldLocalRootDiskDeviceKey && !shared.StringInSlice(k, oldProfileRootDiskDevices) {
+		if shared.IsRootDiskDevice(v) && k != oldLocalRootDiskDeviceKey && !shared.StringInSlice(k, oldProfileRootDiskDevices) {
 			oldProfileRootDiskDevices = append(oldProfileRootDiskDevices, k)
 		}
 	}
 
 	// Retrieve new root disk devices.
-	newLocalRootDiskDeviceKey, newLocalRootDiskDevice, _ := containerGetRootDiskDevice(c.localDevices)
+	newLocalRootDiskDeviceKey, newLocalRootDiskDevice, _ := shared.GetRootDiskDevice(c.localDevices)
 	var newProfileRootDiskDevices []string
 	for k, v := range c.expandedDevices {
-		if isRootDiskDevice(v) && k != newLocalRootDiskDeviceKey && !shared.StringInSlice(k, newProfileRootDiskDevices) {
+		if shared.IsRootDiskDevice(v) && k != newLocalRootDiskDeviceKey && !shared.StringInSlice(k, newProfileRootDiskDevices) {
 			newProfileRootDiskDevices = append(newProfileRootDiskDevices, k)
 		}
 	}
diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index c31aba8de..ec5e664a4 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -200,7 +200,7 @@ func createFromMigration(d *Daemon, req *api.ContainersPost) Response {
 	storagePool := ""
 	storagePoolProfile := ""
 
-	localRootDiskDeviceKey, localRootDiskDevice, _ := containerGetRootDiskDevice(req.Devices)
+	localRootDiskDeviceKey, localRootDiskDevice, _ := shared.GetRootDiskDevice(req.Devices)
 	if localRootDiskDeviceKey != "" {
 		storagePool = localRootDiskDevice["pool"]
 	}
@@ -224,7 +224,7 @@ func createFromMigration(d *Daemon, req *api.ContainersPost) Response {
 				return SmartError(err)
 			}
 
-			k, v, _ := containerGetRootDiskDevice(p.Devices)
+			k, v, _ := shared.GetRootDiskDevice(p.Devices)
 			if k != "" && v["pool"] != "" {
 				// Keep going as we want the last one in the profile chain
 				storagePool = v["pool"]
@@ -306,7 +306,7 @@ func createFromMigration(d *Daemon, req *api.ContainersPost) Response {
 			return InternalError(err)
 		}
 
-		_, rootDiskDevice, err := containerGetRootDiskDevice(cM.ExpandedDevices())
+		_, rootDiskDevice, err := shared.GetRootDiskDevice(cM.ExpandedDevices())
 		if err != nil {
 			return InternalError(err)
 		}
diff --git a/lxd/patches.go b/lxd/patches.go
index c6885d4ef..31068eca3 100644
--- a/lxd/patches.go
+++ b/lxd/patches.go
@@ -1725,7 +1725,7 @@ func updatePoolPropertyForAllObjects(d *Daemon, poolName string, allcontainers [
 			}
 
 			// Check for a root disk device entry
-			k, _, _ := containerGetRootDiskDevice(p.Devices)
+			k, _, _ := shared.GetRootDiskDevice(p.Devices)
 			if k != "" {
 				if p.Devices[k]["pool"] != "" {
 					continue
@@ -1820,14 +1820,14 @@ func updatePoolPropertyForAllObjects(d *Daemon, poolName string, allcontainers [
 
 		// Check if the container already has a valid root device entry (profile or previous upgrade)
 		expandedDevices := c.ExpandedDevices()
-		k, d, _ := containerGetRootDiskDevice(expandedDevices)
+		k, d, _ := shared.GetRootDiskDevice(expandedDevices)
 		if k != "" && d["pool"] != "" {
 			continue
 		}
 
 		// Look for a local root device entry
 		localDevices := c.LocalDevices()
-		k, _, _ = containerGetRootDiskDevice(localDevices)
+		k, _, _ = shared.GetRootDiskDevice(localDevices)
 		if k != "" {
 			localDevices[k]["pool"] = poolName
 		} else {
diff --git a/lxd/profiles_utils.go b/lxd/profiles_utils.go
index 7a515114c..f252df1b2 100644
--- a/lxd/profiles_utils.go
+++ b/lxd/profiles_utils.go
@@ -5,6 +5,7 @@ import (
 	"reflect"
 
 	"github.com/lxc/lxd/lxd/db"
+	"github.com/lxc/lxd/shared"
 	"github.com/lxc/lxd/shared/api"
 )
 
@@ -23,14 +24,14 @@ func doProfileUpdate(d *Daemon, name string, id int64, profile *api.Profile, req
 	containers := getContainersWithProfile(d.State(), name)
 
 	// Check if the root device is supposed to be changed or removed.
-	oldProfileRootDiskDeviceKey, oldProfileRootDiskDevice, _ := containerGetRootDiskDevice(profile.Devices)
-	_, newProfileRootDiskDevice, _ := containerGetRootDiskDevice(req.Devices)
+	oldProfileRootDiskDeviceKey, oldProfileRootDiskDevice, _ := shared.GetRootDiskDevice(profile.Devices)
+	_, newProfileRootDiskDevice, _ := shared.GetRootDiskDevice(req.Devices)
 	if len(containers) > 0 && oldProfileRootDiskDevice["pool"] != "" && newProfileRootDiskDevice["pool"] == "" || (oldProfileRootDiskDevice["pool"] != newProfileRootDiskDevice["pool"]) {
 		// Check for containers using the device
 		for _, container := range containers {
 			// Check if the device is locally overridden
 			localDevices := container.LocalDevices()
-			k, v, _ := containerGetRootDiskDevice(localDevices)
+			k, v, _ := shared.GetRootDiskDevice(localDevices)
 			if k != "" && v["pool"] != "" {
 				continue
 			}
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index 1e70aa628..e9006629a 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -2124,7 +2124,7 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
 	// retrieve it from the expanded devices.
 	parentStoragePool := ""
 	parentExpandedDevices := container.ExpandedDevices()
-	parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := containerGetRootDiskDevice(parentExpandedDevices)
+	parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := shared.GetRootDiskDevice(parentExpandedDevices)
 	if parentLocalRootDiskDeviceKey != "" {
 		parentStoragePool = parentLocalRootDiskDevice["pool"]
 	}
@@ -2144,7 +2144,7 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
 			// profile on the new instance as well we don't need to
 			// do anything.
 			if args.Devices != nil {
-				snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+				snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
 				if snapLocalRootDiskDeviceKey != "" {
 					args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
 				}
diff --git a/lxd/storage_ceph_migration.go b/lxd/storage_ceph_migration.go
index 4e4016203..58a53764a 100644
--- a/lxd/storage_ceph_migration.go
+++ b/lxd/storage_ceph_migration.go
@@ -247,7 +247,7 @@ func (s *storageCeph) MigrationSink(live bool, c container,
 	// set.
 	parentStoragePool := ""
 	parentExpandedDevices := c.ExpandedDevices()
-	parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := containerGetRootDiskDevice(parentExpandedDevices)
+	parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := shared.GetRootDiskDevice(parentExpandedDevices)
 	if parentLocalRootDiskDeviceKey != "" {
 		parentStoragePool = parentLocalRootDiskDevice["pool"]
 	}
@@ -313,7 +313,7 @@ func (s *storageCeph) MigrationSink(live bool, c container,
 		// disk device for the snapshot comes from a profile on the new
 		// instance as well we don't need to do anything.
 		if args.Devices != nil {
-			snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+			snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
 			if snapLocalRootDiskDeviceKey != "" {
 				args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
 			}
diff --git a/lxd/storage_migration.go b/lxd/storage_migration.go
index e8a966319..975a8d021 100644
--- a/lxd/storage_migration.go
+++ b/lxd/storage_migration.go
@@ -140,7 +140,7 @@ func rsyncMigrationSink(live bool, container container, snapshots []*migration.S
 	// disk device so we can simply retrieve it from the expanded devices.
 	parentStoragePool := ""
 	parentExpandedDevices := container.ExpandedDevices()
-	parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := containerGetRootDiskDevice(parentExpandedDevices)
+	parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := shared.GetRootDiskDevice(parentExpandedDevices)
 	if parentLocalRootDiskDeviceKey != "" {
 		parentStoragePool = parentLocalRootDiskDevice["pool"]
 	}
@@ -162,7 +162,7 @@ func rsyncMigrationSink(live bool, container container, snapshots []*migration.S
 				// profile on the new instance as well we don't need to
 				// do anything.
 				if args.Devices != nil {
-					snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+					snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
 					if snapLocalRootDiskDeviceKey != "" {
 						args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
 					}
@@ -201,7 +201,7 @@ func rsyncMigrationSink(live bool, container container, snapshots []*migration.S
 				// profile on the new instance as well we don't need to
 				// do anything.
 				if args.Devices != nil {
-					snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+					snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
 					if snapLocalRootDiskDeviceKey != "" {
 						args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
 					}
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index 9a0fd8ed6..e071108bc 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -2295,7 +2295,7 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
 	// retrieve it from the expanded devices.
 	parentStoragePool := ""
 	parentExpandedDevices := container.ExpandedDevices()
-	parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := containerGetRootDiskDevice(parentExpandedDevices)
+	parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := shared.GetRootDiskDevice(parentExpandedDevices)
 	if parentLocalRootDiskDeviceKey != "" {
 		parentStoragePool = parentLocalRootDiskDevice["pool"]
 	}
@@ -2314,7 +2314,7 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
 		// profile on the new instance as well we don't need to
 		// do anything.
 		if args.Devices != nil {
-			snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+			snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
 			if snapLocalRootDiskDeviceKey != "" {
 				args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
 			}
diff --git a/shared/container.go b/shared/container.go
index 79a3ce4a4..3836b80f3 100644
--- a/shared/container.go
+++ b/shared/container.go
@@ -87,6 +87,40 @@ func IsAny(value string) error {
 	return nil
 }
 
+// IsRootDiskDevice returns true if the given device representation is
+// configured as the root disk of a container. It typically gets passed a specific
+// entry of api.Container.Devices.
+func IsRootDiskDevice(device map[string]string) bool {
+	if device["type"] == "disk" && device["path"] == "/" && device["source"] == "" {
+		return true
+	}
+
+	return false
+}
+
+// GetRootDiskDevice returns the container device that is configured as root disk
+func GetRootDiskDevice(devices map[string]map[string]string) (string, map[string]string, error) {
+	var devName string
+	var dev map[string]string
+
+	for n, d := range devices {
+		if IsRootDiskDevice(d) {
+			if devName != "" {
+				return "", nil, fmt.Errorf("More than one root device found.")
+			}
+
+			devName = n
+			dev = d
+		}
+	}
+
+	if devName != "" {
+		return devName, dev, nil
+	}
+
+	return "", nil, fmt.Errorf("No root device could be found.")
+}
+
 // KnownContainerConfigKeys maps all fully defined, well-known config keys
 // to an appropriate checker function, which validates whether or not a
 // given value is syntactically legal.

From 4e4416e1f0a1de97d6032c963e6ae0d5ff2c24a9 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 14 Mar 2018 14:42:05 +0000
Subject: [PATCH 2/5] Add ContainerNodeMove db helper for re-linking db entries
 of a ceph container

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/db/containers.go      | 104 +++++++++++++++++++++++++++++++++++++++++++++-
 lxd/db/containers_test.go |  27 ++++++++++++
 2 files changed, 130 insertions(+), 1 deletion(-)
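
A rough sketch of how the daemon side might drive the new helper, assuming
cluster is the daemon's *db.Cluster handle and the container is backed by ceph
(ContainerNodeMove refuses anything else); the moveDBEntries wrapper is only
for illustration:

    package example

    import (
        "fmt"

        "github.com/lxc/lxd/lxd/db"
    )

    // moveDBEntries relinks the database rows of a ceph-backed container to
    // another node, inside a single cluster transaction.
    func moveDBEntries(cluster *db.Cluster, oldName, newName, newNode string) error {
        return cluster.Transaction(func(tx *db.ClusterTx) error {
            // Re-point the containers row at the new node and rename the
            // per-node storage volume rows.
            err := tx.ContainerNodeMove(oldName, newName, newNode)
            if err != nil {
                return err
            }

            // The volume rows were renamed too, so the pool can now be
            // looked up under the new container name.
            pool, err := tx.ContainerPool(newName)
            if err != nil {
                return err
            }
            fmt.Printf("container %q now lives on pool %q\n", newName, pool)
            return nil
        })
    }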

diff --git a/lxd/db/containers.go b/lxd/db/containers.go
index 9e3519ce0..765ae4d26 100644
--- a/lxd/db/containers.go
+++ b/lxd/db/containers.go
@@ -5,9 +5,11 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/lxc/lxd/lxd/db/query"
 	"github.com/lxc/lxd/lxd/types"
 	"github.com/lxc/lxd/shared"
 	"github.com/lxc/lxd/shared/logger"
+	"github.com/pkg/errors"
 
 	log "github.com/lxc/lxd/shared/log15"
 )
@@ -168,6 +170,93 @@ SELECT containers.name, nodes.name
 	return result, nil
 }
 
+// ContainerID returns the ID of the container with the given name.
+func (c *ClusterTx) ContainerID(name string) (int64, error) {
+	stmt := "SELECT id FROM containers WHERE name=?"
+	ids, err := query.SelectIntegers(c.tx, stmt, name)
+	if err != nil {
+		return -1, err
+	}
+	switch len(ids) {
+	case 0:
+		return -1, NoSuchObjectError
+	case 1:
+		return int64(ids[0]), nil
+	default:
+		return -1, fmt.Errorf("more than one container has the given name")
+	}
+}
+
+// ContainerNodeMove changes the node associated with a container.
+//
+// It's meant to be used when moving a non-running container backed by ceph
+// from one cluster node to another.
+func (c *ClusterTx) ContainerNodeMove(oldName, newName, newNode string) error {
+	// First check that the container to be moved is backed by a ceph
+	// volume.
+	poolName, err := c.ContainerPool(oldName)
+	if err != nil {
+		return errors.Wrap(err, "failed to get container's storage pool name")
+	}
+	poolID, err := c.StoragePoolID(poolName)
+	if err != nil {
+		return errors.Wrap(err, "failed to get container's storage pool ID")
+	}
+	poolDriver, err := c.StoragePoolDriver(poolID)
+	if err != nil {
+		return errors.Wrap(err, "failed to get container's storage pool driver")
+	}
+	if poolDriver != "ceph" {
+		return fmt.Errorf("container's storage pool is not of type ceph")
+	}
+
+	// Update the name of the container and the node ID it is associated
+	// with.
+	containerID, err := c.ContainerID(oldName)
+	if err != nil {
+		return errors.Wrap(err, "failed to get container's ID")
+	}
+
+	node, err := c.NodeByName(newNode)
+	if err != nil {
+		return errors.Wrap(err, "failed to get new node's info")
+	}
+
+	stmt := "UPDATE containers SET node_id=?, name=? WHERE id=?"
+	result, err := c.tx.Exec(stmt, node.ID, newName, containerID)
+	if err != nil {
+		return errors.Wrap(err, "failed to update container's name and node ID")
+	}
+	n, err := result.RowsAffected()
+	if err != nil {
+		return errors.Wrap(err, "failed to get rows affected by container update")
+	}
+	if n != 1 {
+		return fmt.Errorf("unexpected number of updated rows in containers table: %d", n)
+	}
+
+	// Update the container's storage volume name (since this is ceph,
+	// there's a clone of the volume for each node).
+	count, err := c.NodesCount()
+	if err != nil {
+		return errors.Wrap(err, "failed to get the number of nodes")
+	}
+	stmt = "UPDATE storage_volumes SET name=? WHERE storage_pool_id=? AND type=? AND name=?"
+	result, err = c.tx.Exec(stmt, newName, poolID, StoragePoolVolumeTypeContainer, oldName)
+	if err != nil {
+		return errors.Wrap(err, "failed to update container's volume name")
+	}
+	n, err = result.RowsAffected()
+	if err != nil {
+		return errors.Wrap(err, "failed to get rows affected by container volume update")
+	}
+	if n != int64(count) {
+		return fmt.Errorf("unexpected number of updated rows in volumes table: %d", n)
+	}
+
+	return nil
+}
+
 func (c *Cluster) ContainerRemove(name string) error {
 	id, err := c.ContainerId(name)
 	if err != nil {
@@ -648,7 +737,20 @@ func (c *Cluster) ContainerNextSnapshot(name string) int {
 }
 
 // Get the storage pool of a given container.
+//
+// This is a non-transactional variant of ClusterTx.ContainerPool().
 func (c *Cluster) ContainerPool(containerName string) (string, error) {
+	var poolName string
+	err := c.Transaction(func(tx *ClusterTx) error {
+		var err error
+		poolName, err = tx.ContainerPool(containerName)
+		return err
+	})
+	return poolName, err
+}
+
+// ContainerPool returns the storage pool of a given container.
+func (c *ClusterTx) ContainerPool(containerName string) (string, error) {
 	// Get container storage volume. Since container names are globally
 	// unique, and their storage volumes carry the same name, their storage
 	// volumes are unique too.
@@ -659,7 +761,7 @@ WHERE storage_volumes.node_id=? AND storage_volumes.name=? AND storage_volumes.t
 	inargs := []interface{}{c.nodeID, containerName, StoragePoolVolumeTypeContainer}
 	outargs := []interface{}{&poolName}
 
-	err := dbQueryRowScan(c.db, query, inargs, outargs)
+	err := c.tx.QueryRow(query, inargs...).Scan(outargs...)
 	if err != nil {
 		if err == sql.ErrNoRows {
 			return "", NoSuchObjectError
diff --git a/lxd/db/containers_test.go b/lxd/db/containers_test.go
index 9f8e035d6..291a58992 100644
--- a/lxd/db/containers_test.go
+++ b/lxd/db/containers_test.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	"github.com/lxc/lxd/lxd/db"
+	"github.com/lxc/lxd/lxd/types"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -62,6 +63,32 @@ func TestContainersByNodeName(t *testing.T) {
 		}, result)
 }
 
+func TestContainerPool(t *testing.T) {
+	cluster, cleanup := db.NewTestCluster(t)
+	defer cleanup()
+
+	poolID, err := cluster.StoragePoolCreate("default", "", "dir", nil)
+	require.NoError(t, err)
+	_, err = cluster.StoragePoolVolumeCreate("c1", "", db.StoragePoolVolumeTypeContainer, poolID, nil)
+	require.NoError(t, err)
+
+	args := db.ContainerArgs{
+		Name: "c1",
+		Devices: types.Devices{
+			"root": types.Device{
+				"path": "/",
+				"pool": "default",
+				"type": "disk",
+			},
+		},
+	}
+	_, err = cluster.ContainerCreate(args)
+	require.NoError(t, err)
+	poolName, err := cluster.ContainerPool("c1")
+	require.NoError(t, err)
+	assert.Equal(t, "default", poolName)
+}
+
 func addContainer(t *testing.T, tx *db.ClusterTx, nodeID int64, name string) {
 	stmt := `
 INSERT INTO containers(node_id, name, architecture, type) VALUES (?, ?, 1, ?)

From 517314ee27cbcef9005d55de384f05d451e4d7a9 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 14 Mar 2018 14:56:57 +0000
Subject: [PATCH 3/5] Add support for moving ceph-based containers in a cluster

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 client/lxd_containers.go |   7 +-
 lxc/move.go              | 124 +++++++++++++++++++++++-
 lxd/api_internal.go      |   1 +
 lxd/container_post.go    | 247 +++++++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 364 insertions(+), 15 deletions(-)
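
On the client side the move boils down to a targeted migration request. A
sketch of the flow, assuming a local LXD unix socket and a stopped,
ceph-backed container (the function and its arguments are placeholders):

    package example

    import (
        lxd "github.com/lxc/lxd/client"
        "github.com/lxc/lxd/shared/api"
    )

    // moveCephContainer asks the cluster to move a stopped, ceph-backed
    // container to another node by POSTing to /containers/<name>?target=<node>.
    func moveCephContainer(name, newName, targetNode string) error {
        // An empty path makes the client pick the default LXD unix socket.
        c, err := lxd.ConnectLXDUnix("", nil)
        if err != nil {
            return err
        }

        // UseTarget makes MigrateContainer append ?target=<node> to the URL,
        // which the daemon interprets as an intra-cluster ceph move.
        op, err := c.UseTarget(targetNode).MigrateContainer(name, api.ContainerPost{
            Name:      newName,
            Migration: true,
        })
        if err != nil {
            return err
        }

        return op.Wait()
    }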

diff --git a/client/lxd_containers.go b/client/lxd_containers.go
index c84567f6e..233f6c807 100644
--- a/client/lxd_containers.go
+++ b/client/lxd_containers.go
@@ -555,7 +555,12 @@ func (r *ProtocolLXD) MigrateContainer(name string, container api.ContainerPost)
 	}
 
 	// Send the request
-	op, _, err := r.queryOperation("POST", fmt.Sprintf("/containers/%s", url.QueryEscape(name)), container, "")
+	path := fmt.Sprintf("/containers/%s", url.QueryEscape(name))
+	if r.clusterTarget != "" {
+		path += fmt.Sprintf("?target=%s", r.clusterTarget)
+	}
+
+	op, _, err := r.queryOperation("POST", path, container, "")
 	if err != nil {
 		return nil, err
 	}
diff --git a/lxc/move.go b/lxc/move.go
index ab568d795..d929427bc 100644
--- a/lxc/move.go
+++ b/lxc/move.go
@@ -9,6 +9,7 @@ import (
 	"github.com/lxc/lxd/shared/api"
 	"github.com/lxc/lxd/shared/gnuflag"
 	"github.com/lxc/lxd/shared/i18n"
+	"github.com/pkg/errors"
 )
 
 type moveCmd struct {
@@ -46,7 +47,7 @@ func (c *moveCmd) flags() {
 }
 
 func (c *moveCmd) run(conf *config.Config, args []string) error {
-	if len(args) != 2 {
+	if len(args) != 2 && c.target == "" {
 		return errArgs
 	}
 
@@ -61,9 +62,14 @@ func (c *moveCmd) run(conf *config.Config, args []string) error {
 		return err
 	}
 
-	destRemote, destName, err := conf.ParseRemote(args[1])
-	if err != nil {
-		return err
+	destRemote := sourceRemote
+	destName := ""
+	if len(args) == 2 {
+		var err error
+		destRemote, destName, err = conf.ParseRemote(args[1])
+		if err != nil {
+			return err
+		}
 	}
 
 	// Target node and destination remote can't be used together.
@@ -104,6 +110,26 @@ func (c *moveCmd) run(conf *config.Config, args []string) error {
 		return op.Wait()
 	}
 
+	sourceResource := args[0]
+	destResource := sourceResource
+	if len(args) == 2 {
+		destResource = args[1]
+	}
+
+	// If the target option was specified, we're moving a container from a
+	// cluster node to another. In case the rootfs of the container is
+	// backed by ceph, we want to re-use the same ceph volume. This assumes
+	// that the container is not running.
+	if c.target != "" {
+		moved, err := maybeMoveCephContainer(conf, sourceResource, destResource, c.target)
+		if err != nil {
+			return err
+		}
+		if moved {
+			return nil
+		}
+	}
+
 	cpy := copyCmd{}
 	cpy.target = c.target
 
@@ -111,7 +137,7 @@ func (c *moveCmd) run(conf *config.Config, args []string) error {
 
 	// A move is just a copy followed by a delete; however, we want to
 	// keep the volatile entries around since we are moving the container.
-	err = cpy.copyContainer(conf, args[0], args[1], true, -1, stateful, c.containerOnly, mode)
+	err = cpy.copyContainer(conf, sourceResource, destResource, true, -1, stateful, c.containerOnly, mode)
 	if err != nil {
 		return err
 	}
@@ -120,3 +146,91 @@ func (c *moveCmd) run(conf *config.Config, args []string) error {
 	del.force = true
 	return del.run(conf, args[:1])
 }
+
+// Helper to check if the container to be moved is backed by a ceph storage
+// pool, and use the special POST /containers/<name>?target=<node> API if so.
+//
+// It returns false if the container is not backed by ceph, true otherwise.
+func maybeMoveCephContainer(conf *config.Config, sourceResource, destResource, target string) (bool, error) {
+	// Parse the source.
+	sourceRemote, sourceName, err := conf.ParseRemote(sourceResource)
+	if err != nil {
+		return false, err
+	}
+
+	// Parse the destination.
+	destRemote, destName, err := conf.ParseRemote(destResource)
+	if err != nil {
+		return false, err
+	}
+
+	if sourceRemote != destRemote {
+		return false, fmt.Errorf(
+			i18n.G("You must use the same source and destination remote when using --target"))
+	}
+
+	// Make sure we have a container or snapshot name.
+	if sourceName == "" {
+		return false, fmt.Errorf(i18n.G("You must specify a source container name"))
+	}
+
+	// The destination name is optional.
+	if destName == "" {
+		destName = sourceName
+	}
+
+	// Connect to the source host
+	source, err := conf.GetContainerServer(sourceRemote)
+	if err != nil {
+		return false, err
+	}
+
+	if shared.IsSnapshot(sourceName) {
+		// TODO: implement moving snapshots.
+		return false, fmt.Errorf("Moving ceph snapshots between cluster nodes is not yet implemented")
+	}
+
+	// Check if the container to be moved is backed by ceph.
+	container, _, err := source.GetContainer(sourceName)
+	if err != nil {
+		// If we are unable to connect, we assume that the source node
+		// is offline, and we'll try to perform the migration. If the
+		// container turns out to not be backed by ceph, the migrate
+		// API will still return an error.
+		if !strings.Contains(err.Error(), "Unable to connect") {
+			return false, errors.Wrapf(err, "Failed to get container %s", sourceName)
+		}
+	}
+	if container != nil {
+		_, device, err := shared.GetRootDiskDevice(container.Devices)
+		if err != nil {
+			return false, errors.Wrapf(err, "Failed to parse root disk device")
+		}
+
+		poolName, ok := device["pool"]
+		if !ok {
+			return false, nil
+		}
+
+		pool, _, err := source.GetStoragePool(poolName)
+		if err != nil {
+			return false, errors.Wrapf(err, "Failed to get root disk device pool %s", poolName)
+		}
+		if pool.Driver != "ceph" {
+			return false, nil
+		}
+	}
+
+	// The migrate API will do the right thing when passed a target.
+	source = source.UseTarget(target)
+	req := api.ContainerPost{Name: destName, Migration: true}
+	op, err := source.MigrateContainer(sourceName, req)
+	if err != nil {
+		return false, err
+	}
+	err = op.Wait()
+	if err != nil {
+		return false, err
+	}
+	return true, nil
+}
diff --git a/lxd/api_internal.go b/lxd/api_internal.go
index 8627bf853..2db81f756 100644
--- a/lxd/api_internal.go
+++ b/lxd/api_internal.go
@@ -30,6 +30,7 @@ var apiInternal = []Command{
 	internalContainersCmd,
 	internalSQLCmd,
 	internalClusterAcceptCmd,
+	internalClusterContainerMovedCmd,
 }
 
 func internalReady(d *Daemon, r *http.Request) Response {
diff --git a/lxd/container_post.go b/lxd/container_post.go
index f77fbd04d..bca51c245 100644
--- a/lxd/container_post.go
+++ b/lxd/container_post.go
@@ -3,30 +3,103 @@ package main
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"io/ioutil"
 	"net/http"
 
 	"github.com/gorilla/mux"
+	"github.com/pkg/errors"
 
+	"github.com/lxc/lxd/lxd/cluster"
+	"github.com/lxc/lxd/lxd/db"
 	"github.com/lxc/lxd/shared"
 	"github.com/lxc/lxd/shared/api"
+	"github.com/lxc/lxd/shared/logger"
 )
 
 func containerPost(d *Daemon, r *http.Request) Response {
 	name := mux.Vars(r)["name"]
+	targetNode := r.FormValue("target")
 
-	// Handle requests targeted to a container on a different node
-	response, err := ForwardedResponseIfContainerIsRemote(d, r, name)
-	if err != nil {
-		return SmartError(err)
+	sourceNodeOffline := false
+	targetNodeOffline := false
+
+	// A POST to /containers/<name>?target=<node> is meant to be
+	// used to move a container backed by a ceph storage pool.
+	if targetNode != "" {
+		// Determine if either the source node (the one currently
+		// running the container) or the target node are offline.
+		//
+		// If the target node is offline, we return an error.
+		//
+		// If the source node is offline, we'll assume that the
+		// container is not running and it's safe to move it.
+		//
+		// TODO: add some sort of "force" flag to the API, to signal
+		//       that the user really wants to move the container even
+		//       if we can't know for sure that it's indeed not
+		//       running?
+		err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+			// Load cluster configuration.
+			config, err := cluster.ConfigLoad(tx)
+			if err != nil {
+				return errors.Wrap(err, "Failed to load LXD config")
+			}
+
+			// Load target node.
+			node, err := tx.NodeByName(targetNode)
+			if err != nil {
+				return errors.Wrap(err, "Failed to get target node")
+			}
+			targetNodeOffline = node.IsOffline(config.OfflineThreshold())
+
+			// Load source node.
+			address, err := tx.ContainerNodeAddress(name)
+			if err != nil {
+				return errors.Wrap(err, "Failed to get address of container's node")
+			}
+			if address == "" {
+				// Local node
+				sourceNodeOffline = false
+				return nil
+			}
+			node, err = tx.NodeByAddress(address)
+			if err != nil {
+				return errors.Wrapf(err, "Failed to get source node for %s", address)
+			}
+			sourceNodeOffline = node.IsOffline(config.OfflineThreshold())
+
+			return nil
+		})
+		if err != nil {
+			return SmartError(err)
+		}
 	}
-	if response != nil {
-		return response
+
+	if targetNode != "" && targetNodeOffline {
+		return BadRequest(fmt.Errorf("Target node is offline"))
 	}
 
-	c, err := containerLoadByName(d.State(), name)
-	if err != nil {
-		return SmartError(err)
+	var c container
+
+	// For in-cluster migrations, only forward the request to the source
+	// node and load the container if the source node is online. We'll not
+	// check whether the container is running or try to unmap the RBD
+	// volume on it if the source node is offline.
+	if targetNode == "" || !sourceNodeOffline {
+		// Handle requests targeted to a container on a different node
+		response, err := ForwardedResponseIfContainerIsRemote(d, r, name)
+		if err != nil {
+			return SmartError(err)
+		}
+		if response != nil {
+			return response
+		}
+
+		c, err = containerLoadByName(d.State(), name)
+		if err != nil {
+			return SmartError(err)
+		}
 	}
 
 	body, err := ioutil.ReadAll(r.Body)
@@ -57,6 +130,12 @@ func containerPost(d *Daemon, r *http.Request) Response {
 	}
 
 	if req.Migration {
+		if targetNode != "" {
+			// A POST to /containers/<name>?target=<node> is meant to be
+			// used to move a container backed by a ceph storage pool.
+			return containerPostClusteringMigrateWithCeph(d, c, name, req.Name, targetNode)
+		}
+
 		ws, err := NewMigrationSource(c, stateful, req.ContainerOnly)
 		if err != nil {
 			return InternalError(err)
@@ -109,3 +188,153 @@ func containerPost(d *Daemon, r *http.Request) Response {
 
 	return OperationResponse(op)
 }
+
+// Special case migrating a container backed by ceph across two cluster nodes.
+func containerPostClusteringMigrateWithCeph(d *Daemon, c container, oldName, newName, newNode string) Response {
+	if c != nil && c.IsRunning() {
+		return BadRequest(fmt.Errorf("Container is running"))
+	}
+
+	run := func(*operation) error {
+		// If source node is online (i.e. we're serving the request on
+		// it, and c != nil), let's unmap the RBD volume locally
+		if c != nil {
+			logger.Debugf(`Renaming RBD storage volume for source container "%s" from `+
+				`"%s" to "%s"`, c.Name(), c.Name(), newName)
+			poolName, err := c.StoragePool()
+			if err != nil {
+				return errors.Wrap(err, "Failed to get source container's storage pool name")
+			}
+			_, pool, err := d.cluster.StoragePoolGet(poolName)
+			if err != nil {
+				return errors.Wrap(err, "Failed to get source container's storage pool")
+			}
+			if pool.Driver != "ceph" {
+				return fmt.Errorf("Source container's storage pool is not of type ceph")
+			}
+			si, err := storagePoolVolumeContainerLoadInit(d.State(), c.Name())
+			if err != nil {
+				return errors.Wrap(err, "Failed to initialize source container's storage pool")
+			}
+			s, ok := si.(*storageCeph)
+			if !ok {
+				return fmt.Errorf("Unexpected source container storage backend")
+			}
+			err = cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName, c.Name(),
+				storagePoolVolumeTypeNameContainer, s.UserName, true)
+			if err != nil {
+				return errors.Wrap(err, "Failed to unmap source container's RBD volume")
+			}
+
+		}
+
+		// Re-link the database entries against the new node name.
+		var poolName string
+		err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+			err := tx.ContainerNodeMove(oldName, newName, newNode)
+			if err != nil {
+				return err
+			}
+			poolName, err = tx.ContainerPool(newName)
+			if err != nil {
+				return err
+			}
+			return nil
+		})
+		if err != nil {
+			return errors.Wrap(err, "Failed to relink container database data")
+		}
+
+		// Rename the RBD volume if necessary.
+		if newName != oldName {
+			s := storageCeph{}
+			_, s.pool, err = d.cluster.StoragePoolGet(poolName)
+			if err != nil {
+				return errors.Wrap(err, "Failed to get storage pool")
+			}
+			err = s.StoragePoolInit()
+			if err != nil {
+				return errors.Wrap(err, "Failed to initialize ceph storage pool")
+			}
+			err = cephRBDVolumeRename(s.ClusterName, s.OSDPoolName,
+				storagePoolVolumeTypeNameContainer, oldName, newName, s.UserName)
+			if err != nil {
+				return errors.Wrap(err, "Failed to rename ceph RBD volume")
+			}
+		}
+
+		// Create the container mount point on the target node
+		cert := d.endpoints.NetworkCert()
+		client, err := cluster.ConnectIfContainerIsRemote(d.cluster, newName, cert)
+		if err != nil {
+			return errors.Wrap(err, "Failed to connect to target node")
+		}
+		if client == nil {
+			err := containerPostCreateContainerMountPoint(d, newName)
+			if err != nil {
+				return errors.Wrap(err, "Failed to create mount point on target node")
+			}
+		} else {
+			path := fmt.Sprintf("/internal/cluster/container-moved/%s", newName)
+			resp, _, err := client.RawQuery("POST", path, nil, "")
+			if err != nil {
+				return errors.Wrap(err, "Failed to create mount point on target node")
+			}
+			if resp.StatusCode != 200 {
+				return fmt.Errorf("Failed to create mount point on target node: %s", resp.Error)
+			}
+		}
+
+		return nil
+	}
+
+	resources := map[string][]string{}
+	resources["containers"] = []string{oldName}
+	op, err := operationCreate(d.cluster, operationClassTask, "Moving container", resources, nil, run, nil, nil)
+	if err != nil {
+		return InternalError(err)
+	}
+
+	return OperationResponse(op)
+}
+
+var internalClusterContainerMovedCmd = Command{
+	name: "cluster/container-moved/{name}",
+	post: internalClusterContainerMovedPost,
+}
+
+// Notification that a container was moved.
+//
+// At the moment it is used for ceph-based containers, where the target node needs
+// to create the appropriate mount points.
+func internalClusterContainerMovedPost(d *Daemon, r *http.Request) Response {
+	containerName := mux.Vars(r)["name"]
+	err := containerPostCreateContainerMountPoint(d, containerName)
+	if err != nil {
+		return SmartError(err)
+	}
+	return EmptySyncResponse
+}
+
+// Used to create the appropriate mount point after a container has been
+// moved.
+func containerPostCreateContainerMountPoint(d *Daemon, containerName string) error {
+	c, err := containerLoadByName(d.State(), containerName)
+	if err != nil {
+		return errors.Wrap(err, "Failed to load moved container on target node")
+	}
+	poolName, err := c.StoragePool()
+	if err != nil {
+		return errors.Wrap(err, "Failed to get pool name of moved container on target node")
+	}
+	// This is the target node itself.
+	containerMntPoint := getContainerMountPoint(poolName, containerName)
+	err = createContainerMountpoint(containerMntPoint, c.Path(), c.IsPrivileged())
+	if err != nil {
+		return errors.Wrap(err, "Failed to create container mount point on target node")
+	}
+	return nil
+}

From d182925838ed6be1cad381de8f34a834afec154f Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 15 Mar 2018 12:58:56 +0000
Subject: [PATCH 4/5] Fix broken copy of storage volume configs when joining a
 cluster

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/db/storage_pools.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/lxd/db/storage_pools.go b/lxd/db/storage_pools.go
index c838879de..8b2da0904 100644
--- a/lxd/db/storage_pools.go
+++ b/lxd/db/storage_pools.go
@@ -140,13 +140,12 @@ INSERT INTO storage_volumes(name, storage_pool_id, node_id, type, description)
 		return errors.Wrap(err, "failed to create node ceph volumes")
 	}
 
+	// Create entries of all the ceph volume configs for the new node.
 	stmt = `
 SELECT id FROM storage_volumes WHERE storage_pool_id=? AND node_id=?
   ORDER BY name, type
 `
-
-	// Create entries of all the ceph volumes configs for the new node.
-	volumeIDs, err := query.SelectIntegers(c.tx, stmt, poolID, otherNodeID)
+	volumeIDs, err := query.SelectIntegers(c.tx, stmt, poolID, nodeID)
 	if err != nil {
 		return errors.Wrap(err, "failed to get joining node's ceph volume IDs")
 	}

From 879ddd0281e887df97c4dbe59df294ae7b9e5744 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 15 Mar 2018 13:07:11 +0000
Subject: [PATCH 5/5] Add integration tests

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 test/suites/clustering.sh | 48 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index 171018bed..79b5d6857 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -367,6 +367,54 @@ test_clustering_storage() {
     ! LXD_DIR="${LXD_ONE_DIR}" lxc storage show pool1 | grep -q rsync.bwlimit
   fi
 
+  # Test migration of ceph-based containers
+  if [ "${driver}" = "ceph" ]; then
+    LXD_DIR="${LXD_TWO_DIR}" ensure_import_testimage
+    LXD_DIR="${LXD_ONE_DIR}" lxc launch --target node2 -s pool1 testimage foo
+
+    # The container can't be moved if it's running
+    ! LXD_DIR="${LXD_TWO_DIR}" lxc move foo --target node1 || false
+
+    # Move the container to node1
+    LXD_DIR="${LXD_ONE_DIR}" lxc stop foo
+    LXD_DIR="${LXD_TWO_DIR}" lxc move foo --target node1
+    LXD_DIR="${LXD_TWO_DIR}" lxc info foo | grep -q "Location: node1"
+
+    # Start and stop the container on its new node1 host
+    LXD_DIR="${LXD_TWO_DIR}" lxc start foo
+    LXD_DIR="${LXD_TWO_DIR}" lxc stop foo
+
+    # Spawn a third node
+    setup_clustering_netns 3
+    LXD_THREE_DIR=$(mktemp -d -p "${TEST_DIR}" XXX)
+    chmod +x "${LXD_THREE_DIR}"
+    ns3="${prefix}3"
+    spawn_lxd_and_join_cluster "${ns3}" "${bridge}" "${cert}" 3 1 "${LXD_THREE_DIR}" "${driver}"
+
+    # Move the container to node3, renaming it
+    LXD_DIR="${LXD_TWO_DIR}" lxc move foo bar --target node3
+    LXD_DIR="${LXD_TWO_DIR}" lxc info bar | grep -q "Location: node3"
+
+    # Shutdown node 3, and wait for it to be considered offline.
+    LXD_DIR="${LXD_THREE_DIR}" lxc config set cluster.offline_threshold 5
+    LXD_DIR="${LXD_THREE_DIR}" lxd shutdown
+    sleep 10
+
+    # Move the container back to node2, even if node3 is offline
+    LXD_DIR="${LXD_ONE_DIR}" lxc move bar --target node2
+    LXD_DIR="${LXD_ONE_DIR}" lxc info bar | grep -q "Location: node2"
+
+    # Start and stop the container on its new node2 host
+    LXD_DIR="${LXD_TWO_DIR}" lxc start bar
+    LXD_DIR="${LXD_ONE_DIR}" lxc stop bar
+
+    LXD_DIR="${LXD_ONE_DIR}" lxc config set cluster.offline_threshold 20
+    LXD_DIR="${LXD_ONE_DIR}" lxc cluster delete node3 --force
+
+    LXD_DIR="${LXD_ONE_DIR}" lxc delete bar
+    LXD_DIR="${LXD_ONE_DIR}" lxc image delete testimage
+  fi
+
   # Delete the storage pool
   LXD_DIR="${LXD_ONE_DIR}" lxc storage delete pool1
   ! LXD_DIR="${LXD_ONE_DIR}" lxc storage list | grep -q pool1

