[lxc-devel] [lxd/master] Clustering: move ceph-based containers
freeekanayaka on Github
lxc-bot at linuxcontainers.org
Thu Mar 15 13:26:11 UTC 2018
From 510992c6c67c5c4475f4d08dd84329578569cf4d Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 14 Mar 2018 11:28:26 +0000
Subject: [PATCH 1/5] Move isRootDiskDevice and containerGetRootDiskDevice to
lxd/shared
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/container.go | 34 ++--------------------------------
lxd/container_lxc.go | 12 ++++++------
lxd/containers_post.go | 6 +++---
lxd/patches.go | 6 +++---
lxd/profiles_utils.go | 7 ++++---
lxd/storage_btrfs.go | 4 ++--
lxd/storage_ceph_migration.go | 4 ++--
lxd/storage_migration.go | 6 +++---
lxd/storage_zfs.go | 4 ++--
shared/container.go | 34 ++++++++++++++++++++++++++++++++++
10 files changed, 61 insertions(+), 56 deletions(-)
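
This first patch is pure code motion: the two root-disk helpers move from lxd/container.go into the shared package (as shared.IsRootDiskDevice and shared.GetRootDiskDevice) so that the client-side code later in this series can use them too. As a hedged illustration of the relocated API (the device map below is made up, standing in for api.Container.Devices or a container's expanded devices):

package main

import (
	"fmt"

	"github.com/lxc/lxd/shared"
)

func main() {
	// Illustrative devices map; "root" and "default" are made-up names.
	devices := map[string]map[string]string{
		"root": {
			"type": "disk",
			"path": "/",
			"pool": "default",
		},
	}

	// GetRootDiskDevice returns an error if zero or more than one
	// root disk device is present.
	name, dev, err := shared.GetRootDiskDevice(devices)
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println(name, dev["pool"]) // prints: root default
}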
diff --git a/lxd/container.go b/lxd/container.go
index c03a9dc03..d1890e1a2 100644
--- a/lxd/container.go
+++ b/lxd/container.go
@@ -281,36 +281,6 @@ func containerValidConfig(os *sys.OS, config map[string]string, profile bool, ex
return nil
}
-func isRootDiskDevice(device types.Device) bool {
- if device["type"] == "disk" && device["path"] == "/" && device["source"] == "" {
- return true
- }
-
- return false
-}
-
-func containerGetRootDiskDevice(devices types.Devices) (string, types.Device, error) {
- var devName string
- var dev types.Device
-
- for n, d := range devices {
- if isRootDiskDevice(d) {
- if devName != "" {
- return "", types.Device{}, fmt.Errorf("More than one root device found.")
- }
-
- devName = n
- dev = d
- }
- }
-
- if devName != "" {
- return devName, dev, nil
- }
-
- return "", types.Device{}, fmt.Errorf("No root device could be found.")
-}
-
func containerValidDevices(db *db.Cluster, devices types.Devices, profile bool, expanded bool) error {
// Empty device list
if devices == nil {
@@ -447,7 +417,7 @@ func containerValidDevices(db *db.Cluster, devices types.Devices, profile bool,
// Checks on the expanded config
if expanded {
- _, _, err := containerGetRootDiskDevice(devices)
+ _, _, err := shared.GetRootDiskDevice(devices)
if err != nil {
return err
}
@@ -931,7 +901,7 @@ func containerCreateInternal(s *state.State, args db.ContainerArgs) (container,
func containerConfigureInternal(c container) error {
// Find the root device
- _, rootDiskDevice, err := containerGetRootDiskDevice(c.ExpandedDevices())
+ _, rootDiskDevice, err := shared.GetRootDiskDevice(c.ExpandedDevices())
if err != nil {
return err
}
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index 66cb2aa1a..b73b0c9e5 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -315,7 +315,7 @@ func containerLXCCreate(s *state.State, args db.ContainerArgs) (container, error
}
// Retrieve the container's storage pool
- _, rootDiskDevice, err := containerGetRootDiskDevice(c.expandedDevices)
+ _, rootDiskDevice, err := shared.GetRootDiskDevice(c.expandedDevices)
if err != nil {
c.Delete()
return nil, err
@@ -1500,7 +1500,7 @@ func (c *containerLXC) initLXC(config bool) error {
// bump network index
networkidx++
} else if m["type"] == "disk" {
- isRootfs := isRootDiskDevice(m)
+ isRootfs := shared.IsRootDiskDevice(m)
// source paths
srcPath := shared.HostPath(m["source"])
@@ -3719,19 +3719,19 @@ func (c *containerLXC) Update(args db.ContainerArgs, userRequested bool) error {
}
// Retrieve old root disk devices.
- oldLocalRootDiskDeviceKey, oldLocalRootDiskDevice, _ := containerGetRootDiskDevice(oldLocalDevices)
+ oldLocalRootDiskDeviceKey, oldLocalRootDiskDevice, _ := shared.GetRootDiskDevice(oldLocalDevices)
var oldProfileRootDiskDevices []string
for k, v := range oldExpandedDevices {
- if isRootDiskDevice(v) && k != oldLocalRootDiskDeviceKey && !shared.StringInSlice(k, oldProfileRootDiskDevices) {
+ if shared.IsRootDiskDevice(v) && k != oldLocalRootDiskDeviceKey && !shared.StringInSlice(k, oldProfileRootDiskDevices) {
oldProfileRootDiskDevices = append(oldProfileRootDiskDevices, k)
}
}
// Retrieve new root disk devices.
- newLocalRootDiskDeviceKey, newLocalRootDiskDevice, _ := containerGetRootDiskDevice(c.localDevices)
+ newLocalRootDiskDeviceKey, newLocalRootDiskDevice, _ := shared.GetRootDiskDevice(c.localDevices)
var newProfileRootDiskDevices []string
for k, v := range c.expandedDevices {
- if isRootDiskDevice(v) && k != newLocalRootDiskDeviceKey && !shared.StringInSlice(k, newProfileRootDiskDevices) {
+ if shared.IsRootDiskDevice(v) && k != newLocalRootDiskDeviceKey && !shared.StringInSlice(k, newProfileRootDiskDevices) {
newProfileRootDiskDevices = append(newProfileRootDiskDevices, k)
}
}
diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index c31aba8de..ec5e664a4 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -200,7 +200,7 @@ func createFromMigration(d *Daemon, req *api.ContainersPost) Response {
storagePool := ""
storagePoolProfile := ""
- localRootDiskDeviceKey, localRootDiskDevice, _ := containerGetRootDiskDevice(req.Devices)
+ localRootDiskDeviceKey, localRootDiskDevice, _ := shared.GetRootDiskDevice(req.Devices)
if localRootDiskDeviceKey != "" {
storagePool = localRootDiskDevice["pool"]
}
@@ -224,7 +224,7 @@ func createFromMigration(d *Daemon, req *api.ContainersPost) Response {
return SmartError(err)
}
- k, v, _ := containerGetRootDiskDevice(p.Devices)
+ k, v, _ := shared.GetRootDiskDevice(p.Devices)
if k != "" && v["pool"] != "" {
// Keep going as we want the last one in the profile chain
storagePool = v["pool"]
@@ -306,7 +306,7 @@ func createFromMigration(d *Daemon, req *api.ContainersPost) Response {
return InternalError(err)
}
- _, rootDiskDevice, err := containerGetRootDiskDevice(cM.ExpandedDevices())
+ _, rootDiskDevice, err := shared.GetRootDiskDevice(cM.ExpandedDevices())
if err != nil {
return InternalError(err)
}
diff --git a/lxd/patches.go b/lxd/patches.go
index c6885d4ef..31068eca3 100644
--- a/lxd/patches.go
+++ b/lxd/patches.go
@@ -1725,7 +1725,7 @@ func updatePoolPropertyForAllObjects(d *Daemon, poolName string, allcontainers [
}
// Check for a root disk device entry
- k, _, _ := containerGetRootDiskDevice(p.Devices)
+ k, _, _ := shared.GetRootDiskDevice(p.Devices)
if k != "" {
if p.Devices[k]["pool"] != "" {
continue
@@ -1820,14 +1820,14 @@ func updatePoolPropertyForAllObjects(d *Daemon, poolName string, allcontainers [
// Check if the container already has a valid root device entry (profile or previous upgrade)
expandedDevices := c.ExpandedDevices()
- k, d, _ := containerGetRootDiskDevice(expandedDevices)
+ k, d, _ := shared.GetRootDiskDevice(expandedDevices)
if k != "" && d["pool"] != "" {
continue
}
// Look for a local root device entry
localDevices := c.LocalDevices()
- k, _, _ = containerGetRootDiskDevice(localDevices)
+ k, _, _ = shared.GetRootDiskDevice(localDevices)
if k != "" {
localDevices[k]["pool"] = poolName
} else {
diff --git a/lxd/profiles_utils.go b/lxd/profiles_utils.go
index 7a515114c..f252df1b2 100644
--- a/lxd/profiles_utils.go
+++ b/lxd/profiles_utils.go
@@ -5,6 +5,7 @@ import (
"reflect"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
)
@@ -23,14 +24,14 @@ func doProfileUpdate(d *Daemon, name string, id int64, profile *api.Profile, req
containers := getContainersWithProfile(d.State(), name)
// Check if the root device is supposed to be changed or removed.
- oldProfileRootDiskDeviceKey, oldProfileRootDiskDevice, _ := containerGetRootDiskDevice(profile.Devices)
- _, newProfileRootDiskDevice, _ := containerGetRootDiskDevice(req.Devices)
+ oldProfileRootDiskDeviceKey, oldProfileRootDiskDevice, _ := shared.GetRootDiskDevice(profile.Devices)
+ _, newProfileRootDiskDevice, _ := shared.GetRootDiskDevice(req.Devices)
if len(containers) > 0 && oldProfileRootDiskDevice["pool"] != "" && newProfileRootDiskDevice["pool"] == "" || (oldProfileRootDiskDevice["pool"] != newProfileRootDiskDevice["pool"]) {
// Check for containers using the device
for _, container := range containers {
// Check if the device is locally overridden
localDevices := container.LocalDevices()
- k, v, _ := containerGetRootDiskDevice(localDevices)
+ k, v, _ := shared.GetRootDiskDevice(localDevices)
if k != "" && v["pool"] != "" {
continue
}
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index 1e70aa628..e9006629a 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -2124,7 +2124,7 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
// retrieve it from the expanded devices.
parentStoragePool := ""
parentExpandedDevices := container.ExpandedDevices()
- parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := containerGetRootDiskDevice(parentExpandedDevices)
+ parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := shared.GetRootDiskDevice(parentExpandedDevices)
if parentLocalRootDiskDeviceKey != "" {
parentStoragePool = parentLocalRootDiskDevice["pool"]
}
@@ -2144,7 +2144,7 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
// profile on the new instance as well we don't need to
// do anything.
if args.Devices != nil {
- snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+ snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
if snapLocalRootDiskDeviceKey != "" {
args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
}
diff --git a/lxd/storage_ceph_migration.go b/lxd/storage_ceph_migration.go
index 4e4016203..58a53764a 100644
--- a/lxd/storage_ceph_migration.go
+++ b/lxd/storage_ceph_migration.go
@@ -247,7 +247,7 @@ func (s *storageCeph) MigrationSink(live bool, c container,
// set.
parentStoragePool := ""
parentExpandedDevices := c.ExpandedDevices()
- parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := containerGetRootDiskDevice(parentExpandedDevices)
+ parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := shared.GetRootDiskDevice(parentExpandedDevices)
if parentLocalRootDiskDeviceKey != "" {
parentStoragePool = parentLocalRootDiskDevice["pool"]
}
@@ -313,7 +313,7 @@ func (s *storageCeph) MigrationSink(live bool, c container,
// disk device for the snapshot comes from a profile on the new
// instance as well we don't need to do anything.
if args.Devices != nil {
- snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+ snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
if snapLocalRootDiskDeviceKey != "" {
args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
}
diff --git a/lxd/storage_migration.go b/lxd/storage_migration.go
index e8a966319..975a8d021 100644
--- a/lxd/storage_migration.go
+++ b/lxd/storage_migration.go
@@ -140,7 +140,7 @@ func rsyncMigrationSink(live bool, container container, snapshots []*migration.S
// disk device so we can simply retrieve it from the expanded devices.
parentStoragePool := ""
parentExpandedDevices := container.ExpandedDevices()
- parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := containerGetRootDiskDevice(parentExpandedDevices)
+ parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := shared.GetRootDiskDevice(parentExpandedDevices)
if parentLocalRootDiskDeviceKey != "" {
parentStoragePool = parentLocalRootDiskDevice["pool"]
}
@@ -162,7 +162,7 @@ func rsyncMigrationSink(live bool, container container, snapshots []*migration.S
// profile on the new instance as well we don't need to
// do anything.
if args.Devices != nil {
- snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+ snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
if snapLocalRootDiskDeviceKey != "" {
args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
}
@@ -201,7 +201,7 @@ func rsyncMigrationSink(live bool, container container, snapshots []*migration.S
// profile on the new instance as well we don't need to
// do anything.
if args.Devices != nil {
- snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+ snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
if snapLocalRootDiskDeviceKey != "" {
args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
}
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index 9a0fd8ed6..e071108bc 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -2295,7 +2295,7 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
// retrieve it from the expanded devices.
parentStoragePool := ""
parentExpandedDevices := container.ExpandedDevices()
- parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := containerGetRootDiskDevice(parentExpandedDevices)
+ parentLocalRootDiskDeviceKey, parentLocalRootDiskDevice, _ := shared.GetRootDiskDevice(parentExpandedDevices)
if parentLocalRootDiskDeviceKey != "" {
parentStoragePool = parentLocalRootDiskDevice["pool"]
}
@@ -2314,7 +2314,7 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
// profile on the new instance as well we don't need to
// do anything.
if args.Devices != nil {
- snapLocalRootDiskDeviceKey, _, _ := containerGetRootDiskDevice(args.Devices)
+ snapLocalRootDiskDeviceKey, _, _ := shared.GetRootDiskDevice(args.Devices)
if snapLocalRootDiskDeviceKey != "" {
args.Devices[snapLocalRootDiskDeviceKey]["pool"] = parentStoragePool
}
diff --git a/shared/container.go b/shared/container.go
index 79a3ce4a4..3836b80f3 100644
--- a/shared/container.go
+++ b/shared/container.go
@@ -87,6 +87,40 @@ func IsAny(value string) error {
return nil
}
+// IsRootDiskDevice returns true if the given device representation is
+// configured as the root disk for a container. It typically gets passed a
+// specific entry of api.Container.Devices.
+func IsRootDiskDevice(device map[string]string) bool {
+ if device["type"] == "disk" && device["path"] == "/" && device["source"] == "" {
+ return true
+ }
+
+ return false
+}
+
+// GetRootDiskDevice returns the container device that is configured as root disk
+func GetRootDiskDevice(devices map[string]map[string]string) (string, map[string]string, error) {
+ var devName string
+ var dev map[string]string
+
+ for n, d := range devices {
+ if IsRootDiskDevice(d) {
+ if devName != "" {
+ return "", nil, fmt.Errorf("More than one root device found.")
+ }
+
+ devName = n
+ dev = d
+ }
+ }
+
+ if devName != "" {
+ return devName, dev, nil
+ }
+
+ return "", nil, fmt.Errorf("No root device could be found.")
+}
+
// KnownContainerConfigKeys maps all fully defined, well-known config keys
// to an appropriate checker function, which validates whether or not a
// given value is syntactically legal.
From 4e4416e1f0a1de97d6032c963e6ae0d5ff2c24a9 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 14 Mar 2018 14:42:05 +0000
Subject: [PATCH 2/5] Add ContainerNodeMove db helper for re-linking db entries
of a ceph container
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/containers.go | 104 +++++++++++++++++++++++++++++++++++++++++++++-
lxd/db/containers_test.go | 27 ++++++++++++
2 files changed, 130 insertions(+), 1 deletion(-)
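
The interesting addition here is ClusterTx.ContainerNodeMove, which re-links the containers row (name and node_id) and the per-node ceph volume clones in storage_volumes. A minimal sketch of how a caller is expected to drive it, assuming a *db.Cluster handle; the container name "c1" and node name "node2" are illustrative:

package main

import "github.com/lxc/lxd/lxd/db"

// moveContainerDBEntries is a sketch only: it re-links the database entries
// of a stopped, ceph-backed container to another cluster node, keeping the
// container name unchanged. ContainerNodeMove fails with an error if the
// container's storage pool driver is not ceph.
func moveContainerDBEntries(cluster *db.Cluster) error {
	return cluster.Transaction(func(tx *db.ClusterTx) error {
		return tx.ContainerNodeMove("c1", "c1", "node2")
	})
}

func main() {} // wiring up a real *db.Cluster is outside the scope of this sketch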
diff --git a/lxd/db/containers.go b/lxd/db/containers.go
index 9e3519ce0..765ae4d26 100644
--- a/lxd/db/containers.go
+++ b/lxd/db/containers.go
@@ -5,9 +5,11 @@ import (
"fmt"
"time"
+ "github.com/lxc/lxd/lxd/db/query"
"github.com/lxc/lxd/lxd/types"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/logger"
+ "github.com/pkg/errors"
log "github.com/lxc/lxd/shared/log15"
)
@@ -168,6 +170,93 @@ SELECT containers.name, nodes.name
return result, nil
}
+// ContainerID returns the ID of the container with the given name.
+func (c *ClusterTx) ContainerID(name string) (int64, error) {
+ stmt := "SELECT id FROM containers WHERE name=?"
+ ids, err := query.SelectIntegers(c.tx, stmt, name)
+ if err != nil {
+ return -1, err
+ }
+ switch len(ids) {
+ case 0:
+ return -1, NoSuchObjectError
+ case 1:
+ return int64(ids[0]), nil
+ default:
+ return -1, fmt.Errorf("more than one container has the given name")
+ }
+}
+
+// ContainerNodeMove changes the node associated with a container.
+//
+// It's meant to be used when moving a non-running container backed by ceph
+// from one cluster node to another.
+func (c *ClusterTx) ContainerNodeMove(oldName, newName, newNode string) error {
+ // First check that the container to be moved is backed by a ceph
+ // volume.
+ poolName, err := c.ContainerPool(oldName)
+ if err != nil {
+ return errors.Wrap(err, "failed to get container's storage pool name")
+ }
+ poolID, err := c.StoragePoolID(poolName)
+ if err != nil {
+ return errors.Wrap(err, "failed to get container's storage pool ID")
+ }
+ poolDriver, err := c.StoragePoolDriver(poolID)
+ if err != nil {
+ return errors.Wrap(err, "failed to get container's storage pool driver")
+ }
+ if poolDriver != "ceph" {
+ return fmt.Errorf("container's storage pool is not of type ceph")
+ }
+
+ // Update the name of the container and the node ID it is associated
+ // with.
+ containerID, err := c.ContainerID(oldName)
+ if err != nil {
+ return errors.Wrap(err, "failed to get container's ID")
+ }
+
+ node, err := c.NodeByName(newNode)
+ if err != nil {
+ return errors.Wrap(err, "failed to get new node's info")
+ }
+
+ stmt := "UPDATE containers SET node_id=?, name=? WHERE id=?"
+ result, err := c.tx.Exec(stmt, node.ID, newName, containerID)
+ if err != nil {
+ return errors.Wrap(err, "failed to update container's name and node ID")
+ }
+ n, err := result.RowsAffected()
+ if err != nil {
+ return errors.Wrap(err, "failed to get rows affected by container update")
+ }
+ if n != 1 {
+ return fmt.Errorf("unexpected number of updated rows in containers table: %d", n)
+ }
+
+ // Update the container's storage volume name (since this is ceph,
+ // there's a clone of the volume for each node).
+ count, err := c.NodesCount()
+ if err != nil {
+ return errors.Wrap(err, "failed to get node's count")
+ }
+ stmt = "UPDATE storage_volumes SET name=? WHERE storage_pool_id=? AND type=?"
+ result, err = c.tx.Exec(stmt, newName, poolID, StoragePoolVolumeTypeContainer)
+ if err != nil {
+ return errors.Wrap(err, "failed to update container's volume name")
+ }
+ n, err = result.RowsAffected()
+ if err != nil {
+ return errors.Wrap(err, "failed to get rows affected by container volume update")
+ }
+ if n != int64(count) {
+ return fmt.Errorf("unexpected number of updated rows in volumes table: %d", n)
+ }
+
+ return nil
+}
+
func (c *Cluster) ContainerRemove(name string) error {
id, err := c.ContainerId(name)
if err != nil {
@@ -648,7 +737,20 @@ func (c *Cluster) ContainerNextSnapshot(name string) int {
}
// Get the storage pool of a given container.
+//
+// This is a non-transactional variant of ClusterTx.ContainerPool().
func (c *Cluster) ContainerPool(containerName string) (string, error) {
+ var poolName string
+ err := c.Transaction(func(tx *ClusterTx) error {
+ var err error
+ poolName, err = tx.ContainerPool(containerName)
+ return err
+ })
+ return poolName, err
+}
+
+// ContainerPool returns the storage pool of a given container.
+func (c *ClusterTx) ContainerPool(containerName string) (string, error) {
// Get container storage volume. Since container names are globally
// unique, and their storage volumes carry the same name, their storage
// volumes are unique too.
@@ -659,7 +761,7 @@ WHERE storage_volumes.node_id=? AND storage_volumes.name=? AND storage_volumes.t
inargs := []interface{}{c.nodeID, containerName, StoragePoolVolumeTypeContainer}
outargs := []interface{}{&poolName}
- err := dbQueryRowScan(c.db, query, inargs, outargs)
+ err := c.tx.QueryRow(query, inargs...).Scan(outargs...)
if err != nil {
if err == sql.ErrNoRows {
return "", NoSuchObjectError
diff --git a/lxd/db/containers_test.go b/lxd/db/containers_test.go
index 9f8e035d6..291a58992 100644
--- a/lxd/db/containers_test.go
+++ b/lxd/db/containers_test.go
@@ -5,6 +5,7 @@ import (
"time"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -62,6 +63,32 @@ func TestContainersByNodeName(t *testing.T) {
}, result)
}
+func TestContainerPool(t *testing.T) {
+ cluster, cleanup := db.NewTestCluster(t)
+ defer cleanup()
+
+ poolID, err := cluster.StoragePoolCreate("default", "", "dir", nil)
+ require.NoError(t, err)
+ _, err = cluster.StoragePoolVolumeCreate("c1", "", db.StoragePoolVolumeTypeContainer, poolID, nil)
+ require.NoError(t, err)
+
+ args := db.ContainerArgs{
+ Name: "c1",
+ Devices: types.Devices{
+ "root": types.Device{
+ "path": "/",
+ "pool": "default",
+ "type": "disk",
+ },
+ },
+ }
+ _, err = cluster.ContainerCreate(args)
+ require.NoError(t, err)
+ poolName, err := cluster.ContainerPool("c1")
+ require.NoError(t, err)
+ assert.Equal(t, "default", poolName)
+}
+
func addContainer(t *testing.T, tx *db.ClusterTx, nodeID int64, name string) {
stmt := `
INSERT INTO containers(node_id, name, architecture, type) VALUES (?, ?, 1, ?)
From 517314ee27cbcef9005d55de384f05d451e4d7a9 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 14 Mar 2018 14:56:57 +0000
Subject: [PATCH 3/5] Add support for moving ceph-based containers in a cluster
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
client/lxd_containers.go | 7 +-
lxc/move.go | 124 +++++++++++++++++++++++-
lxd/api_internal.go | 1 +
lxd/container_post.go | 247 +++++++++++++++++++++++++++++++++++++++++++++--
4 files changed, 364 insertions(+), 15 deletions(-)
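
On the client side the whole --target move reduces to a POST with Migration set, issued through a client that has a cluster target configured; the daemon then takes the ceph shortcut server-side. A condensed, hedged sketch of that flow (the remote parsing and the ceph-pool check from maybeMoveCephContainer below are elided):

package main

import (
	lxd "github.com/lxc/lxd/client"
	"github.com/lxc/lxd/shared/api"
)

// moveWithinCluster sketches the --target path: with a cluster target set,
// MigrateContainer POSTs to /containers/<name>?target=<node> and the server
// re-links the ceph-backed container instead of streaming a migration.
func moveWithinCluster(source lxd.ContainerServer, name, newName, target string) error {
	source = source.UseTarget(target)

	req := api.ContainerPost{Name: newName, Migration: true}
	op, err := source.MigrateContainer(name, req)
	if err != nil {
		return err
	}

	return op.Wait()
}

func main() {}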
diff --git a/client/lxd_containers.go b/client/lxd_containers.go
index c84567f6e..233f6c807 100644
--- a/client/lxd_containers.go
+++ b/client/lxd_containers.go
@@ -555,7 +555,12 @@ func (r *ProtocolLXD) MigrateContainer(name string, container api.ContainerPost)
}
// Send the request
- op, _, err := r.queryOperation("POST", fmt.Sprintf("/containers/%s", url.QueryEscape(name)), container, "")
+ path := fmt.Sprintf("/containers/%s", url.QueryEscape(name))
+ if r.clusterTarget != "" {
+ path += fmt.Sprintf("?target=%s", r.clusterTarget)
+ }
+
+ op, _, err := r.queryOperation("POST", path, container, "")
if err != nil {
return nil, err
}
diff --git a/lxc/move.go b/lxc/move.go
index ab568d795..d929427bc 100644
--- a/lxc/move.go
+++ b/lxc/move.go
@@ -9,6 +9,7 @@ import (
"github.com/lxc/lxd/shared/api"
"github.com/lxc/lxd/shared/gnuflag"
"github.com/lxc/lxd/shared/i18n"
+ "github.com/pkg/errors"
)
type moveCmd struct {
@@ -46,7 +47,7 @@ func (c *moveCmd) flags() {
}
func (c *moveCmd) run(conf *config.Config, args []string) error {
- if len(args) != 2 {
+ if len(args) != 2 && c.target == "" {
return errArgs
}
@@ -61,9 +62,14 @@ func (c *moveCmd) run(conf *config.Config, args []string) error {
return err
}
- destRemote, destName, err := conf.ParseRemote(args[1])
- if err != nil {
- return err
+ destRemote := sourceRemote
+ destName := ""
+ if len(args) == 2 {
+ var err error
+ destRemote, destName, err = conf.ParseRemote(args[1])
+ if err != nil {
+ return err
+ }
}
// Target node and destination remote can't be used together.
@@ -104,6 +110,26 @@ func (c *moveCmd) run(conf *config.Config, args []string) error {
return op.Wait()
}
+ sourceResource := args[0]
+ destResource := sourceResource
+ if len(args) == 2 {
+ destResource = args[1]
+ }
+
+ // If the target option was specified, we're moving a container from
+ // one cluster node to another. In case the rootfs of the container is
+ // backed by ceph, we want to re-use the same ceph volume. This assumes
+ // that the container is not running.
+ if c.target != "" {
+ moved, err := maybeMoveCephContainer(conf, sourceResource, destResource, c.target)
+ if err != nil {
+ return err
+ }
+ if moved {
+ return nil
+ }
+ }
+
cpy := copyCmd{}
cpy.target = c.target
@@ -111,7 +137,7 @@ func (c *moveCmd) run(conf *config.Config, args []string) error {
// A move is just a copy followed by a delete; however, we want to
// keep the volatile entries around since we are moving the container.
- err = cpy.copyContainer(conf, args[0], args[1], true, -1, stateful, c.containerOnly, mode)
+ err = cpy.copyContainer(conf, sourceResource, destResource, true, -1, stateful, c.containerOnly, mode)
if err != nil {
return err
}
@@ -120,3 +146,91 @@ func (c *moveCmd) run(conf *config.Config, args []string) error {
del.force = true
return del.run(conf, args[:1])
}
+
+// Helper to check whether the container to be moved is backed by a ceph
+// storage pool, and if so to move it using the special
+// POST /containers/<name>?target=<node> API.
+//
+// It returns true if the container was ceph-backed and the move was performed
+// here, false if the caller should fall back to the regular copy+delete path.
+func maybeMoveCephContainer(conf *config.Config, sourceResource, destResource, target string) (bool, error) {
+ // Parse the source.
+ sourceRemote, sourceName, err := conf.ParseRemote(sourceResource)
+ if err != nil {
+ return false, err
+ }
+
+ // Parse the destination.
+ destRemote, destName, err := conf.ParseRemote(destResource)
+ if err != nil {
+ return false, err
+ }
+
+ if sourceRemote != destRemote {
+ return false, fmt.Errorf(
+ i18n.G("You must use the same source and destination remote when using --target"))
+ }
+
+ // Make sure we have a container or snapshot name.
+ if sourceName == "" {
+ return false, fmt.Errorf(i18n.G("You must specify a source container name"))
+ }
+
+ // The destination name is optional.
+ if destName == "" {
+ destName = sourceName
+ }
+
+ // Connect to the source host
+ source, err := conf.GetContainerServer(sourceRemote)
+ if err != nil {
+ return false, err
+ }
+
+ if shared.IsSnapshot(sourceName) {
+ // TODO: implement moving snapshots.
+ return false, fmt.Errorf("Moving ceph snapshots between cluster nodes is not yet implemented")
+ }
+
+ // Check if the container to be moved is backed by ceph.
+ container, _, err := source.GetContainer(sourceName)
+ if err != nil {
+ // If we are unable to connect, we assume that the source node
+ // is offline, and we'll try to perform the migration. If the
+ // container turns out to not be backed by ceph, the migrate
+ // API will still return an error.
+ if !strings.Contains(err.Error(), "Unable to connect") {
+ return false, errors.Wrapf(err, "Failed to get container %s", sourceName)
+ }
+ }
+ if container != nil {
+ _, device, err := shared.GetRootDiskDevice(container.Devices)
+ if err != nil {
+ return false, errors.Wrapf(err, "Failed parse root disk device")
+ }
+
+ poolName, ok := device["pool"]
+ if !ok {
+ return false, nil
+ }
+
+ pool, _, err := source.GetStoragePool(poolName)
+ if err != nil {
+ return false, errors.Wrapf(err, "Failed get root disk device pool %s", poolName)
+ }
+ if pool.Driver != "ceph" {
+ return false, nil
+ }
+ }
+
+ // The migrate API will do the right thing when passed a target.
+ source = source.UseTarget(target)
+ req := api.ContainerPost{Name: destName, Migration: true}
+ op, err := source.MigrateContainer(sourceName, req)
+ if err != nil {
+ return false, err
+ }
+ err = op.Wait()
+ if err != nil {
+ return false, err
+ }
+ return true, nil
+}
diff --git a/lxd/api_internal.go b/lxd/api_internal.go
index 8627bf853..2db81f756 100644
--- a/lxd/api_internal.go
+++ b/lxd/api_internal.go
@@ -30,6 +30,7 @@ var apiInternal = []Command{
internalContainersCmd,
internalSQLCmd,
internalClusterAcceptCmd,
+ internalClusterContainerMovedCmd,
}
func internalReady(d *Daemon, r *http.Request) Response {
diff --git a/lxd/container_post.go b/lxd/container_post.go
index f77fbd04d..bca51c245 100644
--- a/lxd/container_post.go
+++ b/lxd/container_post.go
@@ -3,30 +3,103 @@ package main
import (
"bytes"
"encoding/json"
+ "fmt"
"io/ioutil"
"net/http"
"github.com/gorilla/mux"
+ "github.com/pkg/errors"
+ "github.com/lxc/lxd/lxd/cluster"
+ "github.com/lxc/lxd/lxd/db"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/logger"
)
func containerPost(d *Daemon, r *http.Request) Response {
name := mux.Vars(r)["name"]
+ targetNode := r.FormValue("target")
- // Handle requests targeted to a container on a different node
- response, err := ForwardedResponseIfContainerIsRemote(d, r, name)
- if err != nil {
- return SmartError(err)
+ sourceNodeOffline := false
+ targetNodeOffline := false
+
+ // A POST to /containers/<name>?target=<node> is meant to be
+ // used to move a container backed by a ceph storage pool.
+ if targetNode != "" {
+ // Determine if either the source node (the one currently
+ // running the container) or the target node are offline.
+ //
+ // If the target node is offline, we return an error.
+ //
+ // If the source node is offline, we'll assume that the
+ // container is not running and it's safe to move it.
+ //
+ // TODO: add some sort of "force" flag to the API, to signal
+ // that the user really wants to move the container even
+ // if we can't know for sure that it's indeed not
+ // running?
+ err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ // Load cluster configuration.
+ config, err := cluster.ConfigLoad(tx)
+ if err != nil {
+ return errors.Wrap(err, "Failed to load LXD config")
+ }
+
+ // Load target node.
+ node, err := tx.NodeByName(targetNode)
+ if err != nil {
+ return errors.Wrap(err, "Failed to get target node")
+ }
+ targetNodeOffline = node.IsOffline(config.OfflineThreshold())
+
+ // Load source node.
+ address, err := tx.ContainerNodeAddress(name)
+ if err != nil {
+ return errors.Wrap(err, "Failed to get address of container's node")
+ }
+ if address == "" {
+ // Local node
+ sourceNodeOffline = false
+ return nil
+ }
+ node, err = tx.NodeByAddress(address)
+ if err != nil {
+ return errors.Wrapf(err, "Failed to get source node for %s", address)
+ }
+ sourceNodeOffline = node.IsOffline(config.OfflineThreshold())
+
+ return nil
+ })
+ if err != nil {
+ return SmartError(err)
+ }
}
- if response != nil {
- return response
+
+ if targetNode != "" && targetNodeOffline {
+ return BadRequest(fmt.Errorf("Target node is offline"))
}
- c, err := containerLoadByName(d.State(), name)
- if err != nil {
- return SmartError(err)
+ var c container
+
+ // For in-cluster migrations, only forward the request to the source
+ // node and load the container if the source node is online. We'll not
+ // check whether the container is running or try to unmap the RBD
+ // volume on it if the source node is offline.
+ if targetNode == "" || !sourceNodeOffline {
+ // Handle requests targeted to a container on a different node
+ response, err := ForwardedResponseIfContainerIsRemote(d, r, name)
+ if err != nil {
+ return SmartError(err)
+ }
+ if response != nil {
+ return response
+ }
+
+ c, err = containerLoadByName(d.State(), name)
+ if err != nil {
+ return SmartError(err)
+ }
}
body, err := ioutil.ReadAll(r.Body)
@@ -57,6 +130,12 @@ func containerPost(d *Daemon, r *http.Request) Response {
}
if req.Migration {
+ if targetNode != "" {
+ // A POST to /containers/<name>?target=<node> is meant to be
+ // used to move a container backed by a ceph storage pool.
+ return containerPostClusteringMigrateWithCeph(d, c, name, req.Name, targetNode)
+ }
+
ws, err := NewMigrationSource(c, stateful, req.ContainerOnly)
if err != nil {
return InternalError(err)
@@ -109,3 +188,153 @@ func containerPost(d *Daemon, r *http.Request) Response {
return OperationResponse(op)
}
+
+// Special case migrating a container backed by ceph across two cluster nodes.
+func containerPostClusteringMigrateWithCeph(d *Daemon, c container, oldName, newName, newNode string) Response {
+ if c != nil && c.IsRunning() {
+ return BadRequest(fmt.Errorf("Container is running"))
+ }
+
+ run := func(*operation) error {
+ // If source node is online (i.e. we're serving the request on
+ // it, and c != nil), let's unmap the RBD volume locally
+ if c != nil {
+ logger.Debugf(`Renaming RBD storage volume for source container "%s" from `+
+ `"%s" to "%s"`, c.Name(), c.Name(), newName)
+ poolName, err := c.StoragePool()
+ if err != nil {
+ return errors.Wrap(err, "Failed to get source container's storage pool name")
+ }
+ _, pool, err := d.cluster.StoragePoolGet(poolName)
+ if err != nil {
+ return errors.Wrap(err, "Failed to get source container's storage pool")
+ }
+ if pool.Driver != "ceph" {
+ return fmt.Errorf("Source container's storage pool is not of type ceph")
+ }
+ si, err := storagePoolVolumeContainerLoadInit(d.State(), c.Name())
+ if err != nil {
+ return errors.Wrap(err, "Failed to initialize source container's storage pool")
+ }
+ s, ok := si.(*storageCeph)
+ if !ok {
+ return fmt.Errorf("Unexpected source container storage backend")
+ }
+ err = cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName, c.Name(),
+ storagePoolVolumeTypeNameContainer, s.UserName, true)
+ if err != nil {
+ return errors.Wrap(err, "Failed to unmap source container's RBD volume")
+ }
+
+ }
+
+ // Re-link the database entries against the new node name.
+ var poolName string
+ err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ err := tx.ContainerNodeMove(oldName, newName, newNode)
+ if err != nil {
+ return err
+ }
+ poolName, err = tx.ContainerPool(newName)
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+ if err != nil {
+ return errors.Wrap(err, "Failed to relink container database data")
+ }
+
+ // Rename the RBD volume if necessary.
+ if newName != oldName {
+ s := storageCeph{}
+ _, s.pool, err = d.cluster.StoragePoolGet(poolName)
+ if err != nil {
+ return errors.Wrap(err, "Failed to get storage pool")
+ }
+ err = s.StoragePoolInit()
+ if err != nil {
+ return errors.Wrap(err, "Failed to initialize ceph storage pool")
+ }
+ err = cephRBDVolumeRename(s.ClusterName, s.OSDPoolName,
+ storagePoolVolumeTypeNameContainer, oldName, newName, s.UserName)
+ if err != nil {
+ return errors.Wrap(err, "Failed to rename ceph RBD volume")
+ }
+ }
+
+ // Create the container mount point on the target node
+ cert := d.endpoints.NetworkCert()
+ client, err := cluster.ConnectIfContainerIsRemote(d.cluster, newName, cert)
+ if err != nil {
+ return errors.Wrap(err, "Failed to connect to target node")
+ }
+ if client == nil {
+ err := containerPostCreateContainerMountPoint(d, newName)
+ if err != nil {
+ return errors.Wrap(err, "Failed to create mount point on target node")
+ }
+ } else {
+ path := fmt.Sprintf("/internal/cluster/container-moved/%s", newName)
+ resp, _, err := client.RawQuery("POST", path, nil, "")
+ if err != nil {
+ return errors.Wrap(err, "Failed to create mount point on target node")
+ }
+ if resp.StatusCode != 200 {
+ return fmt.Errorf("Failed to create mount point on target node: %s", resp.Error)
+ }
+ }
+
+ return nil
+ }
+
+ resources := map[string][]string{}
+ resources["containers"] = []string{oldName}
+ op, err := operationCreate(d.cluster, operationClassTask, "Moving container", resources, nil, run, nil, nil)
+ if err != nil {
+ return InternalError(err)
+ }
+
+ return OperationResponse(op)
+}
+
+var internalClusterContainerMovedCmd = Command{
+ name: "cluster/container-moved/{name}",
+ post: internalClusterContainerMovedPost,
+}
+
+// Notification that a container was moved.
+//
+// At the moment it is used for ceph-based containers, where the target node needs
+// to create the appropriate mount points.
+func internalClusterContainerMovedPost(d *Daemon, r *http.Request) Response {
+ containerName := mux.Vars(r)["name"]
+ err := containerPostCreateContainerMountPoint(d, containerName)
+ if err != nil {
+ return SmartError(err)
+ }
+ return EmptySyncResponse
+}
+
+// Used to create the appropriate mount point after a container has been
+// moved.
+func containerPostCreateContainerMountPoint(d *Daemon, containerName string) error {
+ c, err := containerLoadByName(d.State(), containerName)
+ if err != nil {
+ return errors.Wrap(err, "Failed to load moved container on target node")
+ }
+ poolName, err := c.StoragePool()
+ if err != nil {
+ return errors.Wrap(err, "Failed get pool name of moved container on target node")
+ }
+ // This is the target node itself.
+ containerMntPoint := getContainerMountPoint(poolName, containerName)
+ err = createContainerMountpoint(containerMntPoint, c.Path(), c.IsPrivileged())
+ if err != nil {
+ return errors.Wrap(err, "Failed to create container mount point on target node")
+ }
+ return nil
+}
From d182925838ed6be1cad381de8f34a834afec154f Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 15 Mar 2018 12:58:56 +0000
Subject: [PATCH 4/5] Fix broken copy of storage volume configs when joining a
cluster
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
lxd/db/storage_pools.go | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
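
For context on this small fix: when a node joins the cluster, the existing ceph volume rows are duplicated for the new node, and a follow-up query collects the IDs of the rows just inserted so that their config entries can be copied as well. That query was filtering on otherNodeID (the node whose rows served as the copy source) instead of nodeID (the joining node), so the config copy targeted the wrong volume rows. A condensed sketch of the corrected lookup, with poolID, nodeID and c.tx in scope as in the patched function:

// Fetch the IDs of the ceph volume rows just created for the *joining*
// node, so that their config entries can be populated next.
stmt = `
SELECT id FROM storage_volumes WHERE storage_pool_id=? AND node_id=?
ORDER BY name, type
`
volumeIDs, err := query.SelectIntegers(c.tx, stmt, poolID, nodeID)
if err != nil {
	return errors.Wrap(err, "failed to get joining node's ceph volume IDs")
}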
diff --git a/lxd/db/storage_pools.go b/lxd/db/storage_pools.go
index c838879de..8b2da0904 100644
--- a/lxd/db/storage_pools.go
+++ b/lxd/db/storage_pools.go
@@ -140,13 +140,12 @@ INSERT INTO storage_volumes(name, storage_pool_id, node_id, type, description)
return errors.Wrap(err, "failed to create node ceph volumes")
}
+ // Create entries of all the ceph volumes configs for the new node.
stmt = `
SELECT id FROM storage_volumes WHERE storage_pool_id=? AND node_id=?
ORDER BY name, type
`
-
- // Create entries of all the ceph volumes configs for the new node.
- volumeIDs, err := query.SelectIntegers(c.tx, stmt, poolID, otherNodeID)
+ volumeIDs, err := query.SelectIntegers(c.tx, stmt, poolID, nodeID)
if err != nil {
return errors.Wrap(err, "failed to get joining node's ceph volume IDs")
}
From 879ddd0281e887df97c4dbe59df294ae7b9e5744 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Thu, 15 Mar 2018 13:07:11 +0000
Subject: [PATCH 5/5] Add integration tests
Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
test/suites/clustering.sh | 48 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 48 insertions(+)
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index 171018bed..79b5d6857 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -367,6 +367,54 @@ test_clustering_storage() {
! LXD_DIR="${LXD_ONE_DIR}" lxc storage show pool1 | grep -q rsync.bwlimit
fi
+ # Test migration of ceph-based containers
+ if [ "${driver}" = "ceph" ]; then
+ LXD_DIR="${LXD_TWO_DIR}" ensure_import_testimage
+ LXD_DIR="${LXD_ONE_DIR}" lxc launch --target node2 -s pool1 testimage foo
+
+ # The container can't be moved if it's running
+ ! LXD_DIR="${LXD_TWO_DIR}" lxc move foo --target node1 || false
+
+ # Move the container to node1
+ LXD_DIR="${LXD_ONE_DIR}" lxc stop foo
+ LXD_DIR="${LXD_TWO_DIR}" lxc move foo --target node1
+ LXD_DIR="${LXD_TWO_DIR}" lxc info foo | grep -q "Location: node1"
+
+ # Start and stop the container on its new node1 host
+ LXD_DIR="${LXD_TWO_DIR}" lxc start foo
+ LXD_DIR="${LXD_TWO_DIR}" lxc stop foo
+
+ # Spawn a third node
+ setup_clustering_netns 3
+ LXD_THREE_DIR=$(mktemp -d -p "${TEST_DIR}" XXX)
+ chmod +x "${LXD_THREE_DIR}"
+ ns3="${prefix}3"
+ spawn_lxd_and_join_cluster "${ns3}" "${bridge}" "${cert}" 3 1 "${LXD_THREE_DIR}" "${driver}"
+
+ # Move the container to node3, renaming it
+ LXD_DIR="${LXD_TWO_DIR}" lxc move foo bar --target node3
+ LXD_DIR="${LXD_TWO_DIR}" lxc info bar | grep -q "Location: node3"
+
+ # Shutdown node 3, and wait for it to be considered offline.
+ LXD_DIR="${LXD_THREE_DIR}" lxc config set cluster.offline_threshold 5
+ LXD_DIR="${LXD_THREE_DIR}" lxd shutdown
+ sleep 10
+
+ # Move the container back to node2, even if node3 is offline
+ LXD_DIR="${LXD_ONE_DIR}" lxc move bar --target node2
+ LXD_DIR="${LXD_ONE_DIR}" lxc info bar | grep -q "Location: node2"
+
+ # Start and stop the container on its new node2 host
+ LXD_DIR="${LXD_TWO_DIR}" lxc start bar
+ LXD_DIR="${LXD_ONE_DIR}" lxc stop bar
+
+ LXD_DIR="${LXD_ONE_DIR}" lxc config set cluster.offline_threshold 20
+ LXD_DIR="${LXD_ONE_DIR}" lxc cluster delete node3 --force
+
+ LXD_DIR="${LXD_ONE_DIR}" lxc delete bar
+ LXD_DIR="${LXD_ONE_DIR}" lxc image delete testimage
+ fi
+
# Delete the storage pool
LXD_DIR="${LXD_ONE_DIR}" lxc storage delete pool1
! LXD_DIR="${LXD_ONE_DIR}" lxc storage list | grep -q pool1