[lxc-devel] [lxd/master] Add Ceph storage driver

monstermunchkin on Github lxc-bot at linuxcontainers.org
Mon Jan 20 14:59:49 UTC 2020


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20200120/b609552a/attachment-0001.bin>
-------------- next part --------------
From 7b75c0d99dcf8ee752541ae72d9791d4a7dab35a Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Thu, 19 Dec 2019 13:46:47 +0100
Subject: [PATCH 1/2] lxd/storage/drivers: Add Ceph driver

Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
 lxd/storage/drivers/driver_ceph.go         |  286 +++
 lxd/storage/drivers/driver_ceph_utils.go   | 1990 ++++++++++++++++++++
 lxd/storage/drivers/driver_ceph_volumes.go |  857 +++++++++
 lxd/storage/drivers/load.go                |    1 +
 4 files changed, 3134 insertions(+)
 create mode 100644 lxd/storage/drivers/driver_ceph.go
 create mode 100644 lxd/storage/drivers/driver_ceph_utils.go
 create mode 100644 lxd/storage/drivers/driver_ceph_volumes.go

diff --git a/lxd/storage/drivers/driver_ceph.go b/lxd/storage/drivers/driver_ceph.go
new file mode 100644
index 0000000000..c76f50f18e
--- /dev/null
+++ b/lxd/storage/drivers/driver_ceph.go
@@ -0,0 +1,286 @@
+package drivers
+
+import (
+	"fmt"
+	"os/exec"
+	"strings"
+
+	"github.com/lxc/lxd/lxd/migration"
+	"github.com/lxc/lxd/lxd/operations"
+	"github.com/lxc/lxd/shared"
+	"github.com/lxc/lxd/shared/api"
+	"github.com/lxc/lxd/shared/logger"
+	"github.com/lxc/lxd/shared/units"
+)
+
+var cephAllowedFilesystems = []string{"btrfs", "ext4"}
+var cephVersion string
+var cephLoaded bool
+
+type ceph struct {
+	common
+}
+
+func (d *ceph) load() error {
+	// Register the patches.
+	d.patches = map[string]func() error{
+		"storage_create_vm": nil,
+	}
+
+	// Done if previously loaded.
+	if cephLoaded {
+		return nil
+	}
+
+	// Validate the required binaries.
+	for _, tool := range []string{"ceph", "rbd"} {
+		_, err := exec.LookPath(tool)
+		if err != nil {
+			return fmt.Errorf("Required tool '%s' is missing", tool)
+		}
+	}
+
+	// Detect and record the version.
+	if cephVersion == "" {
+		out, err := shared.RunCommand("rbd", "--version")
+		if err != nil {
+			return err
+		}
+
+		cephVersion = strings.TrimSpace(out)
+	}
+
+	cephLoaded = true
+	return nil
+}
+
+// Info returns info about the driver and its environment.
+func (d *ceph) Info() Info {
+	return Info{
+		Name:                  "ceph",
+		Version:               cephVersion,
+		OptimizedImages:       true,
+		PreservesInodes:       false,
+		Remote:                true,
+		VolumeTypes:           []VolumeType{VolumeTypeCustom, VolumeTypeImage, VolumeTypeContainer, VolumeTypeVM},
+		BlockBacking:          false,
+		RunningQuotaResize:    false,
+		RunningSnapshotFreeze: true,
+	}
+}
+
+func (d *ceph) Create() error {
+	revert := true
+
+	d.config["volatile.initial_source"] = d.config["source"]
+
+	// Set default properties if missing.
+	if d.config["ceph.cluster_name"] == "" {
+		d.config["ceph.cluster_name"] = "ceph"
+	}
+
+	if d.config["ceph.user_name"] == "" {
+		d.config["ceph.user_name"] = "admin"
+	}
+
+	if d.config["ceph.osd.pg_num"] == "" {
+		d.config["ceph.osd.pg_num"] = "32"
+	} else {
+		// Validate
+		_, err := units.ParseByteSizeString(d.config["ceph.osd.pg_num"])
+		if err != nil {
+			return err
+		}
+	}
+
+	// sanity check
+	if d.config["source"] != "" &&
+		d.config["ceph.osd.pool_name"] != "" &&
+		d.config["source"] != d.config["ceph.osd.pool_name"] {
+		return fmt.Errorf(`The "source" and "ceph.osd.pool_name" property must not differ for CEPH OSD storage pools`)
+	}
+
+	// use an existing OSD pool
+	if d.config["source"] != "" {
+		d.config["ceph.osd.pool_name"] = d.config["source"]
+	}
+
+	if d.config["ceph.osd.pool_name"] == "" {
+		d.config["ceph.osd.pool_name"] = d.name
+		d.config["source"] = d.name
+	}
+
+	if !d.osdPoolExists() {
+		// Create new osd pool
+		err := d.osdCreatePool()
+		if err != nil {
+			return fmt.Errorf("Failed to create Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], err)
+		}
+
+		defer func() {
+			if !revert {
+				return
+			}
+
+			err := d.osdDeletePool()
+			if err != nil {
+				logger.Warnf("Failed to delete Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], err)
+			}
+		}()
+
+		// Create dummy storage volume. Other LXD instances will use this to detect whether this osd pool is already in use by another LXD instance.
+		err = d.rbdCreateVolume(d.config["ceph.osd.pool_name"], "lxd", "0")
+		if err != nil {
+			logger.Errorf("Failed to create RBD volume %q on OSD pool %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.osd.pool_name"], err)
+			return err
+		}
+		d.config["volatile.pool.pristine"] = "true"
+	} else {
+		ok := d.rbdVolumeExists(d.config["ceph.osd.pool_name"], "lxd")
+		d.config["volatile.pool.pristine"] = "false"
+		if ok {
+			if d.config["ceph.osd.force_reuse"] == "" || !shared.IsTrue(d.config["ceph.osd.force_reuse"]) {
+				return fmt.Errorf("Ceph OSD pool %q in cluster %q seems to be in use by another LXD instance. Use \"ceph.osd.force_reuse=true\" to force", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"])
+			}
+		}
+
+		// Use existing osd pool
+		msg, err := d.osdGetPoolPGNum()
+		if err != nil {
+			return fmt.Errorf("Failed to retrieve number of placement groups for Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], err)
+		}
+
+		logger.Debugf("Retrieved number of placement groups or Ceph OSD pool %q in cluster %q", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"])
+
+		idx := strings.Index(msg, "pg_num:")
+		if idx == -1 {
+			return fmt.Errorf("Failed to parse number of placement groups for Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], msg)
+		}
+
+		msg = msg[(idx + len("pg_num:")):]
+		msg = strings.TrimSpace(msg)
+
+		// It is ok to update the pool configuration since storage pool
+		// creation via API is implemented such that the storage pool is
+		// checked for a changed config after this function returns and
+		// if so the db for it is updated.
+		d.config["ceph.osd.pg_num"] = msg
+	}
+
+	if d.config["source"] == "" {
+		d.config["source"] = d.config["ceph.osd.pool_name"]
+	}
+
+	// set immutable ceph.cluster_name property
+	if d.config["ceph.cluster_name"] == "" {
+		d.config["ceph.cluster_name"] = "ceph"
+	}
+
+	// set immutable ceph.osd.pool_name property
+	if d.config["ceph.osd.pool_name"] == "" {
+		d.config["ceph.osd.pool_name"] = d.name
+	}
+
+	if d.config["ceph.osd.pg_num"] == "" {
+		d.config["ceph.osd.pg_num"] = "32"
+	}
+
+	revert = false
+
+	return nil
+}
+
+func (d *ceph) Delete(op *operations.Operation) error {
+	// test if pool exists
+	poolExists := d.osdPoolExists()
+	if !poolExists {
+		logger.Warnf("Ceph OSD pool %q does not exist in cluster %q", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"])
+	}
+
+	// Check whether we own the pool and only remove in this case.
+	if d.config["volatile.pool.pristine"] != "" &&
+		shared.IsTrue(d.config["volatile.pool.pristine"]) {
+		logger.Debugf("Detected that this LXD instance is the owner of the Ceph OSD pool %q in cluster %q", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"])
+
+		// Delete the osd pool.
+		if poolExists {
+			err := d.osdDeletePool()
+			if err != nil {
+				return fmt.Errorf("Failed to delete Ceph OSD pool %q in cluster %q: %s", d.config["ceph.osd.pool_name"], d.config["ceph.cluster_name"], err)
+			}
+		}
+	}
+
+	// If the user completely destroyed it, call it done.
+	if !shared.PathExists(GetPoolMountPath(d.name)) {
+		return nil
+	}
+
+	// On delete, wipe everything in the directory.
+	err := wipeDirectory(GetPoolMountPath(d.name))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) Mount() (bool, error) {
+	// Nothing to do here.
+	return true, nil
+}
+func (d *ceph) Unmount() (bool, error) {
+	// Nothing to do here.
+	return true, nil
+}
+
+func (d *ceph) GetResources() (*api.ResourcesStoragePool, error) {
+	return d.getPoolUsage()
+}
+
+func (d *ceph) Validate(config map[string]string) error {
+	rules := map[string]func(value string) error{
+		"ceph.cluster_name":       shared.IsAny,
+		"ceph.osd.force_reuse":    shared.IsBool,
+		"ceph.osd.pg_num":         shared.IsAny,
+		"ceph.osd.pool_name":      shared.IsAny,
+		"ceph.osd.data_pool_name": shared.IsAny,
+		"ceph.rbd.clone_copy":     shared.IsBool,
+		"ceph.user_name":          shared.IsAny,
+		"volatile.pool.pristine":  shared.IsAny,
+		"volume.block.filesystem": func(value string) error {
+			if value == "" {
+				return nil
+			}
+			return shared.IsOneOf(value, cephAllowedFilesystems)
+		},
+		"volume.block.mount_options": shared.IsAny,
+	}
+
+	return d.validatePool(config, rules)
+}
+
+func (d *ceph) Update(changedConfig map[string]string) error {
+	return nil
+}
+
+func (d *ceph) MigrationTypes(contentType ContentType, refresh bool) []migration.Type {
+	if refresh {
+		return []migration.Type{
+			{
+				FSType:   migration.MigrationFSType_RSYNC,
+				Features: []string{"delete", "compress", "bidirectional"},
+			},
+		}
+	}
+
+	return []migration.Type{
+		{
+			FSType: migration.MigrationFSType_RBD,
+		},
+		{
+			FSType:   migration.MigrationFSType_RSYNC,
+			Features: []string{"delete", "compress", "bidirectional"},
+		},
+	}
+}
diff --git a/lxd/storage/drivers/driver_ceph_utils.go b/lxd/storage/drivers/driver_ceph_utils.go
new file mode 100644
index 0000000000..412e2d4b6e
--- /dev/null
+++ b/lxd/storage/drivers/driver_ceph_utils.go
@@ -0,0 +1,1990 @@
+package drivers
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/pborman/uuid"
+	"golang.org/x/sys/unix"
+
+	"github.com/lxc/lxd/lxd/db"
+	"github.com/lxc/lxd/lxd/operations"
+	"github.com/lxc/lxd/lxd/rsync"
+	"github.com/lxc/lxd/shared"
+	"github.com/lxc/lxd/shared/api"
+	"github.com/lxc/lxd/shared/ioprogress"
+	"github.com/lxc/lxd/shared/units"
+)
+
+// osdCreatePool creates an OSD pool.
+func (d *ceph) osdCreatePool() error {
+	// Create new osd pool
+	_, err := shared.TryRunCommand("ceph",
+		"--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+		"--cluster", d.config["ceph.cluster_name"],
+		"osd",
+		"pool",
+		"create",
+		d.config["ceph.osd.pool_name"],
+		d.config["ceph.osd.pg_num"])
+
+	return err
+}
+
+// osdPoolExists checks whether a given OSD pool exists.
+func (d *ceph) osdPoolExists() bool {
+	_, err := shared.RunCommand(
+		"ceph",
+		"--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+		"--cluster", d.config["ceph.cluster_name"],
+		"osd",
+		"pool",
+		"get",
+		d.config["ceph.osd.pool_name"],
+		"size")
+	if err != nil {
+		return false
+	}
+
+	return true
+}
+
+// osdDeletePool destroys an OSD pool.
+// - A call to osdDeletePool will destroy a pool including any storage
+//   volumes that still exist in the pool.
+// - In case the OSD pool that is supposed to be deleted does not exist this
+//   command will still exit 0. This means that if the caller wants to be sure
+//   that this call actually deleted an OSD pool it needs to check for the
+//   existence of the pool first.
+func (d *ceph) osdDeletePool() error {
+	_, err := shared.RunCommand("ceph",
+		"--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+		"--cluster", d.config["ceph.cluster_name"],
+		"osd",
+		"pool",
+		"delete",
+		d.config["ceph.osd.pool_name"],
+		d.config["ceph.osd.pool_name"],
+		"--yes-i-really-really-mean-it")
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) osdGetPoolPGNum() (string, error) {
+	out, err := shared.RunCommand("ceph",
+		"--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+		"--cluster", d.config["ceph.cluster_name"],
+		"osd",
+		"pool",
+		"get",
+		d.config["ceph.osd.pool_name"],
+		"pg_num")
+
+	return out, err
+}
+
+func (d *ceph) getPoolUsage() (*api.ResourcesStoragePool, error) {
+	var stdout bytes.Buffer
+
+	err := shared.RunCommandWithFds(nil, &stdout,
+		"ceph",
+		"--name", fmt.Sprintf("client.%s", d.config["ceph.user_name"]),
+		"--cluster", d.config["ceph.cluster_name"],
+		"df",
+		"-f", "json")
+	if err != nil {
+		return nil, err
+	}
+
+	// Temporary structs for parsing
+	type cephDfPoolStats struct {
+		BytesUsed      int64 `json:"bytes_used"`
+		BytesAvailable int64 `json:"max_avail"`
+	}
+
+	type cephDfPool struct {
+		Name  string          `json:"name"`
+		Stats cephDfPoolStats `json:"stats"`
+	}
+
+	type cephDf struct {
+		Pools []cephDfPool `json:"pools"`
+	}
+
+	// Parse the JSON output
+	df := cephDf{}
+	err = json.NewDecoder(&stdout).Decode(&df)
+	if err != nil {
+		return nil, err
+	}
+
+	var pool *cephDfPool
+	for _, entry := range df.Pools {
+		if entry.Name == d.config["ceph.osd.pool_name"] {
+			pool = &entry
+			break
+		}
+	}
+
+	if pool == nil {
+		return nil, fmt.Errorf("OSD pool missing in df output")
+	}
+
+	spaceUsed := uint64(pool.Stats.BytesUsed)
+	spaceAvailable := uint64(pool.Stats.BytesAvailable)
+
+	res := api.ResourcesStoragePool{}
+	res.Space.Total = spaceAvailable + spaceUsed
+	res.Space.Used = spaceUsed
+
+	return &res, nil
+}
+
+// rbdCreateVolume creates an RBD storage volume.
+// Note that the set of features is intentionally limited is intentionally
+// limited by passing --image-feature explicitly. This is done to ensure that
+// the chances of a conflict between the features supported by the userspace
+// library and the kernel module are minimized. Otherwise random panics might
+// occur.
+func (d *ceph) rbdCreateVolume(volumeName string,
+	volumeType string, size string) error {
+	cmd := []string{
+		"--id", d.config["ceph.user_name"],
+		"--image-feature", "layering,",
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+	}
+
+	if d.config["ceph.osd.data_pool_name"] != "" {
+		cmd = append(cmd, "--data-pool", d.config["ceph.osd.data_pool_name"])
+	}
+
+	cmd = append(cmd,
+		"--size", size,
+		"create",
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+
+	_, err := shared.RunCommand("rbd", cmd...)
+	return err
+}
+
+// rbdVolumeExists checks whether a given RBD storage volume exists.
+func (d *ceph) rbdVolumeExists(volumeName string, volumeType string) bool {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"image-meta",
+		"list",
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+// rbdVolumeSnapshotExists checks whether a given RBD snapshot exists.
+func (d *ceph) rbdVolumeSnapshotExists(volumeName string, volumeType string, snapshotName string) bool {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"info",
+		d.getRBDVolumeName(volumeName, snapshotName, volumeType, false, false))
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+// rbdDeleteVolume deletes an RBD storage volume.
+// - In case the RBD storage volume that is supposed to be deleted does not
+//   exist this command will still exit 0. This means that if the caller wants
+//   to be sure that this call actually deleted an RBD storage volume it needs
+//   to check for the existence of the pool first.
+func (d *ceph) rbdDeleteVolume(volumeName string, volumeType string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"rm",
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdMapVolume maps a given RBD storage volume
+// This will ensure that the RBD storage volume is accessible as a block device
+// in the /dev directory and is therefore necessary in order to mount it.
+func (d *ceph) rbdMapVolume(volumeName string, volumeType string) (string, error) {
+	devPath, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"map",
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		return "", err
+	}
+
+	idx := strings.Index(devPath, "/dev/rbd")
+	if idx < 0 {
+		return "", fmt.Errorf("Failed to detect mapped device path")
+	}
+
+	devPath = devPath[idx:]
+	return strings.TrimSpace(devPath), nil
+}
+
+// rbdUnmapVolume unmaps a given RBD storage volume
+// This is a precondition in order to delete an RBD storage volume can.
+func (d *ceph) rbdUnmapVolume(volumeName string, volumeType string, unmapUntilEINVAL bool) error {
+	busyCount := 0
+
+again:
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"unmap",
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		runError, ok := err.(shared.RunError)
+		if ok {
+			exitError, ok := runError.Err.(*exec.ExitError)
+			if ok {
+				waitStatus := exitError.Sys().(syscall.WaitStatus)
+				if waitStatus.ExitStatus() == 22 {
+					// EINVAL (already unmapped)
+					return nil
+				}
+
+				if waitStatus.ExitStatus() == 16 {
+					// EBUSY (currently in use)
+					busyCount++
+					if busyCount == 10 {
+						return err
+					}
+
+					// Wait a second an try again
+					time.Sleep(time.Second)
+					goto again
+				}
+			}
+		}
+
+		return err
+	}
+
+	if unmapUntilEINVAL {
+		goto again
+	}
+
+	return nil
+}
+
+// rbdUnmapVolumeSnapshot unmaps a given RBD snapshot
+// This is a precondition in order to delete an RBD snapshot can.
+func (d *ceph) rbdUnmapVolumeSnapshot(volumeName string, volumeType string, snapshotName string, unmapUntilEINVAL bool) error {
+again:
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"unmap",
+		d.getRBDVolumeName(volumeName, snapshotName, volumeType, false, false))
+	if err != nil {
+		runError, ok := err.(shared.RunError)
+		if ok {
+			exitError, ok := runError.Err.(*exec.ExitError)
+			if ok {
+				waitStatus := exitError.Sys().(syscall.WaitStatus)
+				if waitStatus.ExitStatus() == 22 {
+					// EINVAL (already unmapped)
+					return nil
+				}
+			}
+		}
+		return err
+	}
+
+	if unmapUntilEINVAL {
+		goto again
+	}
+
+	return nil
+}
+
+// rbdCreateVolumeSnapshot creates a read-write snapshot of a given RBD storage
+// volume
+func (d *ceph) rbdCreateVolumeSnapshot(volumeName string, volumeType string, snapshotName string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"snap",
+		"create",
+		"--snap", snapshotName,
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdPurgeVolumeSnapshots deletes all snapshot of a given RBD storage volume
+// Note that this will only succeed if none of the snapshots are protected.
+func (d *ceph) rbdPurgeVolumeSnapshots(volumeName string, volumeType string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"snap",
+		"purge",
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdProtectVolumeSnapshot protects a given snapshot from being deleted
+// This is a precondition to be able to create RBD clones from a given snapshot.
+func (d *ceph) rbdProtectVolumeSnapshot(volumeName string, volumeType string, snapshotName string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"snap",
+		"protect",
+		"--snap", snapshotName,
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		runError, ok := err.(shared.RunError)
+		if ok {
+			exitError, ok := runError.Err.(*exec.ExitError)
+			if ok {
+				waitStatus := exitError.Sys().(syscall.WaitStatus)
+				if waitStatus.ExitStatus() == 16 {
+					// EBUSY (snapshot already protected)
+					return nil
+				}
+			}
+		}
+		return err
+	}
+
+	return nil
+}
+
+// rbdUnprotectVolumeSnapshot unprotects a given snapshot
+// - This is a precondition to be able to delete an RBD snapshot.
+// - This command will only succeed if the snapshot does not have any clones.
+func (d *ceph) rbdUnprotectVolumeSnapshot(volumeName string, volumeType string, snapshotName string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"snap",
+		"unprotect",
+		"--snap", snapshotName,
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		runError, ok := err.(shared.RunError)
+		if ok {
+			exitError, ok := runError.Err.(*exec.ExitError)
+			if ok {
+				waitStatus := exitError.Sys().(syscall.WaitStatus)
+				if waitStatus.ExitStatus() == 22 {
+					// EBUSY (snapshot already unprotected)
+					return nil
+				}
+			}
+		}
+		return err
+	}
+
+	return nil
+}
+
+// rbdCreateClone creates a clone from a protected RBD snapshot
+func (d *ceph) rbdCreateClone(sourceVolumeName string, sourceVolumeType string, sourceSnapshotName string, targetVolumeName string, targetVolumeType string) error {
+	cmd := []string{
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--image-feature", "layering",
+	}
+
+	if d.config["ceph.osd.data_pool_name"] != "" {
+		cmd = append(cmd, "--data-pool", d.config["ceph.osd.data_pool_name"])
+	}
+
+	cmd = append(cmd,
+		"clone",
+		d.getRBDVolumeName(sourceVolumeName, sourceSnapshotName, sourceVolumeType, false, true),
+		d.getRBDVolumeName(targetVolumeName, "", targetVolumeType, false, true))
+
+	_, err := shared.RunCommand("rbd", cmd...)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdListSnapshotClones list all clones of an RBD snapshot
+func (d *ceph) rbdListSnapshotClones(volumeName string, volumeType string, snapshotName string) ([]string, error) {
+	msg, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"children",
+		"--image", d.getRBDVolumeName(volumeName, "", volumeType, false, false),
+		"--snap", snapshotName)
+	if err != nil {
+		return nil, err
+	}
+
+	msg = strings.TrimSpace(msg)
+	clones := strings.Fields(msg)
+	if len(clones) == 0 {
+		return nil, db.ErrNoSuchObject
+	}
+
+	return clones, nil
+}
+
+// rbdMarkVolumeDeleted marks an RBD storage volume as being in "zombie"
+// state
+// An RBD storage volume that is in zombie state is not tracked in LXD's
+// database anymore but still needs to be kept around for the sake of any
+// dependent storage entities in the storage pool. This usually happens when an
+// RBD storage volume has protected snapshots; a scenario most common when
+// creating a sparse copy of a container or when LXD updated an image and the
+// image still has dependent container clones.
+func (d *ceph) rbdMarkVolumeDeleted(volumeType string, oldVolumeName string, newVolumeName string, suffix string) error {
+	deletedName := d.getRBDVolumeName(newVolumeName, "", volumeType, true, true)
+	if suffix != "" {
+		deletedName = fmt.Sprintf("%s_%s", deletedName, suffix)
+	}
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"mv",
+		d.getRBDVolumeName(oldVolumeName, "", volumeType, false, true),
+		deletedName)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdUnmarkVolumeDeleted unmarks an RBD storage volume as being in "zombie"
+// state
+// - An RBD storage volume that is in zombie is not tracked in LXD's database
+//   anymore but still needs to be kept around for the sake of any dependent
+//   storage entities in the storage pool.
+// - This function is mostly used when a user has deleted the storage volume of
+//   an image from the storage pool and then triggers a container creation. If
+//   LXD detects that the storage volume for the given hash already exists in
+//   the pool but is marked as "zombie" it will unmark it as a zombie instead of
+//   creating another storage volume for the image.
+func (d *ceph) rbdUnmarkVolumeDeleted(volumeName string, volumeType string, oldSuffix string, newSuffix string) error {
+	oldName := d.getRBDVolumeName(volumeName, "", volumeType, true, true)
+	if oldSuffix != "" {
+		oldName = fmt.Sprintf("%s_%s", oldName, oldSuffix)
+	}
+
+	newName := d.getRBDVolumeName(volumeName, "", volumeType, false, true)
+	if newSuffix != "" {
+		newName = fmt.Sprintf("%s_%s", newName, newSuffix)
+	}
+
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"mv",
+		oldName,
+		newName)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdRenameVolume renames a given RBD storage volume
+// Note that this usually requires that the image be unmapped under its original
+// name, then renamed, and finally will be remapped again. If it is not unmapped
+// under its original name and the callers maps it under its new name the image
+// will be mapped twice. This will prevent it from being deleted.
+func (d *ceph) rbdRenameVolume(oldVolumeName string, newVolumeName string, volumeType string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"mv",
+		d.getRBDVolumeName(oldVolumeName, "", volumeType, false, true),
+		d.getRBDVolumeName(newVolumeName, "", volumeType, false, true))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdRenameVolumeSnapshot renames a given RBD storage volume
+// Note that if the snapshot is mapped - which it usually shouldn't be - this
+// usually requires that the snapshot be unmapped under its original name, then
+// renamed, and finally will be remapped again. If it is not unmapped under its
+// original name and the caller maps it under its new name the snapshot will be
+// mapped twice. This will prevent it from being deleted.
+func (d *ceph) rbdRenameVolumeSnapshot(volumeName string, volumeType string, oldSnapshotName string, newSnapshotName string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"snap",
+		"rename",
+		d.getRBDVolumeName(volumeName, oldSnapshotName, volumeType, false, true),
+		d.getRBDVolumeName(volumeName, newSnapshotName, volumeType, false, true))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdGetVolumeParent will return the snapshot the RBD clone was created
+// from
+// - If the RBD storage volume is not a clone then this function will return
+//   db.NoSuchObjectError.
+// - The snapshot will be returned as
+//   <osd-pool-name>/<rbd-volume-name>@<rbd-snapshot-name>
+//   The caller will usually want to parse this according to its needs. This
+//   helper library provides two small functions to do this but see below.
+func (d *ceph) rbdGetVolumeParent(volumeName string, volumeType string) (string, error) {
+	msg, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"info",
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		return "", err
+	}
+
+	idx := strings.Index(msg, "parent: ")
+	if idx == -1 {
+		return "", db.ErrNoSuchObject
+	}
+
+	msg = msg[(idx + len("parent: ")):]
+	msg = strings.TrimSpace(msg)
+
+	idx = strings.Index(msg, "\n")
+	if idx == -1 {
+		return "", fmt.Errorf("Unexpected parsing error")
+	}
+
+	msg = msg[:idx]
+	msg = strings.TrimSpace(msg)
+
+	return msg, nil
+}
+
+// rbdDeleteVolumeSnapshot deletes an RBD snapshot
+// This requires that the snapshot does not have any clones and is unmapped and
+// unprotected.
+func (d *ceph) rbdDeleteVolumeSnapshot(volumeName string, volumeType string, snapshotName string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"snap",
+		"rm",
+		d.getRBDVolumeName(volumeName, snapshotName, volumeType, false, false))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdCopyVolume copies an RBD storage volume
+// This is a non-sparse copy which doesn't introduce any dependency relationship
+// between the source RBD storage volume and the target RBD storage volume. The
+// operations is similar to creating an empty RBD storage volume and rsyncing
+// the contents of the source RBD storage volume into it.
+func (d *ceph) rbdCopyVolume(oldVolumeName string, newVolumeName string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"cp",
+		oldVolumeName,
+		newVolumeName)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// rbdListVolumeSnapshots retrieves the snapshots of an RBD storage volume
+// The format of the snapshot names is simply the part after the @. So given a
+// valid RBD path relative to a pool
+// <osd-pool-name>/<rbd-storage-volume>@<rbd-snapshot-name>
+// this will only return
+// <rbd-snapshot-name>
+func (d *ceph) rbdListVolumeSnapshots(volumeName string, volumeType string) ([]string, error) {
+	msg, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--format", "json",
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"snap",
+		"ls",
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		return []string{}, err
+	}
+
+	var data []map[string]interface{}
+	err = json.Unmarshal([]byte(msg), &data)
+	if err != nil {
+		return []string{}, err
+	}
+
+	snapshots := []string{}
+	for _, v := range data {
+		_, ok := v["name"]
+		if !ok {
+			return []string{}, fmt.Errorf("No \"name\" property found")
+		}
+
+		name, ok := v["name"].(string)
+		if !ok {
+			return []string{}, fmt.Errorf("\"name\" property did not have string type")
+		}
+
+		name = strings.TrimSpace(name)
+		snapshots = append(snapshots, name)
+	}
+
+	if len(snapshots) == 0 {
+		return []string{}, db.ErrNoSuchObject
+	}
+
+	return snapshots, nil
+}
+
+// rbdRestoreVolume restores an RBD storage volume to the state of one of
+// its snapshots
+func (d *ceph) rbdRestoreVolume(volumeName string, volumeType string, snapshotName string) error {
+	_, err := shared.RunCommand(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"snap",
+		"rollback",
+		"--snap", snapshotName,
+		d.getRBDVolumeName(volumeName, "", volumeType, false, false))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// getRBDSize returns the size the RBD storage volume is supposed to be created
+// with
+func (d *ceph) getRBDSize(vol Volume) (string, error) {
+	size, ok := vol.config["size"]
+	if !ok {
+		size = vol.poolConfig["volume.size"]
+	}
+
+	sz, err := units.ParseByteSizeString(size)
+	if err != nil {
+		return "", err
+	}
+
+	// Safety net: Set to default value.
+	if sz == 0 {
+		sz, _ = units.ParseByteSizeString("10GB")
+	}
+
+	return fmt.Sprintf("%dB", sz), nil
+}
+
+// getRBDFilesystem returns the filesystem the RBD storage volume is supposed to
+// be created with
+func (d *ceph) getRBDFilesystem(vol Volume) string {
+	if vol.config["block.filesystem"] != "" {
+		return vol.config["block.filesystem"]
+	}
+
+	if vol.poolConfig["volume.block.filesystem"] != "" {
+		return vol.poolConfig["volume.block.filesystem"]
+	}
+
+	return "ext4"
+}
+
+// getRBDMountOptions returns the mount options the storage volume is supposed
+// to be mounted with
+// The option string that is returned needs to be passed to the approriate
+// helper (currently named "LXDResolveMountoptions") which will take on the job
+// of splitting it into appropriate flags and string options.
+func (d *ceph) getRBDMountOptions(vol Volume) string {
+	if vol.config["block.mount_options"] != "" {
+		return vol.config["block.mount_options"]
+	}
+
+	if vol.poolConfig["volume.block.mount_options"] != "" {
+		return vol.poolConfig["volume.block.mount_options"]
+	}
+
+	if d.getRBDFilesystem(vol) == "btrfs" {
+		return "user_subvol_rm_allowed,discard"
+	}
+
+	return "discard"
+}
+
+// copyWithoutSnapshotsFull creates a non-sparse copy of a container
+// This does not introduce a dependency relation between the source RBD storage
+// volume and the target RBD storage volume.
+func (d *ceph) copyWithoutSnapshotsFull(source Volume, target Volume) error {
+	targetContainerName := target.name
+	oldVolumeName := d.getRBDVolumeName(source.name, "", string(source.volType), false, true)
+	newVolumeName := d.getRBDVolumeName(target.name, "", string(target.volType), false, true)
+
+	err := d.rbdCopyVolume(oldVolumeName, newVolumeName)
+	if err != nil {
+		return err
+	}
+
+	_, err = d.rbdMapVolume(targetContainerName, string(target.volType))
+	if err != nil {
+		return err
+	}
+
+	// Re-generate the UUID
+	err = d.generateUUID(target)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// copyWithoutSnapshotsFull creates a sparse copy of a container
+// This introduces a dependency relation between the source RBD storage volume
+// and the target RBD storage volume.
+func (d *ceph) copyWithoutSnapshotsSparse(source Volume, target Volume) error {
+	sourceIsSnapshot := source.IsSnapshot()
+	sourceContainerName := source.name
+	targetContainerName := target.name
+	sourceContainerOnlyName := sourceContainerName
+	sourceSnapshotOnlyName := ""
+	snapshotName := fmt.Sprintf("zombie_snapshot_%s",
+		uuid.NewRandom().String())
+
+	if sourceIsSnapshot {
+		sourceContainerOnlyName, sourceSnapshotOnlyName, _ =
+			shared.InstanceGetParentAndSnapshotName(sourceContainerName)
+		snapshotName = fmt.Sprintf("snapshot_%s", sourceSnapshotOnlyName)
+	} else {
+		// create snapshot
+		err := d.rbdCreateVolumeSnapshot(
+			sourceContainerName, string(source.volType),
+			snapshotName)
+		if err != nil {
+			return err
+		}
+	}
+
+	// protect volume so we can create clones of it
+	err := d.rbdProtectVolumeSnapshot(
+		sourceContainerOnlyName, string(source.volType),
+		snapshotName)
+	if err != nil {
+		return err
+	}
+
+	err = d.rbdCreateClone(
+		sourceContainerOnlyName, string(source.volType),
+		snapshotName, targetContainerName,
+		string(target.volType))
+	if err != nil {
+		return err
+	}
+
+	// Re-generate the UUID
+	err = d.generateUUID(target)
+	if err != nil {
+		return err
+	}
+
+	// Create mountpoint
+	// targetContainerMountPoint := driver.GetContainerMountPoint(target.Project(), s.pool.Name, target.Name())
+	// err = driver.CreateContainerMountpoint(targetContainerMountPoint, target.Path(), target.IsPrivileged())
+	// if err != nil {
+	// 	return err
+	// }
+
+	// ourMount, err := target.StorageStart()
+	// if err != nil {
+	// 	return err
+	// }
+	// if ourMount {
+	// 	defer target.StorageStop()
+	// }
+
+	// err = target.DeferTemplateApply("copy")
+	// if err != nil {
+
+	// 	return err
+	// }
+
+	return nil
+}
+
+// copyWithSnapshots creates a non-sparse copy of a container including its
+// snapshots
+// This does not introduce a dependency relation between the source RBD storage
+// volume and the target RBD storage volume.
+func (d *ceph) copyWithSnapshots(sourceVolumeName string,
+	targetVolumeName string, sourceParentSnapshot string) error {
+	args := []string{
+		"export-diff",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		sourceVolumeName,
+	}
+
+	if sourceParentSnapshot != "" {
+		args = append(args, "--from-snap", sourceParentSnapshot)
+	}
+
+	// redirect output to stdout
+	args = append(args, "-")
+
+	rbdSendCmd := exec.Command("rbd", args...)
+	rbdRecvCmd := exec.Command(
+		"rbd",
+		"--id", d.config["ceph.user_name"],
+		"import-diff",
+		"--cluster", d.config["ceph.cluster_name"],
+		"-",
+		targetVolumeName)
+
+	rbdRecvCmd.Stdin, _ = rbdSendCmd.StdoutPipe()
+	rbdRecvCmd.Stdout = os.Stdout
+	rbdRecvCmd.Stderr = os.Stderr
+
+	err := rbdRecvCmd.Start()
+	if err != nil {
+		return err
+	}
+
+	err = rbdSendCmd.Run()
+	if err != nil {
+		return err
+	}
+
+	err = rbdRecvCmd.Wait()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// deleteVolume deletes the RBD storage volume of a container including
+// any dependencies
+// - This function takes care to delete any RBD storage entities that are marked
+//   as zombie and whose existence is solely dependent on the RBD storage volume
+//   for the container to be deleted.
+// - This function will mark any storage entities of the container to be deleted
+//   as zombies in case any RBD storage entities in the storage pool have a
+//   dependency relation with it.
+// - This function uses a C-style convention to return error or success simply
+//   because it is more elegant and simple than the go way.
+//   The function will return
+//   -1 on error
+//    0 if the RBD storage volume has been deleted
+//    1 if the RBD storage volume has been marked as a zombie
+// - deleteVolume in conjunction with deleteVolumeSnapshot
+//   recurses through an OSD storage pool to find and delete any storage
+//   entities that were kept around because of dependency relations but are not
+//   deletable.
+func (d *ceph) deleteVolume(volumeName string,
+	volumeType string) int {
+	snaps, err := d.rbdListVolumeSnapshots(
+		volumeName, volumeType)
+	if err == nil {
+		var zombies int
+		for _, snap := range snaps {
+			ret := d.deleteVolumeSnapshot(volumeName, volumeType, snap)
+			if ret < 0 {
+				return -1
+			} else if ret == 1 {
+				zombies++
+			}
+		}
+
+		if zombies > 0 {
+			// unmap
+			err = d.rbdUnmapVolume(
+				volumeName, volumeType, true)
+			if err != nil {
+				return -1
+			}
+
+			if strings.HasPrefix(volumeName, "zombie_") ||
+				strings.HasPrefix(volumeType, "zombie_") {
+				return 1
+			}
+
+			newVolumeName := fmt.Sprintf("%s_%s", volumeName,
+				uuid.NewRandom().String())
+			err := d.rbdMarkVolumeDeleted(
+				volumeType, volumeName, newVolumeName,
+				"")
+			if err != nil {
+				return -1
+			}
+
+			return 1
+		}
+	} else {
+		if err != db.ErrNoSuchObject {
+			return -1
+		}
+
+		parent, err := d.rbdGetVolumeParent(
+			volumeName, volumeType)
+		if err == nil {
+			_, parentVolumeType, parentVolumeName,
+				parentSnapshotName, err := d.parseParent(parent)
+			if err != nil {
+				return -1
+			}
+
+			// unmap
+			err = d.rbdUnmapVolume(
+				volumeName, volumeType, true)
+			if err != nil {
+				return -1
+			}
+
+			// delete
+			err = d.rbdDeleteVolume(
+				volumeName, volumeType)
+			if err != nil {
+				return -1
+			}
+
+			// Only delete the parent snapshot of the container if
+			// it is a zombie. If it is not we know that LXD is
+			// still using it.
+			if strings.HasPrefix(parentVolumeType, "zombie_") ||
+				strings.HasPrefix(parentSnapshotName, "zombie_") {
+				ret := d.deleteVolumeSnapshot(
+					parentVolumeName,
+					parentVolumeType, parentSnapshotName)
+				if ret < 0 {
+					return -1
+				}
+			}
+		} else {
+			if err != db.ErrNoSuchObject {
+				return -1
+			}
+
+			// unmap
+			err = d.rbdUnmapVolume(
+				volumeName, volumeType, true)
+			if err != nil {
+				return -1
+			}
+
+			// delete
+			err = d.rbdDeleteVolume(
+				volumeName, volumeType)
+			if err != nil {
+				return -1
+			}
+		}
+	}
+
+	return 0
+}
+
+// deleteVolumeSnapshot deletes an RBD snapshot of a container including
+// any dependencies
+// - This function takes care to delete any RBD storage entities that are marked
+//   as zombie and whose existence is solely dependent on the RBD snapshot for
+//   the container to be deleted.
+// - This function will mark any storage entities of the container to be deleted
+//   as zombies in case any RBD storage entities in the storage pool have a
+//   dependency relation with it.
+// - This function uses a C-style convention to return error or success simply
+//   because it is more elegant and simple than the go way.
+//   The function will return
+//   -1 on error
+//    0 if the RBD storage volume has been deleted
+//    1 if the RBD storage volume has been marked as a zombie
+// - deleteVolumeSnapshot in conjunction with deleteVolume
+//   recurses through an OSD storage pool to find and delete any storage
+//   entities that were kept around because of dependency relations but are not
+//   deletable.
+func (d *ceph) deleteVolumeSnapshot(
+	volumeName string, volumeType string, snapshotName string) int {
+	clones, err := d.rbdListSnapshotClones(
+		volumeName, volumeType, snapshotName)
+	if err != nil {
+		if err != db.ErrNoSuchObject {
+			return -1
+		}
+
+		// unprotect
+		err = d.rbdUnprotectVolumeSnapshot(volumeName,
+			volumeType, snapshotName)
+		if err != nil {
+			return -1
+		}
+
+		// unmap
+		err = d.rbdUnmapVolumeSnapshot(
+			volumeName, volumeType, snapshotName, true)
+		if err != nil {
+			return -1
+		}
+
+		// delete
+		err = d.rbdDeleteVolumeSnapshot(volumeName,
+			volumeType, snapshotName)
+		if err != nil {
+			return -1
+		}
+
+		// Only delete the parent image if it is a zombie. If it is not
+		// we know that LXD is still using it.
+		if strings.HasPrefix(volumeType, "zombie_") {
+			ret := d.deleteVolume(
+				volumeName, volumeType)
+			if ret < 0 {
+				return -1
+			}
+		}
+
+		return 0
+	}
+
+	canDelete := true
+	for _, clone := range clones {
+		_, cloneType, cloneName, err := d.parseClone(clone)
+		if err != nil {
+			return -1
+		}
+
+		if !strings.HasPrefix(cloneType, "zombie_") {
+			canDelete = false
+			continue
+		}
+
+		ret := d.deleteVolume(
+			cloneName, cloneType)
+		if ret < 0 {
+			return -1
+		} else if ret == 1 {
+			// Only marked as zombie
+			canDelete = false
+		}
+	}
+
+	if canDelete {
+		// unprotect
+		err = d.rbdUnprotectVolumeSnapshot(
+			volumeName, volumeType, snapshotName)
+		if err != nil {
+			return -1
+		}
+
+		// unmap
+		err = d.rbdUnmapVolumeSnapshot(
+			volumeName, volumeType, snapshotName,
+			true)
+		if err != nil {
+			return -1
+		}
+
+		// delete
+		err = d.rbdDeleteVolumeSnapshot(
+			volumeName, volumeType, snapshotName)
+		if err != nil {
+			return -1
+		}
+
+		// Only delete the parent image if it is a zombie. If it
+		// is not we know that LXD is still using it.
+		if strings.HasPrefix(volumeType, "zombie_") {
+			ret := d.deleteVolume(
+				volumeName, volumeType)
+			if ret < 0 {
+				return -1
+			}
+		}
+	} else {
+		if strings.HasPrefix(snapshotName, "zombie_") {
+			return 1
+		}
+
+		err := d.rbdUnmapVolumeSnapshot(
+			volumeName, volumeType, snapshotName,
+			true)
+		if err != nil {
+			return -1
+		}
+
+		newSnapshotName := fmt.Sprintf("zombie_%s", snapshotName)
+
+		err = d.rbdRenameVolumeSnapshot(
+			volumeName, volumeType, snapshotName,
+			newSnapshotName)
+		if err != nil {
+			return -1
+		}
+	}
+
+	return 1
+}
+
+// parseParent splits a string describing a RBD storage entity into its
+// components
+// This can be used on strings like
+// <osd-pool-name>/<lxd-specific-prefix>_<rbd-storage-volume>@<rbd-snapshot-name>
+// and will split it into
+// <osd-pool-name>, <rbd-storage-volume>, <lxd-specific-prefix>, <rbdd-snapshot-name>
+func (d *ceph) parseParent(parent string) (string, string, string, string, error) {
+	idx := strings.Index(parent, "/")
+	if idx == -1 {
+		return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+	}
+	slider := parent[(idx + 1):]
+	poolName := parent[:idx]
+
+	volumeType := slider
+	idx = strings.Index(slider, "zombie_")
+	if idx == 0 {
+		idx += len("zombie_")
+		volumeType = slider
+		slider = slider[idx:]
+	}
+
+	idxType := strings.Index(slider, "_")
+	if idxType == -1 {
+		return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+	}
+
+	if idx == len("zombie_") {
+		idxType += idx
+	}
+	volumeType = volumeType[:idxType]
+
+	idx = strings.Index(slider, "_")
+	if idx == -1 {
+		return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+	}
+
+	volumeName := slider
+	idx = strings.Index(volumeName, "_")
+	if idx == -1 {
+		return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+	}
+	volumeName = volumeName[(idx + 1):]
+
+	idx = strings.Index(volumeName, "@")
+	if idx == -1 {
+		return "", "", "", "", fmt.Errorf("Unexpected parsing error")
+	}
+	snapshotName := volumeName[(idx + 1):]
+	volumeName = volumeName[:idx]
+
+	return poolName, volumeType, volumeName, snapshotName, nil
+}
+
+// parseClone splits a strings describing an RBD storage volume
+// For example a string like
+// <osd-pool-name>/<lxd-specific-prefix>_<rbd-storage-volume>
+// will be split into
+// <osd-pool-name>, <lxd-specific-prefix>, <rbd-storage-volume>
+func (d *ceph) parseClone(clone string) (string, string, string, error) {
+	idx := strings.Index(clone, "/")
+	if idx == -1 {
+		return "", "", "", fmt.Errorf("Unexpected parsing error")
+	}
+	slider := clone[(idx + 1):]
+	poolName := clone[:idx]
+
+	volumeType := slider
+	idx = strings.Index(slider, "zombie_")
+	if idx == 0 {
+		idx += len("zombie_")
+		volumeType = slider
+		slider = slider[idx:]
+	}
+
+	idxType := strings.Index(slider, "_")
+	if idxType == -1 {
+		return "", "", "", fmt.Errorf("Unexpected parsing error")
+	}
+
+	if idx == len("zombie_") {
+		idxType += idx
+	}
+	volumeType = volumeType[:idxType]
+
+	idx = strings.Index(slider, "_")
+	if idx == -1 {
+		return "", "", "", fmt.Errorf("Unexpected parsing error")
+	}
+
+	volumeName := slider
+	idx = strings.Index(volumeName, "_")
+	if idx == -1 {
+		return "", "", "", fmt.Errorf("Unexpected parsing error")
+	}
+	volumeName = volumeName[(idx + 1):]
+
+	return poolName, volumeType, volumeName, nil
+}
+
+// getRBDMappedDevPath looks at sysfs to retrieve the device path
+// "/dev/rbd<idx>" for an RBD image. If it doesn't find it it will map it if
+// told to do so.
+func (d *ceph) getRBDMappedDevPath(poolName string, volumeName string, volumeType string,
+	doMap bool) (string, int) {
+	files, err := ioutil.ReadDir("/sys/devices/rbd")
+	if err != nil {
+		if os.IsNotExist(err) {
+			if doMap {
+				goto mapImage
+			}
+
+			return "", 0
+		}
+
+		return "", -1
+	}
+
+	for _, f := range files {
+		if !f.IsDir() {
+			continue
+		}
+
+		fName := f.Name()
+		idx, err := strconv.ParseUint(fName, 10, 64)
+		if err != nil {
+			continue
+		}
+
+		tmp := fmt.Sprintf("/sys/devices/rbd/%s/pool", fName)
+		contents, err := ioutil.ReadFile(tmp)
+		if err != nil {
+			if os.IsNotExist(err) {
+				continue
+			}
+
+			return "", -1
+		}
+
+		detectedPoolName := strings.TrimSpace(string(contents))
+		if detectedPoolName != poolName {
+			continue
+		}
+
+		tmp = fmt.Sprintf("/sys/devices/rbd/%s/name", fName)
+		contents, err = ioutil.ReadFile(tmp)
+		if err != nil {
+			if os.IsNotExist(err) {
+				continue
+			}
+
+			return "", -1
+		}
+
+		typedVolumeName := fmt.Sprintf("%s_%s", volumeType, volumeName)
+		detectedVolumeName := strings.TrimSpace(string(contents))
+		if detectedVolumeName != typedVolumeName {
+			continue
+		}
+
+		tmp = fmt.Sprintf("/sys/devices/rbd/%s/snap", fName)
+		contents, err = ioutil.ReadFile(tmp)
+		if err != nil {
+			if os.IsNotExist(err) {
+				return fmt.Sprintf("/dev/rbd%d", idx), 1
+			}
+
+			return "", -1
+		}
+
+		detectedSnapName := strings.TrimSpace(string(contents))
+		if detectedSnapName != "-" {
+			continue
+		}
+
+		return fmt.Sprintf("/dev/rbd%d", idx), 1
+	}
+
+	if !doMap {
+		return "", 0
+	}
+
+mapImage:
+	devPath, err := d.rbdMapVolume(volumeName, volumeType)
+	if err != nil {
+		return "", -1
+	}
+
+	return strings.TrimSpace(devPath), 2
+}
+
+func (d *ceph) rbdShrink(path string, size int64, fsType string,
+	vol Volume) error {
+	var msg string
+
+	err := shrinkFileSystem(fsType, path, vol, size)
+	if err != nil {
+		return err
+	}
+
+	msg, err = shared.TryRunCommand(
+		"rbd",
+		"resize",
+		"--allow-shrink",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"--size", fmt.Sprintf("%dM", (size/1024/1024)),
+		d.getRBDVolumeName(vol.name, "", string(vol.volType), false, false))
+	if err != nil {
+		return fmt.Errorf(`Could not shrink RBD storage volume "%s":
+			%s`, path, msg)
+	}
+
+	return nil
+}
+
+func (d *ceph) rbdGrow(path string, size int64, fsType string,
+	vol Volume) error {
+
+	// Grow the block device
+	msg, err := shared.TryRunCommand(
+		"rbd",
+		"resize",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		"--pool", d.config["ceph.osd.pool_name"],
+		"--size", fmt.Sprintf("%dM", (size/1024/1024)),
+		d.getRBDVolumeName(vol.name, "", string(vol.volType), false, false))
+	if err != nil {
+		return fmt.Errorf(`Could not extend RBD storage volume "%s":
+			%s`, path, msg)
+	}
+
+	// Grow the filesystem
+	return growFileSystem(fsType, path, vol)
+}
+
+func (d *ceph) rbdExportVolumeToFile(sourceVolumeName string, file string) error {
+	args := []string{
+		"export",
+		"--id", d.config["ceph.user_name"],
+		"--cluster", d.config["ceph.cluster_name"],
+		sourceVolumeName,
+		file,
+	}
+
+	rbdSendCmd := exec.Command("rbd", args...)
+	err := rbdSendCmd.Run()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) rbdCreateVolumeBackup(vol Volume, targetPath string) error {
+	sourceIsSnapshot := vol.IsSnapshot()
+	sourceContainerName := vol.name
+	sourceContainerOnlyName := vol.name
+	sourceSnapshotOnlyName := ""
+
+	// Prepare for rsync
+	rsync := func(oldPath string, newPath string, bwlimit string) error {
+		output, err := rsync.LocalCopy(oldPath, newPath, bwlimit, true)
+		if err != nil {
+			return fmt.Errorf("Failed to rsync: %s: %s", string(output), err)
+		}
+
+		return nil
+	}
+
+	bwlimit := vol.poolConfig["rsync.bwlimit"]
+	// Create a temporary snapshot
+	snapshotName := fmt.Sprintf("zombie_snapshot_%s", uuid.NewRandom().String())
+	if sourceIsSnapshot {
+		sourceContainerOnlyName, sourceSnapshotOnlyName, _ = shared.InstanceGetParentAndSnapshotName(sourceContainerName)
+		snapshotName = fmt.Sprintf("snapshot_%s", sourceSnapshotOnlyName)
+	} else {
+		// This is costly but we need to ensure that all cached data has
+		// been committed to disk. If we don't then the rbd snapshot of
+		// the underlying filesystem can be inconsistent or - worst case
+		// - empty.
+		unix.Sync()
+
+		// create snapshot
+		err := d.rbdCreateVolumeSnapshot(sourceContainerOnlyName, string(vol.volType), snapshotName)
+		if err != nil {
+			return err
+		}
+		defer d.rbdDeleteVolumeSnapshot(sourceContainerOnlyName, string(vol.volType), snapshotName)
+	}
+
+	// Protect volume so we can create clones of it
+	err := d.rbdProtectVolumeSnapshot(sourceContainerOnlyName, string(vol.volType), snapshotName)
+	if err != nil {
+		return err
+	}
+	defer d.rbdUnprotectVolumeSnapshot(sourceContainerOnlyName, string(vol.volType), snapshotName)
+
+	// Create a new volume from the snapshot
+	cloneName := uuid.NewRandom().String()
+	err = d.rbdCreateClone(sourceContainerOnlyName, string(vol.volType), snapshotName, cloneName, "backup")
+	if err != nil {
+		return err
+	}
+	defer d.rbdDeleteVolume(cloneName, "backup")
+
+	// Map the new volume
+	RBDDevPath, err := d.rbdMapVolume(cloneName, "backup")
+	if err != nil {
+		return err
+	}
+	defer d.rbdUnmapVolume(cloneName, "backup", true)
+
+	// Generate a new UUID if needed
+	RBDFilesystem := d.getRBDFilesystem(vol)
+
+	err = regenerateFilesystemUUID(RBDFilesystem, RBDDevPath)
+	if err != nil {
+		return err
+	}
+
+	// Create a temporary mountpoing
+	tmpContainerMntPoint, err := ioutil.TempDir("", "lxd_backup_")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(tmpContainerMntPoint)
+
+	err = os.Chmod(tmpContainerMntPoint, 0100)
+	if err != nil {
+		return err
+	}
+
+	// Mount the volume
+	mountFlags, mountOptions := resolveMountOptions(d.getRBDMountOptions(vol))
+	err = TryMount(RBDDevPath, tmpContainerMntPoint, RBDFilesystem, mountFlags, mountOptions)
+	if err != nil {
+		return err
+	}
+
+	defer TryUnmount(tmpContainerMntPoint, unix.MNT_DETACH)
+
+	// Figure out the target name
+	targetName := sourceContainerName
+	if sourceIsSnapshot {
+		_, targetName, _ = shared.InstanceGetParentAndSnapshotName(sourceContainerName)
+	}
+
+	// Create the path for the backup.
+	targetBackupMntPoint := fmt.Sprintf("%s/container", targetPath)
+	if sourceIsSnapshot {
+		targetBackupMntPoint = fmt.Sprintf("%s/snapshots/%s", targetPath, targetName)
+	}
+
+	err = os.MkdirAll(targetBackupMntPoint, 0711)
+	if err != nil {
+		return err
+	}
+
+	err = rsync(tmpContainerMntPoint, targetBackupMntPoint, bwlimit)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) createVolume(vol Volume, filler *VolumeFiller, op *operations.Operation) error {
+	revert := true
+
+	// get size
+	RBDSize, err := d.getRBDSize(vol)
+	if err != nil {
+
+		return err
+	}
+
+	// create volume
+	err = d.rbdCreateVolume(vol.name, string(vol.volType), RBDSize)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if !revert {
+			return
+		}
+
+		err := d.rbdDeleteVolume(vol.name, string(vol.volType))
+		if err != nil {
+
+		}
+	}()
+
+	RBDDevPath, err := d.rbdMapVolume(vol.name, string(vol.volType))
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if !revert {
+			return
+		}
+
+		d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+	}()
+
+	// get filesystem
+	RBDFilesystem := d.getRBDFilesystem(vol)
+	_, err = makeFSType(RBDDevPath, RBDFilesystem, nil)
+	if err != nil {
+		return err
+	}
+
+	_, err = d.MountVolume(vol, op)
+	if err != nil {
+		return err
+	}
+	defer d.UnmountVolume(vol, op)
+
+	// Run the volume filler function if supplied.
+	if filler != nil && filler.Fill != nil {
+		err = filler.Fill(vol.MountPath(), "")
+		if err != nil {
+			return err
+		}
+	}
+
+	revert = false
+	return nil
+}
+
+// generateUUID regenerates the XFS/btrfs UUID as needed
+func (d *ceph) generateUUID(vol Volume) error {
+	fsType := d.getRBDFilesystem(vol)
+
+	if !renegerateFilesystemUUIDNeeded(fsType) {
+		return nil
+	}
+
+	// Map the RBD volume
+	RBDDevPath, err := d.rbdMapVolume(vol.name, string(vol.volType))
+	if err != nil {
+		return err
+	}
+	defer d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+
+	// Update the UUID
+	err = regenerateFilesystemUUID(fsType, RBDDevPath)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) getRBDVolumeName(volName string, snapName string, volType string, zombie bool, withPoolName bool) string {
+	out := ""
+	parentName, snapshotName, isSnapshot := shared.InstanceGetParentAndSnapshotName(volName)
+
+	// We use this map as values VolumeType* and StoragePoolVolumeType* differ,
+	// e.g. containers vs. container. This needs to be handled correctly.
+	volumeTypeMap := map[string]string{
+		string(VolumeTypeContainer): db.StoragePoolVolumeTypeNameContainer,
+		string(VolumeTypeCustom):    db.StoragePoolVolumeTypeNameCustom,
+		string(VolumeTypeImage):     db.StoragePoolVolumeTypeNameImage,
+		string(VolumeTypeVM):        db.StoragePoolVolumeTypeNameVM,
+	}
+
+	volumeType, ok := volumeTypeMap[volType]
+	if !ok {
+		volumeType = volType
+	}
+
+	if snapName != "" {
+		// Always use the provided snapshot name if specified.
+		out = fmt.Sprintf("%s_%s@%s", volumeType, parentName, snapName)
+	} else {
+		if isSnapshot {
+			// If volumeName is a snapshot (<vol>/<snap>) and snapName is not set,
+			// assue that it's a normal snapshot (not a zombie) and prefix it with
+			// "snapshot_".
+			out = fmt.Sprintf("%s_%s at snapshot_%s", volumeType, parentName, snapshotName)
+		} else {
+			out = fmt.Sprintf("%s_%s", volumeType, parentName)
+		}
+	}
+
+	// If the volume is to be in zombie state (i.e. not tracked by the LXD database),
+	// prefix the output with "zombie_".
+	if zombie {
+		out = fmt.Sprintf("zombie_%s", out)
+	}
+
+	// If needed, the output will be prefixed with the pool name, e.g.
+	// <pool>/<type>_<volname>@<snapname>.
+	if withPoolName {
+		out = fmt.Sprintf("%s/%s", d.config["ceph.osd.pool_name"], out)
+	}
+
+	return out
+}
+
+func (d *ceph) createVolumeFromImage(srcVol Volume, vol Volume, op *operations.Operation) error {
+	revert := true
+
+	err := d.rbdCreateClone(srcVol.name, string(srcVol.volType), "readonly", vol.name, string(vol.volType))
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if revert {
+			d.deleteVolume(vol.name, string(vol.volType))
+		}
+	}()
+
+	err = d.generateUUID(vol)
+	if err != nil {
+		return err
+	}
+
+	err = d.SetVolumeQuota(vol, srcVol.config["size"], op)
+	if err != nil {
+		return err
+	}
+
+	ourMount, err := d.MountVolume(vol, op)
+	if err != nil {
+		return err
+	}
+
+	if ourMount {
+		defer d.UnmountVolume(vol, op)
+	}
+
+	revert = false
+	return nil
+}
+
+func (d *ceph) createImage(vol Volume, filler *VolumeFiller, op *operations.Operation) error {
+	revert := true
+	prefixedType := fmt.Sprintf("zombie_image_%s", vol.config["block.filesystem"])
+
+	// Check if we have a zombie image. If so, restore it otherwise
+	// create a new image volume.
+	exists := d.rbdVolumeExists(vol.name, prefixedType)
+
+	if !exists {
+		err := d.createVolume(vol, filler, op)
+		if err != nil {
+			return err
+		}
+		defer func() {
+			if revert {
+				d.deleteVolume(vol.name, string(vol.volType))
+			}
+		}()
+
+		err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+		if err != nil {
+			return err
+		}
+
+		err = d.rbdCreateVolumeSnapshot(vol.name, string(vol.volType), "readonly")
+		if err != nil {
+			return err
+		}
+		defer func() {
+			if revert {
+				d.deleteVolumeSnapshot(vol.name, string(vol.volType), "readonly")
+			}
+		}()
+
+		err = d.rbdProtectVolumeSnapshot(vol.name, string(vol.volType), "readonly")
+		if err != nil {
+			return err
+		}
+	} else {
+		// unmark deleted
+		err := d.rbdUnmarkVolumeDeleted(vol.name, string(vol.volType), vol.config["block.filesystem"], "")
+		if err != nil {
+			return fmt.Errorf(`Failed to unmark RBD storage volume for image "%s" on storage pool "%s" as zombie: %s`, vol.name, d.config["ceph.osd.pool_name"], err)
+		}
+		defer func() {
+			if !revert {
+				return
+			}
+
+			d.rbdMarkVolumeDeleted(vol.name, vol.name, string(vol.volType), vol.config["block.filesystem"])
+		}()
+	}
+
+	revert = false
+	return nil
+}
+
+// Let's say we want to send the a container "a" including snapshots "snap0" and
+// "snap1" on storage pool "pool1" from LXD "l1" to LXD "l2" on storage pool
+// "pool2":
+//
+// The pool layout on "l1" would be:
+//	pool1/container_a
+//	pool1/container_a at snapshot_snap0
+//	pool1/container_a at snapshot_snap1
+//
+// Then we need to send:
+//	rbd export-diff pool1/container_a at snapshot_snap0 - | rbd import-diff - pool2/container_a
+// (Note that pool2/container_a must have been created by the receiving LXD
+// instance before.)
+//	rbd export-diff pool1/container_a at snapshot_snap1 --from-snap snapshot_snap0 - | rbd import-diff - pool2/container_a
+//	rbd export-diff pool1/container_a --from-snap snapshot_snap1 - | rbd import-diff - pool2/container_a
+func (d *ceph) sendVolume(conn io.ReadWriteCloser, volumeName string, volumeParentName string, tracker *ioprogress.ProgressTracker) error {
+	args := []string{
+		"export-diff",
+		"--cluster", d.config["ceph.cluster_name"],
+		volumeName,
+	}
+
+	if volumeParentName != "" {
+		args = append(args, "--from-snap", volumeParentName)
+	}
+
+	// redirect output to stdout
+	args = append(args, "-")
+
+	cmd := exec.Command("rbd", args...)
+
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return err
+	}
+
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return err
+	}
+
+	// Setup progress tracker.
+	stdoutPipe := stdout
+	if tracker != nil {
+		stdoutPipe = &ioprogress.ProgressReader{
+			ReadCloser: stdout,
+			Tracker:    tracker,
+		}
+	}
+
+	// Forward any output on stdout.
+	chStdoutPipe := make(chan error, 1)
+	go func() {
+		_, err := io.Copy(conn, stdoutPipe)
+		chStdoutPipe <- err
+		conn.Close()
+	}()
+
+	err = cmd.Start()
+	if err != nil {
+		return err
+	}
+
+	output, _ := ioutil.ReadAll(stderr)
+
+	// Handle errors.
+	errs := []error{}
+	chStdoutPipeErr := <-chStdoutPipe
+
+	err = cmd.Wait()
+	if err != nil {
+		errs = append(errs, err)
+
+		if chStdoutPipeErr != nil {
+			errs = append(errs, chStdoutPipeErr)
+		}
+	}
+
+	if len(errs) > 0 {
+		return fmt.Errorf("ceph export-diff failed: %v (%s)", errs, string(output))
+	}
+
+	return nil
+}
+
+func (d *ceph) receiveVolume(volumeName string, conn io.ReadWriteCloser, writeWrapper func(io.WriteCloser) io.WriteCloser) error {
+	args := []string{
+		"import-diff",
+		"--cluster", d.config["ceph.cluster_name"],
+		"-",
+		volumeName,
+	}
+
+	cmd := exec.Command("rbd", args...)
+
+	stdin, err := cmd.StdinPipe()
+	if err != nil {
+		return err
+	}
+
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return err
+	}
+
+	// Forward input through stdin.
+	chCopyConn := make(chan error, 1)
+	go func() {
+		_, err = io.Copy(stdin, conn)
+		stdin.Close()
+		chCopyConn <- err
+	}()
+
+	// Run the command.
+	err = cmd.Start()
+	if err != nil {
+		return err
+	}
+
+	// Read any error.
+	output, _ := ioutil.ReadAll(stderr)
+
+	// Handle errors.
+	errs := []error{}
+	chCopyConnErr := <-chCopyConn
+
+	err = cmd.Wait()
+	if err != nil {
+		errs = append(errs, err)
+
+		if chCopyConnErr != nil {
+			errs = append(errs, chCopyConnErr)
+		}
+	}
+
+	if len(errs) > 0 {
+		return fmt.Errorf("Problem with ceph import-diff: (%v) %s", errs, string(output))
+	}
+
+	return nil
+}
+
+func (d *ceph) deleteImage(vol Volume, op *operations.Operation) error {
+	// Try to umount but don't fail
+	d.UnmountVolume(vol, op)
+
+	// Check if image has dependant snapshots
+	_, err := d.rbdListSnapshotClones(vol.name, string(vol.volType), "readonly")
+	if err != nil {
+		if err != db.ErrNoSuchObject {
+			return err
+		}
+
+		// Unprotect snapshot
+		err = d.rbdUnprotectVolumeSnapshot(vol.name, string(vol.volType), "readonly")
+		if err != nil {
+			return err
+		}
+
+		// Delete snapshots
+		err = d.rbdPurgeVolumeSnapshots(vol.name, string(vol.volType))
+		if err != nil {
+			return err
+		}
+
+		// Unmap image
+		err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+		if err != nil {
+			return err
+		}
+
+		// Delete image
+		err = d.rbdDeleteVolume(vol.name, string(vol.volType))
+		if err != nil {
+			return err
+		}
+	} else {
+		err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+		if err != nil {
+			return err
+		}
+
+		err = d.rbdMarkVolumeDeleted(string(vol.volType), vol.name, vol.name, vol.config["block.filesystem"])
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/lxd/storage/drivers/driver_ceph_volumes.go b/lxd/storage/drivers/driver_ceph_volumes.go
new file mode 100644
index 0000000000..aa0bf4e5e2
--- /dev/null
+++ b/lxd/storage/drivers/driver_ceph_volumes.go
@@ -0,0 +1,857 @@
+package drivers
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"strings"
+
+	"github.com/lxc/lxd/lxd/db"
+	"github.com/lxc/lxd/lxd/migration"
+	"github.com/lxc/lxd/lxd/operations"
+	"github.com/lxc/lxd/shared"
+	"github.com/lxc/lxd/shared/ioprogress"
+	"github.com/lxc/lxd/shared/logger"
+	"github.com/lxc/lxd/shared/units"
+	"github.com/pborman/uuid"
+	"golang.org/x/sys/unix"
+)
+
+func (d *ceph) HasVolume(vol Volume) bool {
+	return d.rbdVolumeExists(vol.name, string(vol.volType))
+}
+
+func (d *ceph) ValidateVolume(vol Volume, removeUnknownKeys bool) error {
+	rules := map[string]func(value string) error{
+		"block.filesystem":    shared.IsAny,
+		"block.mount_options": shared.IsAny,
+	}
+
+	return d.validateVolume(vol, rules, removeUnknownKeys)
+}
+
+func (d *ceph) CreateVolume(vol Volume, filler *VolumeFiller, op *operations.Operation) error {
+	if vol.volType == VolumeTypeImage {
+		err := d.createImage(vol, filler, op)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}
+
+	err := d.createVolume(vol, filler, op)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) CreateVolumeFromCopy(vol Volume, srcVol Volume, copySnapshots bool, op *operations.Operation) error {
+	var err error
+	snapshots := []string{}
+
+	if !srcVol.IsSnapshot() && copySnapshots {
+		snapshots, err = d.VolumeSnapshots(srcVol, op)
+		if err != nil {
+			return err
+		}
+	}
+
+	revert := true
+
+	if srcVol.volType == VolumeTypeImage {
+		err := d.createVolumeFromImage(srcVol, vol, op)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}
+
+	// Copy without snapshots
+	if !copySnapshots || len(snapshots) == 0 {
+		if d.config["ceph.rbd.clone_copy"] != "" &&
+			!shared.IsTrue(d.config["ceph.rbd.clone_copy"]) {
+			err = d.copyWithoutSnapshotsFull(srcVol, vol)
+		} else {
+			err = d.copyWithoutSnapshotsSparse(srcVol, vol)
+		}
+		if err != nil {
+			return err
+		}
+
+		ourMount, err := d.MountVolume(vol, op)
+		if err != nil {
+			return err
+		}
+
+		if ourMount {
+			defer d.UnmountVolume(vol, op)
+		}
+
+		revert = false
+		return nil
+	}
+
+	// Copy with snapshots
+	// create empty dummy volume
+	err = d.rbdCreateVolume(vol.name, string(vol.volType), "0")
+	if err != nil {
+		logger.Errorf("Failed to create RBD volume %q on OSD pool %q: %s", vol.name, d.config["ceph.osd.pool_name"], err)
+		return err
+	}
+
+	defer func() {
+		if !revert {
+			return
+		}
+
+		err := d.rbdDeleteVolume(vol.name, string(vol.volType))
+		if err != nil {
+			logger.Warnf("Failed to delete RBD volume %q on OSD pool %q: %s", vol.name, d.config["ceph.osd.pool_name"], err)
+		}
+	}()
+
+	// receive over the dummy volume we created above
+	targetVolumeName := d.getRBDVolumeName(vol.name, "", string(vol.volType), false, true)
+
+	lastSnap := ""
+
+	if len(snapshots) > 0 {
+		err := createParentSnapshotDirIfMissing(d.name, vol.volType, vol.name)
+		if err != nil {
+			return err
+		}
+	}
+
+	for i, snap := range snapshots {
+		prev := ""
+		if i > 0 {
+			prev = fmt.Sprintf("snapshot_%s", snapshots[i-1])
+		}
+
+		lastSnap = fmt.Sprintf("snapshot_%s", snap)
+		sourceVolumeName := d.getRBDVolumeName(srcVol.name, lastSnap, string(srcVol.volType), false, true)
+
+		err = d.copyWithSnapshots(
+			sourceVolumeName,
+			targetVolumeName,
+			prev)
+		if err != nil {
+			return fmt.Errorf("Failed to copy RBD volume %q to %q", sourceVolumeName, targetVolumeName)
+		}
+		logger.Debugf(`Copied RBD container storage %s to %s`,
+			sourceVolumeName, targetVolumeName)
+
+		defer func() {
+			if !revert {
+				return
+			}
+
+			err := d.rbdDeleteVolumeSnapshot(vol.name, snap, string(vol.volType))
+			if err != nil {
+				logger.Warnf("Failed to delete RBD volume snapshot %s/%s", vol.name, snap)
+			}
+		}()
+
+		snapVol, err := vol.NewSnapshot(snap)
+		if err != nil {
+			return err
+		}
+
+		err = snapVol.EnsureMountPath()
+		if err != nil {
+			return err
+		}
+	}
+
+	// copy snapshot
+	sourceVolumeName := d.getRBDVolumeName(srcVol.name, "", string(srcVol.volType), false, true)
+
+	err = d.copyWithSnapshots(
+		sourceVolumeName,
+		targetVolumeName,
+		lastSnap)
+	if err != nil {
+		return err
+	}
+
+	// Re-generate the UUID
+	err = d.generateUUID(vol)
+	if err != nil {
+		return err
+	}
+
+	ourMount, err := d.MountVolume(vol, op)
+	if err != nil {
+		return err
+	}
+
+	if ourMount {
+		defer d.UnmountVolume(vol, op)
+	}
+
+	revert = false
+
+	return nil
+}
+
+func (d *ceph) RefreshVolume(vol Volume, srcVol Volume, srcSnapshots []Volume, op *operations.Operation) error {
+	return genericCopyVolume(d, nil, vol, srcVol, srcSnapshots, true, op)
+}
+
+func (d *ceph) DeleteVolume(vol Volume, op *operations.Operation) error {
+	if vol.volType == VolumeTypeImage {
+		err := d.deleteImage(vol, op)
+		if err != nil {
+			return err
+		}
+	} else {
+		_, err := d.UnmountVolume(vol, op)
+		if err != nil {
+			return err
+		}
+
+		if !d.rbdVolumeExists(vol.name, string(vol.volType)) {
+			return nil
+		}
+
+		ret := d.deleteVolume(vol.name, string(vol.volType))
+		if ret < 0 {
+			return fmt.Errorf("Failed to delete volume")
+		}
+	}
+
+	err := wipeDirectory(vol.MountPath())
+	if err != nil {
+		return err
+	}
+
+	err = os.Remove(vol.MountPath())
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) RenameVolume(vol Volume, newName string, op *operations.Operation) error {
+	revert := true
+
+	_, err := d.UnmountVolume(vol, op)
+	if err != nil {
+		return err
+	}
+
+	err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+	if err != nil {
+		return nil
+	}
+
+	defer func() {
+		if !revert {
+			return
+		}
+
+		_, err := d.rbdMapVolume(vol.name, string(vol.volType))
+		if err != nil {
+			logger.Warnf("Failed to map RBD volume %q", vol.name)
+		}
+	}()
+
+	err = d.rbdRenameVolume(vol.name, newName, string(vol.volType))
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if !revert {
+			return
+		}
+
+		d.rbdRenameVolume(newName, vol.name, string(vol.volType))
+	}()
+
+	_, err = d.rbdMapVolume(newName, string(vol.volType))
+	if err != nil {
+		return err
+	}
+
+	err = genericVFSRenameVolume(d, vol, newName, op)
+	if err != nil {
+		return nil
+	}
+
+	revert = false
+	return nil
+}
+
+func (d *ceph) UpdateVolume(vol Volume, changedConfig map[string]string) error {
+	if vol.volType != VolumeTypeCustom {
+		return ErrNotSupported
+	}
+
+	val, ok := changedConfig["size"]
+	if ok {
+		err := d.SetVolumeQuota(vol, val, nil)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (d *ceph) GetVolumeUsage(vol Volume) (int64, error) {
+	return -1, fmt.Errorf("RBD quotas are currently not supported")
+}
+
+func (d *ceph) SetVolumeQuota(vol Volume, size string, op *operations.Operation) error {
+	fsType := d.getRBDFilesystem(vol)
+
+	RBDDevPath, ret := d.getRBDMappedDevPath(vol.pool, vol.name, string(vol.volType), true)
+	if ret < 0 {
+		return fmt.Errorf("Failed to get mapped RBD path")
+	}
+
+	oldSize, err := units.ParseByteSizeString(vol.config["size"])
+	if err != nil {
+		return err
+	}
+
+	newSize, err := units.ParseByteSizeString(size)
+	if err != nil {
+		return err
+	}
+
+	// The right disjunct just means that someone unset the size property in
+	// the container's config. We obviously cannot resize to 0.
+	if oldSize == newSize || newSize == 0 {
+		return nil
+	}
+
+	if newSize < oldSize {
+		err = d.rbdShrink(RBDDevPath, newSize, fsType, vol)
+	} else {
+		err = d.rbdGrow(RBDDevPath, newSize, fsType, vol)
+	}
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) GetVolumeDiskPath(vol Volume) (string, error) {
+	return "", ErrNotImplemented
+}
+
+func (d *ceph) MountVolume(vol Volume, op *operations.Operation) (bool, error) {
+	RBDFilesystem := d.getRBDFilesystem(vol)
+	mountPath := vol.MountPath()
+
+	if shared.IsMountPoint(mountPath) {
+		return false, nil
+	}
+
+	err := vol.EnsureMountPath()
+	if err != nil {
+		return false, err
+	}
+
+	RBDDevPath, ret := d.getRBDMappedDevPath(vol.pool, vol.name, string(vol.volType), true)
+	if ret < 0 {
+		return false, nil
+	}
+
+	mountFlags, mountOptions := resolveMountOptions(d.getRBDMountOptions(vol))
+
+	err = TryMount(RBDDevPath, mountPath, RBDFilesystem, mountFlags, mountOptions)
+	if err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
+func (d *ceph) MountVolumeSnapshot(snapVol Volume, op *operations.Operation) (bool, error) {
+	revert := true
+	parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+	prefixedSnapOnlyName := fmt.Sprintf("snapshot_%s", snapshotOnlyName)
+
+	// Protect snapshot to prevent data loss.
+	err := d.rbdProtectVolumeSnapshot(parentName, string(snapVol.volType), prefixedSnapOnlyName)
+	if err != nil {
+		return false, err
+	}
+
+	defer func() {
+		if !revert {
+			return
+		}
+
+		d.rbdUnprotectVolumeSnapshot(parentName, string(snapVol.volType), prefixedSnapOnlyName)
+	}()
+
+	// Clone snapshot.
+	cloneName := fmt.Sprintf("%s_%s_start_clone", parentName, snapshotOnlyName)
+
+	err = d.rbdCreateClone(parentName, string(snapVol.volType), prefixedSnapOnlyName, cloneName, "snapshots")
+	if err != nil {
+		return false, err
+	}
+
+	defer func() {
+		if !revert {
+			return
+		}
+
+		d.rbdDeleteVolume(cloneName, "snapshots")
+	}()
+
+	// Map volume
+	rbdDevPath, err := d.rbdMapVolume(cloneName, "snapshots")
+	if err != nil {
+		return false, err
+	}
+
+	defer func() {
+		if !revert {
+			return
+		}
+
+		d.rbdUnmapVolume(cloneName, "snapshots", true)
+	}()
+
+	mountPath := snapVol.MountPath()
+
+	if shared.IsMountPoint(mountPath) {
+		return false, nil
+	}
+
+	err = snapVol.EnsureMountPath()
+	if err != nil {
+		return false, err
+	}
+
+	RBDFilesystem := d.getRBDFilesystem(snapVol)
+	mountFlags, mountOptions := resolveMountOptions(d.getRBDMountOptions(snapVol))
+	if RBDFilesystem == "xfs" {
+		idx := strings.Index(mountOptions, "nouuid")
+		if idx < 0 {
+			mountOptions += ",nouuid"
+		}
+	}
+
+	err = TryMount(rbdDevPath, mountPath, RBDFilesystem, mountFlags, mountOptions)
+	if err != nil {
+		return false, err
+	}
+
+	revert = false
+
+	return true, nil
+}
+
+func (d *ceph) UnmountVolume(vol Volume, op *operations.Operation) (bool, error) {
+	mountPath := vol.MountPath()
+
+	if !shared.IsMountPoint(mountPath) {
+		return false, nil
+	}
+
+	err := TryUnmount(mountPath, unix.MNT_DETACH)
+	if err != nil {
+		return false, err
+	}
+
+	// Attempt to unmap
+	if vol.volType == VolumeTypeCustom {
+		err = d.rbdUnmapVolume(vol.name, string(vol.volType), true)
+		if err != nil {
+			return true, err
+		}
+	}
+
+	return true, nil
+}
+
+func (d *ceph) UnmountVolumeSnapshot(snapVol Volume, op *operations.Operation) (bool, error) {
+	mountPath := snapVol.MountPath()
+
+	if !shared.IsMountPoint(mountPath) {
+		return false, nil
+	}
+
+	err := TryUnmount(mountPath, unix.MNT_DETACH)
+	if err != nil {
+		return false, err
+	}
+
+	parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+	cloneName := fmt.Sprintf("%s_%s_start_clone", parentName, snapshotOnlyName)
+
+	err = d.rbdUnmapVolume(cloneName, "snapshots", true)
+	if err != nil {
+		return false, err
+	}
+
+	if !d.rbdVolumeExists(cloneName, "snapshots") {
+		return true, nil
+	}
+
+	// Delete the temporary RBD volume
+	err = d.rbdDeleteVolume(cloneName, "snapshots")
+	if err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
+func (d *ceph) CreateVolumeSnapshot(snapVol Volume, op *operations.Operation) error {
+	parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+	sourcePath := GetVolumeMountPath(d.name, snapVol.volType, parentName)
+	snapshotName := fmt.Sprintf("snapshot_%s", snapshotOnlyName)
+
+	if shared.IsMountPoint(sourcePath) {
+		// This is costly but we need to ensure that all cached data has
+		// been committed to disk. If we don't then the rbd snapshot of
+		// the underlying filesystem can be inconsistent or - worst case
+		// - empty.
+		unix.Sync()
+
+		_, err := shared.TryRunCommand("fsfreeze", "--freeze", sourcePath)
+		if err == nil {
+			defer shared.TryRunCommand("fsfreeze", "--unfreeze", sourcePath)
+		}
+	}
+
+	// Create the parent directory.
+	err := createParentSnapshotDirIfMissing(d.name, snapVol.volType, parentName)
+	if err != nil {
+		return err
+	}
+
+	err = snapVol.EnsureMountPath()
+	if err != nil {
+		return err
+	}
+
+	err = d.rbdCreateVolumeSnapshot(parentName, string(snapVol.volType), snapshotName)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) DeleteVolumeSnapshot(snapVol Volume, op *operations.Operation) error {
+	parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+	snapshotName := fmt.Sprintf("snapshot_%s", snapshotOnlyName)
+
+	if !d.rbdVolumeSnapshotExists(parentName, string(snapVol.volType), snapshotName) {
+		return nil
+	}
+
+	ret := d.deleteVolumeSnapshot(parentName, string(snapVol.volType), snapshotName)
+	if ret < 0 {
+		return fmt.Errorf("Failed to delete volume snapshot")
+	}
+
+	err := wipeDirectory(snapVol.MountPath())
+	if err != nil {
+		return err
+	}
+
+	err = os.Remove(snapVol.MountPath())
+	if err != nil {
+		return err
+	}
+
+	// Remove the parent snapshot directory if this is the last snapshot being removed.
+	err = deleteParentSnapshotDirIfEmpty(d.name, snapVol.volType, parentName)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) RenameVolumeSnapshot(snapVol Volume, newSnapshotName string, op *operations.Operation) error {
+	parentName, snapshotOnlyName, _ := shared.InstanceGetParentAndSnapshotName(snapVol.name)
+	oldSnapOnlyName := fmt.Sprintf("snapshot_%s", snapshotOnlyName)
+	newSnapOnlyName := fmt.Sprintf("snapshot_%s", newSnapshotName)
+
+	err := d.rbdRenameVolumeSnapshot(parentName, string(snapVol.volType), oldSnapOnlyName, newSnapOnlyName)
+	if err != nil {
+		return err
+	}
+
+	return genericVFSRenameVolumeSnapshot(d, snapVol, newSnapshotName, op)
+}
+
+func (d *ceph) VolumeSnapshots(vol Volume, op *operations.Operation) ([]string, error) {
+	snapshots, err := d.rbdListVolumeSnapshots(vol.name, string(vol.volType))
+	if err != nil {
+		if err == db.ErrNoSuchObject {
+			return nil, nil
+		}
+
+		return nil, err
+	}
+
+	var ret []string
+
+	for _, snap := range snapshots {
+		// Ignore zombie snapshots as these are only used internally and
+		// not relevant for users.
+		if strings.HasPrefix(snap, "zombie_") || strings.HasPrefix(snap, "migration-send-") {
+			continue
+		}
+
+		ret = append(ret, strings.TrimPrefix(snap, "snapshot_"))
+	}
+
+	return ret, nil
+}
+
+func (d *ceph) RestoreVolume(vol Volume, snapshotName string, op *operations.Operation) error {
+	ourUmount, err := d.UnmountVolume(vol, op)
+	if err != nil {
+		return err
+	}
+
+	if ourUmount {
+		defer d.MountVolume(vol, op)
+	}
+
+	prefixedSnapshotName := fmt.Sprintf("snapshot_%s", snapshotName)
+
+	err = d.rbdRestoreVolume(vol.name, string(vol.volType), prefixedSnapshotName)
+	if err != nil {
+		return err
+	}
+
+	snapVol, err := vol.NewSnapshot(snapshotName)
+	if err != nil {
+		return err
+	}
+
+	err = d.generateUUID(snapVol)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) MigrateVolume(vol Volume, conn io.ReadWriteCloser, volSrcArgs *migration.VolumeSourceArgs, op *operations.Operation) error {
+	if vol.contentType != ContentTypeFS {
+		return ErrNotSupported
+	}
+
+	// Handle simple rsync through generic.
+	if volSrcArgs.MigrationType.FSType == migration.MigrationFSType_RSYNC {
+		return genericVFSMigrateVolume(d, d.state, vol, conn, volSrcArgs, op)
+	} else if volSrcArgs.MigrationType.FSType != migration.MigrationFSType_RBD {
+		return ErrNotSupported
+	}
+
+	if shared.IsSnapshot(vol.name) {
+		parentName, snapOnlyName, _ := shared.InstanceGetParentAndSnapshotName(vol.name)
+		sendName := fmt.Sprintf("%s/snapshots_%s_%s_start_clone", d.name, parentName, snapOnlyName)
+
+		cloneVol := NewVolume(d, d.name, vol.volType, vol.contentType, vol.name, nil, nil)
+
+		// Mounting the volume snapshot will create the clone "snapshots_<parent>_<snap>_start_clone".
+		_, err := d.MountVolumeSnapshot(cloneVol, op)
+		if err != nil {
+			return err
+		}
+		defer d.UnmountVolumeSnapshot(cloneVol, op)
+
+		// Setup progress tracking.
+		var wrapper *ioprogress.ProgressTracker
+		if volSrcArgs.TrackProgress {
+			wrapper = migration.ProgressTracker(op, "fs_progress", vol.name)
+		}
+
+		err = d.sendVolume(conn, sendName, "", wrapper)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	}
+
+	lastSnap := ""
+
+	if !volSrcArgs.FinalSync {
+		for i, snapName := range volSrcArgs.Snapshots {
+			snapshot, _ := vol.NewSnapshot(snapName)
+
+			prev := ""
+
+			if i > 0 {
+				prev = fmt.Sprintf("snapshot_%s", volSrcArgs.Snapshots[i-1])
+			}
+
+			lastSnap = fmt.Sprintf("snapshot_%s", snapName)
+			sendSnapName := d.getRBDVolumeName(vol.name, lastSnap, string(vol.volType), false, true)
+
+			// Setup progress tracking.
+			var wrapper *ioprogress.ProgressTracker
+
+			if volSrcArgs.TrackProgress {
+				wrapper = migration.ProgressTracker(op, "fs_progress", snapshot.name)
+			}
+
+			err := d.sendVolume(conn, sendSnapName, prev, wrapper)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	// Setup progress tracking.
+	var wrapper *ioprogress.ProgressTracker
+	if volSrcArgs.TrackProgress {
+		wrapper = migration.ProgressTracker(op, "fs_progress", vol.name)
+	}
+
+	runningSnapName := fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
+
+	err := d.rbdCreateVolumeSnapshot(vol.name, string(vol.volType), runningSnapName)
+	if err != nil {
+		return err
+	}
+	defer d.rbdDeleteVolumeSnapshot(vol.name, string(vol.volType), runningSnapName)
+
+	cur := d.getRBDVolumeName(vol.name, runningSnapName, string(vol.volType), false, true)
+
+	err = d.sendVolume(conn, cur, lastSnap, wrapper)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) CreateVolumeFromMigration(vol Volume, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, preFiller *VolumeFiller, op *operations.Operation) error {
+	if vol.contentType != ContentTypeFS {
+		return ErrNotSupported
+	}
+
+	// Handle simple rsync through generic.
+	if volTargetArgs.MigrationType.FSType == migration.MigrationFSType_RSYNC {
+		return genericCreateVolumeFromMigration(d, nil, vol, conn, volTargetArgs, preFiller, op)
+	} else if volTargetArgs.MigrationType.FSType != migration.MigrationFSType_RBD {
+		return ErrNotSupported
+	}
+
+	recvName := d.getRBDVolumeName(vol.name, "", string(vol.volType), false, true)
+
+	if !d.rbdVolumeExists(vol.name, string(vol.volType)) {
+		err := d.rbdCreateVolume(vol.name, string(vol.volType), "0")
+		if err != nil {
+			return err
+		}
+	}
+
+	err := vol.EnsureMountPath()
+	if err != nil {
+		return err
+	}
+
+	// Handle zfs send/receive migration.
+	if len(volTargetArgs.Snapshots) > 0 {
+		// Create the parent directory.
+		err := createParentSnapshotDirIfMissing(d.name, vol.volType, vol.name)
+		if err != nil {
+			return err
+		}
+
+		// Transfer the snapshots.
+		for _, snapName := range volTargetArgs.Snapshots {
+			fullSnapshotName := d.getRBDVolumeName(vol.name, snapName, string(vol.volType), false, true)
+			wrapper := migration.ProgressWriter(op, "fs_progress", fullSnapshotName)
+
+			err = d.receiveVolume(recvName, conn, wrapper)
+			if err != nil {
+				return err
+			}
+
+			snapVol, err := vol.NewSnapshot(snapName)
+			if err != nil {
+				return err
+			}
+
+			err = snapVol.EnsureMountPath()
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	defer func() {
+		// Delete all migration-send-* snapshots
+		snaps, err := d.rbdListVolumeSnapshots(vol.name, string(vol.volType))
+		if err != nil {
+			return
+		}
+
+		for _, snap := range snaps {
+			if !strings.HasPrefix(snap, "migration-send") {
+				continue
+			}
+
+			d.rbdDeleteVolumeSnapshot(vol.name, string(vol.volType), snap)
+		}
+	}()
+
+	wrapper := migration.ProgressWriter(op, "fs_progress", vol.name)
+
+	err = d.receiveVolume(recvName, conn, wrapper)
+	if err != nil {
+		return err
+	}
+
+	if volTargetArgs.Live {
+		err = d.receiveVolume(recvName, conn, wrapper)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = d.generateUUID(vol)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *ceph) BackupVolume(vol Volume, targetPath string, optimized bool, snapshots bool, op *operations.Operation) error {
+	if snapshots {
+		snaps, err := vol.Snapshots(op)
+		if err != nil {
+			return err
+		}
+
+		for _, snap := range snaps {
+			err := d.rbdCreateVolumeBackup(snap, targetPath)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return d.rbdCreateVolumeBackup(vol, targetPath)
+}
+
+func (d *ceph) CreateVolumeFromBackup(vol Volume, snapshots []string, srcData io.ReadSeeker, optimizedStorage bool, op *operations.Operation) (func(vol Volume) error, func(), error) {
+	return nil, nil, ErrNotImplemented
+}
diff --git a/lxd/storage/drivers/load.go b/lxd/storage/drivers/load.go
index 2f7a114558..55f141038c 100644
--- a/lxd/storage/drivers/load.go
+++ b/lxd/storage/drivers/load.go
@@ -11,6 +11,7 @@ var drivers = map[string]func() driver{
 	"dir":    func() driver { return &dir{} },
 	"lvm":    func() driver { return &lvm{} },
 	"zfs":    func() driver { return &zfs{} },
+	"ceph":   func() driver { return &ceph{} },
 }
 
 // Validators contains functions used for validating a drivers's config.

From ff0e24ecdd290ff043a48d0fd6bf4611a39f5cc3 Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Mon, 20 Jan 2020 15:55:29 +0100
Subject: [PATCH 2/2] DEBUG: disable tests

This commit will be removed again.

Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
 test/main.sh | 210 +++++++++++++++++++++++++--------------------------
 1 file changed, 105 insertions(+), 105 deletions(-)

diff --git a/test/main.sh b/test/main.sh
index ab098a5f48..b6931539e5 100755
--- a/test/main.sh
+++ b/test/main.sh
@@ -154,111 +154,111 @@ if [ "$#" -gt 0 ]; then
   exit
 fi
 
-run_test test_check_deps "checking dependencies"
-run_test test_static_analysis "static analysis"
-run_test test_database_update "database schema updates"
-run_test test_database_restore "database restore"
-run_test test_database_no_disk_space "database out of disk space"
-run_test test_sql "lxd sql"
-run_test test_basic_usage "basic usage"
-run_test test_remote_url "remote url handling"
-run_test test_remote_admin "remote administration"
-run_test test_remote_usage "remote usage"
-run_test test_clustering_enable "clustering enable"
-run_test test_clustering_membership "clustering membership"
-run_test test_clustering_containers "clustering containers"
-run_test test_clustering_storage "clustering storage"
-run_test test_clustering_network "clustering network"
-run_test test_clustering_publish "clustering publish"
-run_test test_clustering_profiles "clustering profiles"
-run_test test_clustering_join_api "clustering join api"
-run_test test_clustering_shutdown_nodes "clustering shutdown"
-run_test test_clustering_projects "clustering projects"
-run_test test_clustering_address "clustering address"
-run_test test_clustering_image_replication "clustering image replication"
-run_test test_clustering_dns "clustering DNS"
-run_test test_clustering_recover "clustering recovery"
-run_test test_clustering_handover "clustering handover"
-run_test test_clustering_rebalance "clustering rebalance"
-run_test test_clustering_upgrade "clustering upgrade"
-run_test test_projects_default "default project"
-run_test test_projects_crud "projects CRUD operations"
-run_test test_projects_containers "containers inside projects"
-run_test test_projects_snapshots "snapshots inside projects"
-run_test test_projects_backups "backups inside projects"
-run_test test_projects_profiles "profiles inside projects"
-run_test test_projects_profiles_default "profiles from the global default project"
-run_test test_projects_images "images inside projects"
-run_test test_projects_images_default "images from the global default project"
-run_test test_projects_storage "projects and storage pools"
-run_test test_projects_network "projects and networks"
-run_test test_container_devices_disk "container devices - disk"
-run_test test_container_devices_nic_p2p "container devices - nic - p2p"
-run_test test_container_devices_nic_bridged "container devices - nic - bridged"
-run_test test_container_devices_nic_bridged_filtering "container devices - nic - bridged - filtering"
-run_test test_container_devices_nic_physical "container devices - nic - physical"
-run_test test_container_devices_nic_macvlan "container devices - nic - macvlan"
-run_test test_container_devices_nic_ipvlan "container devices - nic - ipvlan"
-run_test test_container_devices_nic_sriov "container devices - nic - sriov"
-run_test test_container_devices_nic_routed "container devices - nic - routed"
-run_test test_container_devices_infiniband_physical "container devices - infiniband - physical"
-run_test test_container_devices_infiniband_sriov "container devices - infiniband - sriov"
-run_test test_container_devices_proxy "container devices - proxy"
-run_test test_container_devices_gpu "container devices - gpu"
-run_test test_container_devices_unix_char "container devices - unix-char"
-run_test test_container_devices_unix_block "container devices - unix-block"
-run_test test_security "security features"
-run_test test_security_protection "container protection"
-run_test test_image_expiry "image expiry"
-run_test test_image_list_all_aliases "image list all aliases"
-run_test test_image_auto_update "image auto-update"
-run_test test_image_prefer_cached "image prefer cached"
-run_test test_image_import_dir "import image from directory"
-run_test test_concurrent_exec "concurrent exec"
-run_test test_concurrent "concurrent startup"
-run_test test_snapshots "container snapshots"
-run_test test_snap_restore "snapshot restores"
-run_test test_snap_expiry "snapshot expiry"
-run_test test_config_profiles "profiles and configuration"
-run_test test_config_edit "container configuration edit"
-run_test test_config_edit_container_snapshot_pool_config "container and snapshot volume configuration edit"
-run_test test_container_metadata "manage container metadata and templates"
-run_test test_container_snapshot_config "container snapshot configuration"
-run_test test_server_config "server configuration"
-run_test test_filemanip "file manipulations"
-run_test test_network "network management"
-run_test test_idmap "id mapping"
-run_test test_template "file templating"
-run_test test_pki "PKI mode"
-run_test test_devlxd "/dev/lxd"
-run_test test_fuidshift "fuidshift"
-run_test test_migration "migration"
-run_test test_fdleak "fd leak"
-run_test test_storage "storage"
-run_test test_storage_volume_snapshots "storage volume snapshots"
-run_test test_init_auto "lxd init auto"
-run_test test_init_interactive "lxd init interactive"
-run_test test_init_preseed "lxd init preseed"
-run_test test_storage_profiles "storage profiles"
-run_test test_container_import "container import"
-run_test test_storage_volume_attach "attaching storage volumes"
-run_test test_storage_driver_ceph "ceph storage driver"
-run_test test_storage_driver_cephfs "cephfs storage driver"
-run_test test_resources "resources"
-run_test test_kernel_limits "kernel limits"
-run_test test_macaroon_auth "macaroon authentication"
-run_test test_console "console"
-run_test test_query "query"
-run_test test_storage_local_volume_handling "storage local volume handling"
-run_test test_backup_import "backup import"
-run_test test_backup_export "backup export"
-run_test test_backup_rename "backup rename"
-run_test test_container_local_cross_pool_handling "container local cross pool handling"
-run_test test_incremental_copy "incremental container copy"
-run_test test_profiles_project_default "profiles in default project"
-run_test test_profiles_project_images_profiles "profiles in project with images and profiles enabled"
-run_test test_profiles_project_images "profiles in project with images enabled and profiles disabled"
-run_test test_profiles_project_profiles "profiles in project with images disabled and profiles enabled"
+# run_test test_check_deps "checking dependencies"
+# run_test test_static_analysis "static analysis"
+# run_test test_database_update "database schema updates"
+# run_test test_database_restore "database restore"
+# run_test test_database_no_disk_space "database out of disk space"
+# run_test test_sql "lxd sql"
+# run_test test_basic_usage "basic usage"
+# run_test test_remote_url "remote url handling"
+# run_test test_remote_admin "remote administration"
+# run_test test_remote_usage "remote usage"
+# run_test test_clustering_enable "clustering enable"
+# run_test test_clustering_membership "clustering membership"
+# run_test test_clustering_containers "clustering containers"
+# run_test test_clustering_storage "clustering storage"
+# run_test test_clustering_network "clustering network"
+# run_test test_clustering_publish "clustering publish"
+# run_test test_clustering_profiles "clustering profiles"
+# run_test test_clustering_join_api "clustering join api"
+# run_test test_clustering_shutdown_nodes "clustering shutdown"
+# run_test test_clustering_projects "clustering projects"
+# run_test test_clustering_address "clustering address"
+# run_test test_clustering_image_replication "clustering image replication"
+# run_test test_clustering_dns "clustering DNS"
+# run_test test_clustering_recover "clustering recovery"
+# run_test test_clustering_handover "clustering handover"
+# run_test test_clustering_rebalance "clustering rebalance"
+# run_test test_clustering_upgrade "clustering upgrade"
+# run_test test_projects_default "default project"
+# run_test test_projects_crud "projects CRUD operations"
+# run_test test_projects_containers "containers inside projects"
+# run_test test_projects_snapshots "snapshots inside projects"
+# run_test test_projects_backups "backups inside projects"
+# run_test test_projects_profiles "profiles inside projects"
+# run_test test_projects_profiles_default "profiles from the global default project"
+# run_test test_projects_images "images inside projects"
+# run_test test_projects_images_default "images from the global default project"
+# run_test test_projects_storage "projects and storage pools"
+# run_test test_projects_network "projects and networks"
+# run_test test_container_devices_disk "container devices - disk"
+# run_test test_container_devices_nic_p2p "container devices - nic - p2p"
+# run_test test_container_devices_nic_bridged "container devices - nic - bridged"
+# run_test test_container_devices_nic_bridged_filtering "container devices - nic - bridged - filtering"
+# run_test test_container_devices_nic_physical "container devices - nic - physical"
+# run_test test_container_devices_nic_macvlan "container devices - nic - macvlan"
+# run_test test_container_devices_nic_ipvlan "container devices - nic - ipvlan"
+# run_test test_container_devices_nic_sriov "container devices - nic - sriov"
+# run_test test_container_devices_nic_routed "container devices - nic - routed"
+# run_test test_container_devices_infiniband_physical "container devices - infiniband - physical"
+# run_test test_container_devices_infiniband_sriov "container devices - infiniband - sriov"
+# run_test test_container_devices_proxy "container devices - proxy"
+# run_test test_container_devices_gpu "container devices - gpu"
+# run_test test_container_devices_unix_char "container devices - unix-char"
+# run_test test_container_devices_unix_block "container devices - unix-block"
+# run_test test_security "security features"
+# run_test test_security_protection "container protection"
+# run_test test_image_expiry "image expiry"
+# run_test test_image_list_all_aliases "image list all aliases"
+# run_test test_image_auto_update "image auto-update"
+# run_test test_image_prefer_cached "image prefer cached"
+# run_test test_image_import_dir "import image from directory"
+# run_test test_concurrent_exec "concurrent exec"
+# run_test test_concurrent "concurrent startup"
+# run_test test_snapshots "container snapshots"
+# run_test test_snap_restore "snapshot restores"
+# run_test test_snap_expiry "snapshot expiry"
+# run_test test_config_profiles "profiles and configuration"
+# run_test test_config_edit "container configuration edit"
+# run_test test_config_edit_container_snapshot_pool_config "container and snapshot volume configuration edit"
+# run_test test_container_metadata "manage container metadata and templates"
+# run_test test_container_snapshot_config "container snapshot configuration"
+# run_test test_server_config "server configuration"
+# run_test test_filemanip "file manipulations"
+# run_test test_network "network management"
+# run_test test_idmap "id mapping"
+# run_test test_template "file templating"
+# run_test test_pki "PKI mode"
+# run_test test_devlxd "/dev/lxd"
+# run_test test_fuidshift "fuidshift"
+# run_test test_migration "migration"
+# run_test test_fdleak "fd leak"
+# run_test test_storage "storage"
+# run_test test_storage_volume_snapshots "storage volume snapshots"
+# run_test test_init_auto "lxd init auto"
+# run_test test_init_interactive "lxd init interactive"
+# run_test test_init_preseed "lxd init preseed"
+# run_test test_storage_profiles "storage profiles"
+# run_test test_container_import "container import"
+# run_test test_storage_volume_attach "attaching storage volumes"
+# run_test test_storage_driver_ceph "ceph storage driver"
+# run_test test_storage_driver_cephfs "cephfs storage driver"
+# run_test test_resources "resources"
+# run_test test_kernel_limits "kernel limits"
+# run_test test_macaroon_auth "macaroon authentication"
+# run_test test_console "console"
+# run_test test_query "query"
+# run_test test_storage_local_volume_handling "storage local volume handling"
+# run_test test_backup_import "backup import"
+# run_test test_backup_export "backup export"
+# run_test test_backup_rename "backup rename"
+# run_test test_container_local_cross_pool_handling "container local cross pool handling"
+# run_test test_incremental_copy "incremental container copy"
+# run_test test_profiles_project_default "profiles in default project"
+# run_test test_profiles_project_images_profiles "profiles in project with images and profiles enabled"
+# run_test test_profiles_project_images "profiles in project with images enabled and profiles disabled"
+# run_test test_profiles_project_profiles "profiles in project with images disabled and profiles enabled"
 
 # shellcheck disable=SC2034
 TEST_RESULT=success


More information about the lxc-devel mailing list