[lxc-devel] [lxd/master] Add cephfs backend

monstermunchkin on Github lxc-bot at linuxcontainers.org
Thu Oct 31 15:04:06 UTC 2019


From 07686d47246ee073bcc529473d3ecd9ee0d645b9 Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Thu, 31 Oct 2019 09:53:45 +0100
Subject: [PATCH 1/2] lxd/storage: Add locking for mount operations

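This introduces a per-operation lock map so that concurrent mount and
unmount requests for the same pool or volume are serialized. The first
caller registers a channel under an operation-specific lock ID; later
callers for the same ID block on that channel and simply return once
the first caller has finished.

A minimal sketch of the intended call pattern (lock() and the lock ID
helpers are defined in lxd/storage/lock.go below):

	unlock := lock(getPoolMountLockID(poolName))
	if unlock == nil {
		// Another caller already performed the operation.
		return false, nil
	}
	defer unlock()
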
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
 lxd/storage/backend_lxd.go | 24 +++++++++++++
 lxd/storage/lock.go        | 69 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 93 insertions(+)
 create mode 100644 lxd/storage/lock.go

diff --git a/lxd/storage/backend_lxd.go b/lxd/storage/backend_lxd.go
index 9f91ab84d9..d7e5f560be 100644
--- a/lxd/storage/backend_lxd.go
+++ b/lxd/storage/backend_lxd.go
@@ -125,11 +125,23 @@ func (b *lxdBackend) Delete(op *operations.Operation) error {
 
 // Mount mounts the storage pool.
 func (b *lxdBackend) Mount() (bool, error) {
+	unlock := lock(getPoolMountLockID(b.name))
+	if unlock == nil {
+		return false, nil
+	}
+	defer unlock()
+
 	return b.driver.Mount()
 }
 
 // Unmount unmounts the storage pool.
 func (b *lxdBackend) Unmount() (bool, error) {
+	unlock := lock(getPoolUmountLockID(b.name))
+	if unlock == nil {
+		return false, nil
+	}
+	defer unlock()
+
 	return b.driver.Unmount()
 }
 
@@ -596,11 +608,23 @@ func (b *lxdBackend) SetCustomVolumeQuota(vol api.StorageVolume, quota uint64) e
 
 // MountCustomVolume mounts a custom volume.
 func (b *lxdBackend) MountCustomVolume(volName string, op *operations.Operation) (bool, error) {
+	unlock := lock(getVolumeMountLockID(volName))
+	if unlock == nil {
+		return false, nil
+	}
+	defer unlock()
+
 	return b.driver.MountVolume(drivers.VolumeTypeCustom, volName, op)
 }
 
 // UnmountCustomVolume unmounts a custom volume.
 func (b *lxdBackend) UnmountCustomVolume(volName string, op *operations.Operation) (bool, error) {
+	unlock := lock(getVolumeUmountLockID(volName))
+	if unlock == nil {
+		return false, nil
+	}
+	defer unlock()
+
 	return b.driver.UnmountVolume(drivers.VolumeTypeCustom, volName, op)
 }
 
diff --git a/lxd/storage/lock.go b/lxd/storage/lock.go
new file mode 100644
index 0000000000..e16a794adf
--- /dev/null
+++ b/lxd/storage/lock.go
@@ -0,0 +1,69 @@
+package storage
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/lxc/lxd/shared/logger"
+)
+
+// lxdStorageOngoingOperationMap is a hashmap that allows functions to check
+// whether the operation they are about to perform is already in progress. If
+// it is, the channel can be used to wait for the operation to finish. If it
+// is not, the function that wants to perform the operation should store a
+// channel under its operation code in the hashmap.
+// Note that any access to this map must be done while holding lxdStorageMapLock.
+var lxdStorageOngoingOperationMap = map[string]chan bool{}
+
+// lxdStorageMapLock is used to access lxdStorageOngoingOperationMap.
+var lxdStorageMapLock sync.Mutex
+
+// The following functions construct simple operation codes (lock IDs) that
+// are unique per operation and target.
+func getPoolMountLockID(poolName string) string {
+	return fmt.Sprintf("mount/pool/%s", poolName)
+}
+
+func getPoolUmountLockID(poolName string) string {
+	return fmt.Sprintf("umount/pool/%s", poolName)
+}
+
+func getVolumeMountLockID(volumeName string) string {
+	return fmt.Sprintf("mount/volume/%s", volumeName)
+}
+
+func getVolumeUmountLockID(volumeName string) string {
+	return fmt.Sprintf("umount/volume/%s", volumeName)
+}
+
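+// lock acquires the operation slot for lockID and returns an unlock function
+// that must be called (usually deferred) once the operation is done. If the
+// operation is already in progress, lock blocks until it completes and then
+// returns nil, telling the caller that the work was done by another thread.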
+func lock(lockID string) func() {
+	lxdStorageMapLock.Lock()
+
+	if waitChannel, ok := lxdStorageOngoingOperationMap[lockID]; ok {
+		lxdStorageMapLock.Unlock()
+
+		_, ok := <-waitChannel
+		if ok {
+			logger.Warnf("Received value over semaphore, this should not have happened")
+		}
+
+		// Give the benefit of the doubt and assume that the other
+		// thread actually succeeded in performing the operation.
+		return nil
+	}
+
+	lxdStorageOngoingOperationMap[lockID] = make(chan bool)
+	lxdStorageMapLock.Unlock()
+
+	return func() {
+		lxdStorageMapLock.Lock()
+
+		waitChannel, ok := lxdStorageOngoingOperationMap[lockID]
+		if ok {
+			close(waitChannel)
+			delete(lxdStorageOngoingOperationMap, lockID)
+		}
+
+		lxdStorageMapLock.Unlock()
+	}
+}

From 0b07c363a61600d1fa76d6b7568396b5ba67160d Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Wed, 30 Oct 2019 19:24:31 +0100
Subject: [PATCH 2/2] lxd/storage/drivers: Add cephfs

This adds the cephfs storage driver.
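
The driver mounts an existing CephFS filesystem (or a sub-path of one)
as a storage pool, and supports custom volumes only, including
snapshots (backed by CephFS .snap directories), copies, renames,
project quotas and rsync-based migration.

As a usage sketch, assuming a CephFS filesystem named "cephfs" exists
on the cluster and its configuration and keyring are available under
/etc/ceph:

	lxc storage create pool1 cephfs source=cephfs/lxd
	lxc storage volume create pool1 vol1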

Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
 lxd/storage/drivers/driver_cephfs.go | 1013 ++++++++++++++++++++++++++
 lxd/storage/drivers/load.go          |    3 +-
 2 files changed, 1015 insertions(+), 1 deletion(-)
 create mode 100644 lxd/storage/drivers/driver_cephfs.go

diff --git a/lxd/storage/drivers/driver_cephfs.go b/lxd/storage/drivers/driver_cephfs.go
new file mode 100644
index 0000000000..26d04d91bf
--- /dev/null
+++ b/lxd/storage/drivers/driver_cephfs.go
@@ -0,0 +1,1013 @@
+package drivers
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/lxc/lxd/lxd/migration"
+	"github.com/lxc/lxd/lxd/operations"
+	"github.com/lxc/lxd/lxd/rsync"
+	"github.com/lxc/lxd/lxd/storage/quota"
+	"github.com/lxc/lxd/shared"
+	"github.com/lxc/lxd/shared/api"
+	"github.com/lxc/lxd/shared/ioprogress"
+	"github.com/lxc/lxd/shared/units"
+)
+
+type cephfs struct {
+	common
+
+	fsName  string
+	version string
+}
+
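+// Info returns the driver's name and capabilities, detecting the installed
+// Ceph tooling version on first use.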
+func (d *cephfs) Info() Info {
+	if d.version == "" {
+		msg, err := shared.RunCommand("rbd", "--version")
+		if err != nil {
+			d.version = "unknown"
+		} else {
+			d.version = strings.TrimSpace(msg)
+		}
+	}
+
+	return Info{
+		Name:            "cephfs",
+		Version:         d.version,
+		OptimizedImages: true,
+		PreservesInodes: false,
+		Usable:          true,
+		Remote:          false,
+		VolumeTypes:     []VolumeType{VolumeTypeCustom},
+	}
+}
+
+func (d *cephfs) HasVolume(volType VolumeType, volName string) bool {
+	return shared.PathExists(GetVolumeMountPath(d.name, volType, volName))
+}
+
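+// Create validates the pool configuration and, using a temporary mount,
+// checks that the requested CephFS filesystem exists and that the target
+// path within it is empty.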
+func (d *cephfs) Create() error {
+	if d.config["source"] == "" {
+		return fmt.Errorf("Missing required source name/path")
+	}
+
+	if d.config["cephfs.path"] != "" && d.config["cephfs.path"] != d.config["source"] {
+		return fmt.Errorf("cephfs.path must match the source")
+	}
+
+	if d.config["cephfs.cluster_name"] == "" {
+		d.config["cephfs.cluster_name"] = "ceph"
+	}
+
+	if d.config["cephfs.user.name"] == "" {
+		d.config["cephfs.user.name"] = "admin"
+	}
+
+	d.fsName = d.config["source"]
+
+	// Parse the namespace / path
+	fields := strings.SplitN(d.fsName, "/", 2)
+	fsName := fields[0]
+	fsPath := "/"
+	if len(fields) > 1 {
+		fsPath = fields[1]
+	}
+
+	// Check that the filesystem exists
+	if !d.fsExists(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"], fsName) {
+		return fmt.Errorf("The requested '%v' CephFS doesn't exist", fsName)
+	}
+
+	// Create a temporary mountpoint
+	mountPath, err := ioutil.TempDir("", "lxd_cephfs_")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(mountPath)
+
+	err = os.Chmod(mountPath, 0700)
+	if err != nil {
+		return err
+	}
+
+	mountPoint := filepath.Join(mountPath, "mount")
+
+	err = os.Mkdir(mountPoint, 0700)
+	if err != nil {
+		return err
+	}
+
+	// Get the credentials and host
+	monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
+	if err != nil {
+		return err
+	}
+
+	connected := false
+	for _, monAddress := range monAddresses {
+		uri := fmt.Sprintf("%s:6789:/", monAddress)
+		err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
+		if err != nil {
+			continue
+		}
+
+		connected = true
+		defer forceUnmount(mountPoint)
+		break
+	}
+
+	if !connected {
+		return err
+	}
+
+	// Create the path if missing
+	err = os.MkdirAll(filepath.Join(mountPoint, fsPath), 0755)
+	if err != nil {
+		return err
+	}
+
+	// Check that the existing path is empty
+	ok, _ := shared.PathIsEmpty(filepath.Join(mountPoint, fsPath))
+	if !ok {
+		return fmt.Errorf("Only empty CephFS paths can be used as a LXD storage pool")
+	}
+
+	return nil
+}
+
+func (d *cephfs) Delete(op *operations.Operation) error {
+	// Parse the namespace / path
+	fields := strings.SplitN(d.fsName, "/", 2)
+	fsName := fields[0]
+	fsPath := "/"
+	if len(fields) > 1 {
+		fsPath = fields[1]
+	}
+
+	// Create a temporary mountpoint
+	mountPath, err := ioutil.TempDir("", "lxd_cephfs_")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(mountPath)
+
+	err = os.Chmod(mountPath, 0700)
+	if err != nil {
+		return err
+	}
+
+	mountPoint := filepath.Join(mountPath, "mount")
+	err = os.Mkdir(mountPoint, 0700)
+	if err != nil {
+		return err
+	}
+
+	// Get the credentials and host
+	monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
+	if err != nil {
+		return err
+	}
+
+	connected := false
+	for _, monAddress := range monAddresses {
+		uri := fmt.Sprintf("%s:6789:/", monAddress)
+		err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
+		if err != nil {
+			continue
+		}
+
+		connected = true
+		defer forceUnmount(mountPoint)
+		break
+	}
+
+	if !connected {
+		return err
+	}
+
+	if shared.PathExists(filepath.Join(mountPoint, fsPath)) {
+		// Delete the usual directories
+		for _, dir := range []string{"custom", "custom-snapshots"} {
+			if shared.PathExists(filepath.Join(mountPoint, fsPath, dir)) {
+				err = os.Remove(filepath.Join(mountPoint, fsPath, dir))
+				if err != nil {
+					return err
+				}
+			}
+		}
+
+		// Confirm that the path is now empty
+		ok, _ := shared.PathIsEmpty(filepath.Join(mountPoint, fsPath))
+		if !ok {
+			return fmt.Errorf("Only empty CephFS paths can be used as a LXD storage pool")
+		}
+
+		// Delete the path itself
+		if fsPath != "" && fsPath != "/" {
+			err = os.Remove(filepath.Join(mountPoint, fsPath))
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	// On delete, wipe everything in the directory.
+	err = wipeDirectory(GetPoolMountPath(d.name))
+	if err != nil {
+		return err
+	}
+
+	// Make sure the existing pool is unmounted
+	_, err = d.Unmount()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
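+// Mount mounts the CephFS path at the pool's mount path, trying each
+// configured monitor address until one succeeds.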
+func (d *cephfs) Mount() (bool, error) {
+	// Parse the namespace / path
+	fields := strings.SplitN(d.fsName, "/", 2)
+	fsName := fields[0]
+	fsPath := "/"
+	if len(fields) > 1 {
+		fsPath = fields[1]
+	}
+
+	// Get the credentials and host
+	monAddresses, secret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
+	if err != nil {
+		return false, err
+	}
+
+	// Do the actual mount
+	connected := false
+	for _, monAddress := range monAddresses {
+		uri := fmt.Sprintf("%s:6789:/%s", monAddress, fsPath)
+		err = tryMount(uri, GetPoolMountPath(d.name), "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], secret, fsName))
+		if err != nil {
+			continue
+		}
+
+		connected = true
+		break
+	}
+
+	if !connected {
+		return false, err
+	}
+
+	return true, nil
+}
+
+func (d *cephfs) Unmount() (bool, error) {
+	return forceUnmount(GetPoolMountPath(d.name))
+}
+
+func (d *cephfs) GetResources() (*api.ResourcesStoragePool, error) {
+	// Use the generic VFS resources.
+	return vfsResources(GetPoolMountPath(d.name))
+}
+
+func (d *cephfs) ValidateVolume(volConfig map[string]string, removeUnknownKeys bool) error {
+	return d.validateVolume(volConfig, nil, removeUnknownKeys)
+}
+
+func (d *cephfs) CreateVolume(vol Volume, filler func(path string) error, op *operations.Operation) error {
+	if vol.volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	if vol.contentType != ContentTypeFS {
+		return fmt.Errorf("Content type not supported")
+	}
+
+	volPath := vol.MountPath()
+
+	// Get the volume ID for the new volume, which is used to set project quota.
+	volID, err := d.getVolID(vol.volType, vol.name)
+	if err != nil {
+		return err
+	}
+
+	err = os.MkdirAll(volPath, 0711)
+	if err != nil {
+		return err
+	}
+
+	revertPath := true
+	defer func() {
+		if revertPath {
+			d.deleteQuota(volPath, volID)
+			os.RemoveAll(volPath)
+		}
+	}()
+
+	// Initialise the volume's quota using the volume ID.
+	err = d.initQuota(volPath, volID)
+	if err != nil {
+		return err
+	}
+
+	// Set the quota if specified in volConfig or pool config.
+	err = d.setQuota(volPath, volID, vol.config["size"])
+	if err != nil {
+		return err
+	}
+
+	if filler != nil {
+		err = filler(volPath)
+		if err != nil {
+			return err
+		}
+	}
+
+	revertPath = false
+	return nil
+}
+
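+// CreateVolumeFromCopy copies an existing custom volume (optionally
+// including its snapshots) into a new volume using rsync, recreating the
+// CephFS snapshots and their symlinks on the destination.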
+func (d *cephfs) CreateVolumeFromCopy(vol Volume, srcVol Volume, copySnapshots bool, op *operations.Operation) error {
+	if vol.volType != VolumeTypeCustom || srcVol.volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	if vol.contentType != ContentTypeFS || srcVol.contentType != ContentTypeFS {
+		return fmt.Errorf("Content type not supported")
+	}
+
+	bwlimit := d.config["rsync.bwlimit"]
+
+	// Get the volume ID for the new volumes, which is used to set project quota.
+	volID, err := d.getVolID(vol.volType, vol.name)
+	if err != nil {
+		return err
+	}
+
+	// Record the paths created so they can be removed if a revert is needed later.
+	revertPaths := []string{}
+	defer func() {
+		// Remove any paths created if we are reverting.
+		for _, path := range revertPaths {
+			d.deleteQuota(path, volID)
+			os.RemoveAll(path)
+		}
+	}()
+
+	if copySnapshots && !srcVol.IsSnapshot() {
+		srcSnapshots, err := srcVol.Snapshots(op)
+		if err != nil {
+			return err
+		}
+
+		for _, srcSnapshot := range srcSnapshots {
+			_, snapName, _ := shared.ContainerGetParentAndSnapshotName(srcSnapshot.name)
+			dstSnapshot, err := vol.NewSnapshot(snapName)
+			if err != nil {
+				return err
+			}
+
+			dstSnapPath := dstSnapshot.MountPath()
+			err = os.MkdirAll(dstSnapPath, 0711)
+			if err != nil {
+				return err
+			}
+
+			revertPaths = append(revertPaths, dstSnapPath)
+
+			// Initialise the snapshot's quota with the parent volume's ID.
+			err = d.initQuota(dstSnapPath, volID)
+			if err != nil {
+				return err
+			}
+
+			err = srcSnapshot.MountTask(func(srcMountPath string, op *operations.Operation) error {
+				return dstSnapshot.MountTask(func(dstMountPath string, op *operations.Operation) error {
+					_, err = rsync.LocalCopy(srcMountPath, dstMountPath, bwlimit, true)
+					if err != nil {
+						return err
+					}
+
+					cephSnapPath := filepath.Join(dstMountPath, ".snap", snapName)
+					err := os.Mkdir(cephSnapPath, 0711)
+					if err != nil {
+						return err
+					}
+
+					// Make the snapshot path a symlink
+					targetPath := GetVolumeMountPath(d.name, VolumeTypeCustom, GetSnapshotVolumeName(vol.name, snapName))
+					err = os.MkdirAll(filepath.Dir(targetPath), 0711)
+					if err != nil {
+						return err
+					}
+
+					return os.Symlink(cephSnapPath, targetPath)
+				}, op)
+			}, op)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	volPath := vol.MountPath()
+	err = os.MkdirAll(volPath, 0711)
+	if err != nil {
+		return err
+	}
+
+	revertPaths = append(revertPaths, volPath)
+
+	// Initialise the volume's quota using the volume ID.
+	err = d.initQuota(volPath, volID)
+	if err != nil {
+		return err
+	}
+
+	// Set the quota if specified in volConfig or pool config.
+	err = d.setQuota(volPath, volID, vol.config["size"])
+	if err != nil {
+		return err
+	}
+
+	// Copy source to destination (mounting each volume if needed).
+	err = srcVol.MountTask(func(srcMountPath string, op *operations.Operation) error {
+		return vol.MountTask(func(dstMountPath string, op *operations.Operation) error {
+			_, err := rsync.LocalCopy(srcMountPath, dstMountPath, bwlimit, true)
+			if err != nil {
+				return err
+			}
+
+			if vol.IsSnapshot() {
+				_, snapName, _ := shared.ContainerGetParentAndSnapshotName(vol.name)
+
+				cephSnapPath := filepath.Join(dstMountPath, ".snap", snapName)
+				err := os.Mkdir(cephSnapPath, 0711)
+				if err != nil {
+					return err
+				}
+
+				// Make the snapshot path a symlink
+				targetPath := GetVolumeMountPath(d.name, VolumeTypeCustom, fmt.Sprintf("%s/%s", vol.name, snapName))
+				err = os.MkdirAll(filepath.Dir(targetPath), 0711)
+				if err != nil {
+					return err
+				}
+
+				return os.Symlink(cephSnapPath, targetPath)
+			}
+
+			return nil
+		}, op)
+	}, op)
+	if err != nil {
+		return err
+	}
+
+	revertPaths = nil // Don't revert.
+	return nil
+}
+
+func (d *cephfs) DeleteVolume(volType VolumeType, volName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	snapshots, err := d.VolumeSnapshots(volType, volName, op)
+	if err != nil {
+		return err
+	}
+
+	if len(snapshots) > 0 {
+		return fmt.Errorf("Cannot remove a volume that has snapshots")
+	}
+
+	volPath := GetVolumeMountPath(d.name, volType, volName)
+
+	// If the volume doesn't exist, then nothing more to do.
+	if !shared.PathExists(volPath) {
+		return nil
+	}
+
+	// Get the volume ID for the volume, which is used to remove project quota.
+	volID, err := d.getVolID(volType, volName)
+	if err != nil {
+		return err
+	}
+
+	// Remove the project quota.
+	err = d.deleteQuota(volPath, volID)
+	if err != nil {
+		return err
+	}
+
+	// Remove the volume from the storage device.
+	err = os.RemoveAll(volPath)
+	if err != nil {
+		return err
+	}
+
+	// Although the volume snapshot directory should already be removed, let's
+	// remove it here just in case the top-level directory was left behind.
+	snapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName)
+	if err != nil {
+		return err
+	}
+
+	err = os.RemoveAll(snapshotDir)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *cephfs) RenameVolume(volType VolumeType, volName string, newName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	vol := NewVolume(d, d.name, volType, ContentTypeFS, volName, nil)
+
+	// Create new snapshots directory.
+	snapshotDir, err := GetVolumeSnapshotDir(d.name, volType, newName)
+	if err != nil {
+		return err
+	}
+
+	err = os.MkdirAll(snapshotDir, 0711)
+	if err != nil {
+		return err
+	}
+
+	type volRevert struct {
+		oldPath   string
+		newPath   string
+		isSymlink bool
+	}
+
+	// Record the paths renamed so they can be reverted if needed later.
+	revertPaths := []volRevert{}
+	defer func() {
+		// Revert any paths that were renamed, if we are reverting.
+		for _, vol := range revertPaths {
+			if vol.isSymlink {
+				os.Symlink(vol.oldPath, vol.newPath)
+			} else {
+				os.Rename(vol.newPath, vol.oldPath)
+			}
+		}
+
+		// Remove the new snapshot directory if we are reverting.
+		if len(revertPaths) > 0 {
+			err = os.RemoveAll(snapshotDir)
+		}
+	}()
+
+	// Rename any snapshots of the volume too.
+	snapshots, err := vol.Snapshots(op)
+	if err != nil {
+		return err
+	}
+
+	for _, snapshot := range snapshots {
+		srcSnapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName)
+		if err != nil {
+			return err
+		}
+
+		targetSnapshotDir, err := GetVolumeSnapshotDir(d.name, volType, newName)
+		if err != nil {
+			return err
+		}
+
+		err = os.Rename(srcSnapshotDir, targetSnapshotDir)
+		if err != nil {
+			return err
+		}
+
+		sourcePath := GetVolumeMountPath(d.name, volType, volName)
+		targetPath := GetVolumeMountPath(d.name, volType, newName)
+		_, snapName, _ := shared.ContainerGetParentAndSnapshotName(snapshot.name)
+
+		oldCephSnapPath := filepath.Join(sourcePath, ".snap", snapName)
+		newCephSnapPath := filepath.Join(targetPath, ".snap", snapName)
+
+		oldPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapName))
+		newPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(newName, snapName))
+
+		err = os.Symlink(newCephSnapPath, newPath)
+		if err != nil {
+			return err
+		}
+
+		revertPaths = append(revertPaths, volRevert{
+			oldPath:   oldPath,
+			newPath:   oldCephSnapPath,
+			isSymlink: true,
+		})
+
+		revertPaths = append(revertPaths, volRevert{
+			oldPath: srcSnapshotDir,
+			newPath: targetSnapshotDir,
+		})
+	}
+
+	oldPath := GetVolumeMountPath(d.name, volType, volName)
+	newPath := GetVolumeMountPath(d.name, volType, newName)
+	err = os.Rename(oldPath, newPath)
+	if err != nil {
+		return err
+	}
+
+	revertPaths = append(revertPaths, volRevert{
+		oldPath: oldPath,
+		newPath: newPath,
+	})
+
+	revertPaths = nil // Don't revert.
+	return nil
+}
+
+func (d *cephfs) MountVolume(volType VolumeType, volName string, op *operations.Operation) (bool, error) {
+	if volType != VolumeTypeCustom {
+		return false, fmt.Errorf("Volume type not supported")
+	}
+
+	return false, nil
+}
+
+func (d *cephfs) MountVolumeSnapshot(volType VolumeType, volName, snapshotName string, op *operations.Operation) (bool, error) {
+	if volType != VolumeTypeCustom {
+		return false, fmt.Errorf("Volume type not supported")
+	}
+
+	return false, nil
+}
+
+func (d *cephfs) UnmountVolume(volType VolumeType, volName string, op *operations.Operation) (bool, error) {
+	if volType != VolumeTypeCustom {
+		return false, fmt.Errorf("Volume type not supported")
+	}
+
+	return false, nil
+}
+
+func (d *cephfs) UnmountVolumeSnapshot(volType VolumeType, volName, snapshotName string, op *operations.Operation) (bool, error) {
+	if volType != VolumeTypeCustom {
+		return false, fmt.Errorf("Volume type not supported")
+	}
+
+	return false, nil
+}
+
+func (d *cephfs) CreateVolumeSnapshot(volType VolumeType, volName string, newSnapshotName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	// Create the snapshot
+	sourcePath := GetVolumeMountPath(d.name, volType, volName)
+	cephSnapPath := filepath.Join(sourcePath, ".snap", newSnapshotName)
+
+	err := os.Mkdir(cephSnapPath, 0711)
+	if err != nil {
+		return err
+	}
+
+	targetPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, newSnapshotName))
+
+	err = os.MkdirAll(filepath.Dir(targetPath), 0711)
+	if err != nil {
+		return err
+	}
+
+	err = os.Symlink(cephSnapPath, targetPath)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *cephfs) DeleteVolumeSnapshot(volType VolumeType, volName string, snapshotName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	sourcePath := GetVolumeMountPath(d.name, volType, volName)
+	cephSnapPath := filepath.Join(sourcePath, ".snap", snapshotName)
+
+	err := os.RemoveAll(cephSnapPath)
+	if err != nil {
+		return err
+	}
+
+	// Get the volume ID for the parent volume, which is used to remove project quota.
+	volID, err := d.getVolID(volType, volName)
+	if err != nil {
+		return err
+	}
+
+	// Make the snapshot path a symlink
+	snapPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapshotName))
+
+	// Remove the project quota.
+	err = d.deleteQuota(snapPath, volID)
+	if err != nil {
+		return err
+	}
+
+	err = os.RemoveAll(snapPath)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *cephfs) RenameVolumeSnapshot(volType VolumeType, volName string, snapshotName string, newSnapshotName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	sourcePath := GetVolumeMountPath(d.name, volType, volName)
+	oldCephSnapPath := filepath.Join(sourcePath, ".snap", snapshotName)
+	newCephSnapPath := filepath.Join(sourcePath, ".snap", newSnapshotName)
+
+	err := os.Rename(oldCephSnapPath, newCephSnapPath)
+	if err != nil {
+		return err
+	}
+
+	// Re-generate the snapshot symlink
+	oldPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapshotName))
+	err = os.Remove(oldPath)
+	if err != nil {
+		return err
+	}
+
+	newPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, newSnapshotName))
+	err = os.Symlink(newCephSnapPath, newPath)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (d *cephfs) VolumeSnapshots(volType VolumeType, volName string, op *operations.Operation) ([]string, error) {
+	if volType != VolumeTypeCustom {
+		return nil, fmt.Errorf("Volume type not supported")
+	}
+
+	snapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName)
+	if err != nil {
+		return nil, err
+	}
+
+	snapshots := []string{}
+
+	ents, err := ioutil.ReadDir(snapshotDir)
+	if err != nil {
+		// If the snapshots directory doesn't exist, there are no snapshots.
+		if os.IsNotExist(err) {
+			return snapshots, nil
+		}
+
+		return nil, err
+	}
+
+	for _, ent := range ents {
+		fileInfo, err := os.Stat(filepath.Join(snapshotDir, ent.Name()))
+		if err != nil {
+			return nil, err
+		}
+
+		if !fileInfo.IsDir() {
+			continue
+		}
+
+		snapshots = append(snapshots, ent.Name())
+	}
+
+	return snapshots, nil
+}
+
+func (d *cephfs) MigrateVolume(vol Volume, conn io.ReadWriteCloser, volSrcArgs migration.VolumeSourceArgs, op *operations.Operation) error {
+	if vol.volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	if vol.contentType != ContentTypeFS {
+		return fmt.Errorf("Content type not supported")
+	}
+
+	if volSrcArgs.MigrationType.FSType != migration.MigrationFSType_RSYNC {
+		return fmt.Errorf("Migration type not supported")
+	}
+
+	bwlimit := d.config["rsync.bwlimit"]
+
+	for _, snapName := range volSrcArgs.Snapshots {
+		snapshot, err := vol.NewSnapshot(snapName)
+		if err != nil {
+			return err
+		}
+
+		// Send snapshot to recipient (ensure local snapshot volume is mounted if needed).
+		err = snapshot.MountTask(func(mountPath string, op *operations.Operation) error {
+			var wrapper *ioprogress.ProgressTracker
+			if volSrcArgs.TrackProgress {
+				wrapper = migration.ProgressTracker(op, "fs_progress", snapshot.name)
+			}
+
+			path := shared.AddSlash(mountPath)
+			return rsync.Send(snapshot.name, path, conn, wrapper, volSrcArgs.MigrationType.Features, bwlimit, d.state.OS.ExecPath)
+		}, op)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Send volume to recipient (ensure local volume is mounted if needed).
+	return vol.MountTask(func(mountPath string, op *operations.Operation) error {
+		var wrapper *ioprogress.ProgressTracker
+		if volSrcArgs.TrackProgress {
+			wrapper = migration.ProgressTracker(op, "fs_progress", vol.name)
+		}
+
+		path := shared.AddSlash(mountPath)
+		return rsync.Send(vol.name, path, conn, wrapper, volSrcArgs.MigrationType.Features, bwlimit, d.state.OS.ExecPath)
+	}, op)
+}
+
+func (d *cephfs) CreateVolumeFromMigration(vol Volume, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, op *operations.Operation) error {
+	return nil
+}
+
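+// fsExists checks whether the given CephFS filesystem exists on the cluster.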
+func (d *cephfs) fsExists(clusterName string, userName string, fsName string) bool {
+	_, err := shared.RunCommand("ceph", "--name", fmt.Sprintf("client.%s", userName), "--cluster", clusterName, "fs", "get", fsName)
+	return err == nil
+}
+
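+// getConfig parses the cluster's configuration and keyring under /etc/ceph
+// and returns the monitor addresses and the user's secret key.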
+func (d *cephfs) getConfig(clusterName string, userName string) ([]string, string, error) {
+	// Parse the Ceph configuration
+	cephConf, err := os.Open(fmt.Sprintf("/etc/ceph/%s.conf", clusterName))
+	if err != nil {
+		return nil, "", err
+	}
+	defer cephConf.Close()
+
+	cephMon := []string{}
+
+	scan := bufio.NewScanner(cephConf)
+	for scan.Scan() {
+		line := scan.Text()
+		line = strings.TrimSpace(line)
+
+		if line == "" {
+			continue
+		}
+
+		if strings.HasPrefix(line, "mon_host") {
+			fields := strings.SplitN(line, "=", 2)
+			if len(fields) < 2 {
+				continue
+			}
+
+			servers := strings.Split(fields[1], ",")
+			for _, server := range servers {
+				cephMon = append(cephMon, strings.TrimSpace(server))
+			}
+			break
+		}
+	}
+
+	if len(cephMon) == 0 {
+		return nil, "", fmt.Errorf("Couldn't find a Ceph monitor")
+	}
+
+	// Parse the Ceph keyring
+	cephKeyring, err := os.Open(fmt.Sprintf("/etc/ceph/%v.client.%v.keyring", clusterName, userName))
+	if err != nil {
+		return nil, "", err
+	}
+	defer cephKeyring.Close()
+
+	var cephSecret string
+
+	scan = bufio.NewScanner(cephKeyring)
+	for scan.Scan() {
+		line := scan.Text()
+		line = strings.TrimSpace(line)
+
+		if line == "" {
+			continue
+		}
+
+		if strings.HasPrefix(line, "key") {
+			fields := strings.SplitN(line, "=", 2)
+			if len(fields) < 2 {
+				continue
+			}
+
+			cephSecret = strings.TrimSpace(fields[1])
+			break
+		}
+	}
+
+	if cephSecret == "" {
+		return nil, "", fmt.Errorf("Couldn't find a keyring entry")
+	}
+
+	return cephMon, cephSecret, nil
+}
+
+// initQuota initialises the project quota on the path. The volID is used to derive the quota project ID.
+func (d *cephfs) initQuota(path string, volID int64) error {
+	if volID == 0 {
+		return fmt.Errorf("Missing volume ID")
+	}
+
+	ok, err := quota.Supported(path)
+	if err != nil || !ok {
+		// Skipping quota as the underlying filesystem doesn't support project quotas.
+		return nil
+	}
+
+	err = quota.SetProject(path, d.quotaProjectID(volID))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// setQuota sets the project quota on the path. The volID is used to derive the quota project ID.
+func (d *cephfs) setQuota(path string, volID int64, size string) error {
+	if volID == 0 {
+		return fmt.Errorf("Missing volume ID")
+	}
+
+	// If the size isn't specified in the volume config, use the pool's default volume.size setting.
+	if size == "" || size == "0" {
+		size = d.config["volume.size"]
+	}
+
+	sizeBytes, err := units.ParseByteSizeString(size)
+	if err != nil {
+		return err
+	}
+
+	ok, err := quota.Supported(path)
+	if err != nil || !ok {
+		// Skipping quota as the underlying filesystem doesn't support project quotas.
+		return nil
+	}
+
+	err = quota.SetProjectQuota(path, d.quotaProjectID(volID), sizeBytes)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// deleteQuota removes the project quota for a volID from a path.
+func (d *cephfs) deleteQuota(path string, volID int64) error {
+	if volID == 0 {
+		return fmt.Errorf("Missing volume ID")
+	}
+
+	ok, err := quota.Supported(path)
+	if err != nil || !ok {
+		// Skipping quota as the underlying filesystem doesn't support project quotas.
+		return nil
+	}
+
+	err = quota.SetProject(path, 0)
+	if err != nil {
+		return err
+	}
+
+	err = quota.SetProjectQuota(path, d.quotaProjectID(volID), 0)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// quotaProjectID generates a project quota ID from a volume ID.
+func (d *cephfs) quotaProjectID(volID int64) uint32 {
+	return uint32(volID + 10000)
+}
diff --git a/lxd/storage/drivers/load.go b/lxd/storage/drivers/load.go
index a10fdfd01f..cffa6439a6 100644
--- a/lxd/storage/drivers/load.go
+++ b/lxd/storage/drivers/load.go
@@ -5,7 +5,8 @@ import (
 )
 
 var drivers = map[string]func() driver{
-	"dir": func() driver { return &dir{} },
+	"dir":    func() driver { return &dir{} },
+	"cephfs": func() driver { return &cephfs{} },
 }
 
 // Load returns a Driver for an existing low-level storage pool.

