[lxc-devel] [lxd/master] Add cephfs backend
monstermunchkin on Github
lxc-bot at linuxcontainers.org
Thu Oct 31 15:04:06 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20191031/ba48ba9a/attachment.bin>
-------------- next part --------------
From 07686d47246ee073bcc529473d3ecd9ee0d645b9 Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Thu, 31 Oct 2019 09:53:45 +0100
Subject: [PATCH 1/2] lxd/storage: Add locking for mount operations
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
lxd/storage/backend_lxd.go | 24 +++++++++++++
lxd/storage/lock.go | 69 ++++++++++++++++++++++++++++++++++++++
2 files changed, 93 insertions(+)
create mode 100644 lxd/storage/lock.go
diff --git a/lxd/storage/backend_lxd.go b/lxd/storage/backend_lxd.go
index 9f91ab84d9..d7e5f560be 100644
--- a/lxd/storage/backend_lxd.go
+++ b/lxd/storage/backend_lxd.go
@@ -125,11 +125,23 @@ func (b *lxdBackend) Delete(op *operations.Operation) error {
// Mount mounts the storage pool.
func (b *lxdBackend) Mount() (bool, error) {
+	// Only one caller mounts the pool at a time. A nil release function
+	// means another caller was already mounting it and has now finished,
+	// so report "not mounted by us" without calling the driver.
+	release := lock(getPoolMountLockID(b.name))
+	if release == nil {
+		return false, nil
+	}
+	defer release()
+
return b.driver.Mount()
}
// Unmount unmounts the storage pool.
func (b *lxdBackend) Unmount() (bool, error) {
+	// Only one caller unmounts the pool at a time. A nil release function
+	// means another caller was already unmounting it and has now finished,
+	// so report "not unmounted by us" without calling the driver.
+	release := lock(getPoolUmountLockID(b.name))
+	if release == nil {
+		return false, nil
+	}
+	defer release()
+
return b.driver.Unmount()
}
@@ -596,11 +608,23 @@ func (b *lxdBackend) SetCustomVolumeQuota(vol api.StorageVolume, quota uint64) e
// MountCustomVolume mounts a custom volume.
func (b *lxdBackend) MountCustomVolume(volName string, op *operations.Operation) (bool, error) {
+	// Qualify the lock key with the pool name. Keyed on the volume name
+	// alone, a concurrent mount of an identically named volume in another
+	// pool would be taken for this operation and this mount would be
+	// skipped.
+	unlock := lock(getVolumeMountLockID(b.name + "/" + volName))
+	if unlock == nil {
+		// Another caller was mounting this volume and has finished.
+		return false, nil
+	}
+	defer unlock()
+
return b.driver.MountVolume(drivers.VolumeTypeCustom, volName, op)
}
// UnmountCustomVolume unmounts a custom volume.
func (b *lxdBackend) UnmountCustomVolume(volName string, op *operations.Operation) (bool, error) {
+	// Qualify the lock key with the pool name. Keyed on the volume name
+	// alone, a concurrent unmount of an identically named volume in another
+	// pool would be taken for this operation and this unmount would be
+	// skipped.
+	unlock := lock(getVolumeUmountLockID(b.name + "/" + volName))
+	if unlock == nil {
+		// Another caller was unmounting this volume and has finished.
+		return false, nil
+	}
+	defer unlock()
+
return b.driver.UnmountVolume(drivers.VolumeTypeCustom, volName, op)
}
diff --git a/lxd/storage/lock.go b/lxd/storage/lock.go
new file mode 100644
index 0000000000..e16a794adf
--- /dev/null
+++ b/lxd/storage/lock.go
@@ -0,0 +1,69 @@
+package storage
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/lxc/lxd/shared/logger"
+)
+
+// lxdStorageOngoingOperationMap is a hashmap that allows functions to check
+// whether the operation they are about to perform is already in progress. If
+// it is, the channel can be used to wait for the operation to finish. If it is
+// not, the function that wants to perform the operation should store its
+// operation code in the hashmap.
+// Note that any access to this map must be done while holding lxdStorageMapLock.
+var lxdStorageOngoingOperationMap = map[string]chan bool{}
+
+// lxdStorageMapLock is used to access lxdStorageOngoingOperationMap.
+var lxdStorageMapLock sync.Mutex
+
+// The following functions are used to construct simple operation codes that are
+// unique.
+func getPoolMountLockID(poolName string) string {
+ return fmt.Sprintf("mount/pool/%s", poolName)
+}
+
+func getPoolUmountLockID(poolName string) string {
+ return fmt.Sprintf("umount/pool/%s", poolName)
+}
+
+func getVolumeMountLockID(volumeName string) string {
+ return fmt.Sprintf("mount/volume/%s", volumeName)
+}
+
+func getVolumeUmountLockID(volumeName string) string {
+ return fmt.Sprintf("umount/volume/%s", volumeName)
+}
+
+// lock registers lockID as an ongoing operation and returns the function that
+// must be called to release it once the operation has finished.
+//
+// If lockID is already registered, lock instead blocks until the owner
+// releases it and then returns nil. The operation was carried out by another
+// caller, so the current caller is expected to skip it.
+func lock(lockID string) func() {
+	lxdStorageMapLock.Lock()
+
+	if waitChannel, ok := lxdStorageOngoingOperationMap[lockID]; ok {
+		// Drop the map mutex before blocking: the owner needs it to
+		// release the operation.
+		lxdStorageMapLock.Unlock()
+
+		// The release function only ever closes the channel, so a
+		// successful receive means something wrote to it.
+		if _, ok := <-waitChannel; ok {
+			logger.Warnf("Received value over semaphore, this should not have happened")
+		}
+
+		// Give the benefit of the doubt and assume that the other
+		// thread actually succeeded in performing the operation.
+		return nil
+	}
+
+	lxdStorageOngoingOperationMap[lockID] = make(chan bool)
+	lxdStorageMapLock.Unlock()
+
+	return func() {
+		lxdStorageMapLock.Lock()
+		defer lxdStorageMapLock.Unlock()
+
+		// Closing the channel wakes every waiter. Deleting the entry lets
+		// the next caller start a fresh operation.
+		if waitChannel, ok := lxdStorageOngoingOperationMap[lockID]; ok {
+			close(waitChannel)
+			delete(lxdStorageOngoingOperationMap, lockID)
+		}
+	}
+}
From 0b07c363a61600d1fa76d6b7568396b5ba67160d Mon Sep 17 00:00:00 2001
From: Thomas Hipp <thomas.hipp at canonical.com>
Date: Wed, 30 Oct 2019 19:24:31 +0100
Subject: [PATCH 2/2] lxd/storage/drivers: Add cephfs
This adds the cephfs storage driver.
Signed-off-by: Thomas Hipp <thomas.hipp at canonical.com>
---
lxd/storage/drivers/driver_cephfs.go | 1013 ++++++++++++++++++++++++++
lxd/storage/drivers/load.go | 3 +-
2 files changed, 1015 insertions(+), 1 deletion(-)
create mode 100644 lxd/storage/drivers/driver_cephfs.go
diff --git a/lxd/storage/drivers/driver_cephfs.go b/lxd/storage/drivers/driver_cephfs.go
new file mode 100644
index 0000000000..26d04d91bf
--- /dev/null
+++ b/lxd/storage/drivers/driver_cephfs.go
@@ -0,0 +1,1013 @@
+package drivers
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/operations"
+ "github.com/lxc/lxd/lxd/rsync"
+ "github.com/lxc/lxd/lxd/storage/quota"
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/ioprogress"
+ "github.com/lxc/lxd/shared/units"
+)
+
+// cephfs is the storage driver registered under the name "cephfs".
+type cephfs struct {
+ common
+
+ // fsName holds the pool's "source" config value (filesystem name,
+ // optionally followed by "/" and a path); Create assigns it.
+ fsName string
+ // version caches the tool version string reported by Info.
+ version string
+}
+
+// Info describes the cephfs driver. The version string is obtained by running
+// "rbd --version" the first time it is needed and cached on the driver; when
+// the command fails the version is reported as "unknown".
+func (d *cephfs) Info() Info {
+	if d.version == "" {
+		d.version = "unknown"
+
+		out, err := shared.RunCommand("rbd", "--version")
+		if err == nil {
+			d.version = strings.TrimSpace(out)
+		}
+	}
+
+	info := Info{
+		Name:            "cephfs",
+		Version:         d.version,
+		OptimizedImages: true,
+		PreservesInodes: false,
+		Usable:          true,
+		Remote:          false,
+		VolumeTypes:     []VolumeType{VolumeTypeCustom},
+	}
+
+	return info
+}
+
+// HasVolume reports whether the mount path of the given volume exists.
+func (d *cephfs) HasVolume(volType VolumeType, volName string) bool {
+	volPath := GetVolumeMountPath(d.name, volType, volName)
+	return shared.PathExists(volPath)
+}
+
+// Create validates the pool configuration and checks that the requested CEPHFS
+// filesystem can back a new pool.
+//
+// The "source" config key is required and has the form "<fs name>[/<path>]".
+// "cephfs.cluster_name" defaults to "ceph" and "cephfs.user.name" to "admin".
+// The filesystem is mounted on a temporary directory through the first monitor
+// that accepts the mount. The path is then created if missing and must be
+// empty. The temporary mount and directory are cleaned up on return.
+func (d *cephfs) Create() error {
+	if d.config["source"] == "" {
+		return fmt.Errorf("Missing required source name/path")
+	}
+
+	if d.config["cephfs.path"] != "" && d.config["cephfs.path"] != d.config["source"] {
+		return fmt.Errorf("cephfs.path must match the source")
+	}
+
+	// Fill in defaults for anything left unset.
+	if d.config["cephfs.cluster_name"] == "" {
+		d.config["cephfs.cluster_name"] = "ceph"
+	}
+
+	if d.config["cephfs.user.name"] == "" {
+		d.config["cephfs.user.name"] = "admin"
+	}
+
+	// Use one key for the user everywhere, so the existence check, the
+	// keyring lookup and the mount options all agree.
+	clusterName := d.config["cephfs.cluster_name"]
+	userName := d.config["cephfs.user.name"]
+
+	d.fsName = d.config["source"]
+
+	// Parse the namespace / path
+	fields := strings.SplitN(d.fsName, "/", 2)
+	fsName := fields[0]
+	fsPath := "/"
+	if len(fields) > 1 {
+		fsPath = fields[1]
+	}
+
+	// Check that the filesystem exists
+	if !d.fsExists(clusterName, userName, fsName) {
+		return fmt.Errorf("The requested '%v' CEPHFS doesn't exist", fsName)
+	}
+
+	// Create a temporary mountpoint
+	mountPath, err := ioutil.TempDir("", "lxd_cephfs_")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(mountPath)
+
+	err = os.Chmod(mountPath, 0700)
+	if err != nil {
+		return err
+	}
+
+	mountPoint := filepath.Join(mountPath, "mount")
+
+	err = os.Mkdir(mountPoint, 0700)
+	if err != nil {
+		return err
+	}
+
+	// Get the credentials and host
+	monAddresses, userSecret, err := d.getConfig(clusterName, userName)
+	if err != nil {
+		return err
+	}
+
+	// Try each monitor in turn and stop at the first successful mount.
+	connected := false
+	for _, monAddress := range monAddresses {
+		uri := fmt.Sprintf("%s:6789:/", monAddress)
+		err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", userName, userSecret, fsName))
+		if err != nil {
+			continue
+		}
+
+		connected = true
+		defer forceUnmount(mountPoint)
+		break
+	}
+
+	if !connected {
+		// err holds the failure from the last monitor tried.
+		return err
+	}
+
+	// Create the path if missing
+	poolPath := filepath.Join(mountPoint, fsPath)
+	err = os.MkdirAll(poolPath, 0755)
+	if err != nil {
+		return err
+	}
+
+	// Check that the existing path is empty
+	ok, err := shared.PathIsEmpty(poolPath)
+	if err != nil {
+		return err
+	}
+
+	if !ok {
+		return fmt.Errorf("Only empty CEPHFS paths can be used as a LXD storage pool")
+	}
+
+	return nil
+}
+
+// Delete removes the pool's directories from the CEPHFS filesystem, wipes the
+// local pool mount path and unmounts the pool.
+//
+// The filesystem is mounted on a temporary directory through the first monitor
+// that accepts the mount. The "custom" and "custom-snapshots" directories are
+// removed, the pool path must then be empty, and the path itself is removed
+// unless it is the filesystem root.
+func (d *cephfs) Delete(op *operations.Operation) error {
+	// In the code visible here fsName is only assigned by Create, so fall
+	// back to the stored "source" value when it is unset.
+	source := d.fsName
+	if source == "" {
+		source = d.config["source"]
+	}
+
+	// Parse the namespace / path
+	fields := strings.SplitN(source, "/", 2)
+	fsName := fields[0]
+	fsPath := "/"
+	if len(fields) > 1 {
+		fsPath = fields[1]
+	}
+
+	// Create a temporary mountpoint
+	mountPath, err := ioutil.TempDir("", "lxd_cephfs_")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(mountPath)
+
+	err = os.Chmod(mountPath, 0700)
+	if err != nil {
+		return err
+	}
+
+	mountPoint := filepath.Join(mountPath, "mount")
+	err = os.Mkdir(mountPoint, 0700)
+	if err != nil {
+		return err
+	}
+
+	// Get the credentials and host. The user is read from the same
+	// "cephfs.user.name" key that Create defaults and validates.
+	userName := d.config["cephfs.user.name"]
+	monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], userName)
+	if err != nil {
+		return err
+	}
+
+	// Try each monitor in turn and stop at the first successful mount.
+	connected := false
+	for _, monAddress := range monAddresses {
+		uri := fmt.Sprintf("%s:6789:/", monAddress)
+		err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", userName, userSecret, fsName))
+		if err != nil {
+			continue
+		}
+
+		connected = true
+		defer forceUnmount(mountPoint)
+		break
+	}
+
+	if !connected {
+		// err holds the failure from the last monitor tried.
+		return err
+	}
+
+	poolPath := filepath.Join(mountPoint, fsPath)
+	if shared.PathExists(poolPath) {
+		// Delete the usual directories
+		for _, dir := range []string{"custom", "custom-snapshots"} {
+			dirPath := filepath.Join(poolPath, dir)
+			if shared.PathExists(dirPath) {
+				err = os.Remove(dirPath)
+				if err != nil {
+					return err
+				}
+			}
+		}
+
+		// Confirm that the path is now empty
+		ok, err := shared.PathIsEmpty(poolPath)
+		if err != nil {
+			return err
+		}
+
+		if !ok {
+			return fmt.Errorf("Only empty CEPHFS paths can be used as a LXD storage pool")
+		}
+
+		// Delete the path itself
+		if fsPath != "" && fsPath != "/" {
+			err = os.Remove(poolPath)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	// On delete, wipe everything in the directory.
+	err = wipeDirectory(GetPoolMountPath(d.name))
+	if err != nil {
+		return err
+	}
+
+	// Make sure the existing pool is unmounted
+	_, err = d.Unmount()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Mount mounts the pool's CEPHFS path on the pool mount path, trying each
+// monitor in turn until one accepts the mount. It returns true once mounted,
+// or the error from the last monitor tried.
+func (d *cephfs) Mount() (bool, error) {
+	// In the code visible here fsName is only assigned by Create, so fall
+	// back to the stored "source" value when it is unset.
+	source := d.fsName
+	if source == "" {
+		source = d.config["source"]
+	}
+
+	// Parse the namespace / path
+	fields := strings.SplitN(source, "/", 2)
+	fsName := fields[0]
+	fsPath := "/"
+	if len(fields) > 1 {
+		fsPath = fields[1]
+	}
+
+	// Get the credentials and host. The user is read from the same
+	// "cephfs.user.name" key that Create defaults and validates.
+	userName := d.config["cephfs.user.name"]
+	monAddresses, secret, err := d.getConfig(d.config["cephfs.cluster_name"], userName)
+	if err != nil {
+		return false, err
+	}
+
+	// Do the actual mount
+	connected := false
+	for _, monAddress := range monAddresses {
+		uri := fmt.Sprintf("%s:6789:/%s", monAddress, fsPath)
+		err = tryMount(uri, GetPoolMountPath(d.name), "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", userName, secret, fsName))
+		if err != nil {
+			continue
+		}
+
+		connected = true
+		break
+	}
+
+	if !connected {
+		return false, err
+	}
+
+	return true, nil
+}
+
+// Unmount force-unmounts the pool's mount path.
+func (d *cephfs) Unmount() (bool, error) {
+	poolPath := GetPoolMountPath(d.name)
+	return forceUnmount(poolPath)
+}
+
+// GetResources reports the pool's resource usage using the generic VFS
+// helper on the pool mount path.
+func (d *cephfs) GetResources() (*api.ResourcesStoragePool, error) {
+	poolPath := GetPoolMountPath(d.name)
+	return vfsResources(poolPath)
+}
+
+// ValidateVolume checks a volume's config with the driver's shared
+// validateVolume helper; no driver-specific rules are passed.
+func (d *cephfs) ValidateVolume(volConfig map[string]string, removeUnknownKeys bool) error {
+	err := d.validateVolume(volConfig, nil, removeUnknownKeys)
+	return err
+}
+
+// CreateVolume creates the directory for a custom filesystem volume, sets up
+// its quota and, when a filler is supplied, runs it on the new path. If any
+// step after the directory is created fails, the quota and directory are
+// removed again.
+func (d *cephfs) CreateVolume(vol Volume, filler func(path string) error, op *operations.Operation) error {
+	switch {
+	case vol.volType != VolumeTypeCustom:
+		return fmt.Errorf("Volume type not supported")
+	case vol.contentType != ContentTypeFS:
+		return fmt.Errorf("Content type not supported")
+	}
+
+	mountPath := vol.MountPath()
+
+	// The volume ID is what the project quota is derived from.
+	id, err := d.getVolID(vol.volType, vol.name)
+	if err != nil {
+		return err
+	}
+
+	if err := os.MkdirAll(mountPath, 0711); err != nil {
+		return err
+	}
+
+	// From here on, undo the creation unless we reach the end.
+	done := false
+	defer func() {
+		if done {
+			return
+		}
+
+		d.deleteQuota(mountPath, id)
+		os.RemoveAll(mountPath)
+	}()
+
+	// Attach the quota project to the path.
+	if err := d.initQuota(mountPath, id); err != nil {
+		return err
+	}
+
+	// Apply the size from the volume config (or the pool default).
+	if err := d.setQuota(mountPath, id, vol.config["size"]); err != nil {
+		return err
+	}
+
+	if filler != nil {
+		if err := filler(mountPath); err != nil {
+			return err
+		}
+	}
+
+	done = true
+	return nil
+}
+
+// CreateVolumeFromCopy creates vol as an rsync copy of srcVol. Both must be
+// custom filesystem volumes. When copySnapshots is set and srcVol is not
+// itself a snapshot, each source snapshot is copied first and recorded as a
+// ".snap" directory plus a symlink. Every path created is removed again if
+// the copy fails part-way.
+func (d *cephfs) CreateVolumeFromCopy(vol Volume, srcVol Volume, copySnapshots bool, op *operations.Operation) error {
+	if vol.volType != VolumeTypeCustom || srcVol.volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	if vol.contentType != ContentTypeFS || srcVol.contentType != ContentTypeFS {
+		return fmt.Errorf("Content type not supported")
+	}
+
+	bwlimit := d.config["rsync.bwlimit"]
+
+	// Get the volume ID for the new volumes, which is used to set project quota.
+	volID, err := d.getVolID(vol.volType, vol.name)
+	if err != nil {
+		return err
+	}
+
+	// Create slice of paths created if revert needed later.
+	revertPaths := []string{}
+	defer func() {
+		// Remove any paths created if we are reverting.
+		for _, path := range revertPaths {
+			d.deleteQuota(path, volID)
+			os.RemoveAll(path)
+		}
+	}()
+
+	if copySnapshots && !srcVol.IsSnapshot() {
+		srcSnapshots, err := srcVol.Snapshots(op)
+		if err != nil {
+			return err
+		}
+
+		for _, srcSnapshot := range srcSnapshots {
+			_, snapName, _ := shared.ContainerGetParentAndSnapshotName(srcSnapshot.name)
+			dstSnapshot, err := vol.NewSnapshot(snapName)
+			if err != nil {
+				return err
+			}
+
+			dstSnapPath := dstSnapshot.MountPath()
+			err = os.MkdirAll(dstSnapPath, 0711)
+			if err != nil {
+				return err
+			}
+
+			revertPaths = append(revertPaths, dstSnapPath)
+
+			// Initialise the snapshot's quota with the parent volume's ID.
+			err = d.initQuota(dstSnapPath, volID)
+			if err != nil {
+				return err
+			}
+
+			err = srcSnapshot.MountTask(func(srcMountPath string, op *operations.Operation) error {
+				return dstSnapshot.MountTask(func(dstMountPath string, op *operations.Operation) error {
+					// Keep err local to the closure rather than
+					// writing to the enclosing loop's variable.
+					_, err := rsync.LocalCopy(srcMountPath, dstMountPath, bwlimit, true)
+					if err != nil {
+						return err
+					}
+
+					cephSnapPath := filepath.Join(dstMountPath, ".snap", snapName)
+					err = os.Mkdir(cephSnapPath, 0711)
+					if err != nil {
+						return err
+					}
+
+					// Make the snapshot path a symlink
+					targetPath := GetVolumeMountPath(d.name, VolumeTypeCustom, GetSnapshotVolumeName(vol.name, snapName))
+					err = os.MkdirAll(filepath.Dir(targetPath), 0711)
+					if err != nil {
+						return err
+					}
+
+					return os.Symlink(cephSnapPath, targetPath)
+				}, op)
+			}, op)
+			if err != nil {
+				// A failed snapshot copy must abort the whole copy (and
+				// trigger the revert) instead of being silently dropped.
+				return err
+			}
+		}
+	}
+
+	volPath := vol.MountPath()
+	err = os.MkdirAll(volPath, 0711)
+	if err != nil {
+		return err
+	}
+
+	revertPaths = append(revertPaths, volPath)
+
+	// Initialise the volume's quota using the volume ID.
+	err = d.initQuota(volPath, volID)
+	if err != nil {
+		return err
+	}
+
+	// Set the quota if specified in volConfig or pool config.
+	err = d.setQuota(volPath, volID, vol.config["size"])
+	if err != nil {
+		return err
+	}
+
+	// Copy source to destination (mounting each volume if needed).
+	err = srcVol.MountTask(func(srcMountPath string, op *operations.Operation) error {
+		return vol.MountTask(func(dstMountPath string, op *operations.Operation) error {
+			_, err := rsync.LocalCopy(srcMountPath, dstMountPath, bwlimit, true)
+			if err != nil {
+				return err
+			}
+
+			if vol.IsSnapshot() {
+				_, snapName, _ := shared.ContainerGetParentAndSnapshotName(vol.name)
+
+				cephSnapPath := filepath.Join(dstMountPath, ".snap", snapName)
+				err = os.Mkdir(cephSnapPath, 0711)
+				if err != nil {
+					return err
+				}
+
+				// Make the snapshot path a symlink
+				// NOTE(review): vol.name presumably already has the
+				// "<parent>/<snapshot>" form here, so appending snapName
+				// again looks suspicious — confirm the intended target.
+				targetPath := GetVolumeMountPath(d.name, VolumeTypeCustom, fmt.Sprintf("%s/%s", vol.name, snapName))
+				err = os.MkdirAll(filepath.Dir(targetPath), 0711)
+				if err != nil {
+					return err
+				}
+
+				return os.Symlink(cephSnapPath, targetPath)
+			}
+
+			return nil
+		}, op)
+	}, op)
+	if err != nil {
+		return err
+	}
+
+	revertPaths = nil // Don't revert.
+	return nil
+}
+
+// DeleteVolume removes a custom volume's quota, its directory and its
+// snapshot directory. It refuses to delete a volume that still has snapshots
+// and succeeds without doing anything when the volume path does not exist.
+func (d *cephfs) DeleteVolume(volType VolumeType, volName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	snaps, err := d.VolumeSnapshots(volType, volName, op)
+	if err != nil {
+		return err
+	}
+
+	if len(snaps) != 0 {
+		return fmt.Errorf("Cannot remove a volume that has snapshots")
+	}
+
+	mountPath := GetVolumeMountPath(d.name, volType, volName)
+
+	// Nothing to clean up if the volume was never created.
+	if !shared.PathExists(mountPath) {
+		return nil
+	}
+
+	// The volume ID identifies the project quota to drop.
+	id, err := d.getVolID(volType, volName)
+	if err != nil {
+		return err
+	}
+
+	if err := d.deleteQuota(mountPath, id); err != nil {
+		return err
+	}
+
+	// Remove the volume's data.
+	if err := os.RemoveAll(mountPath); err != nil {
+		return err
+	}
+
+	// The snapshot directory should already be gone, but remove it anyway
+	// in case the top-level directory was left behind.
+	snapDir, err := GetVolumeSnapshotDir(d.name, volType, volName)
+	if err != nil {
+		return err
+	}
+
+	return os.RemoveAll(snapDir)
+}
+
+// RenameVolume renames a custom volume together with its snapshot directory
+// and re-points each snapshot symlink at the ".snap" entry under the new
+// volume path. Completed steps are undone in reverse order if a later step
+// fails.
+func (d *cephfs) RenameVolume(volType VolumeType, volName string, newName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	vol := NewVolume(d, d.name, volType, ContentTypeFS, volName, nil)
+
+	// List the snapshots while everything is still at its old location.
+	snapshots, err := vol.Snapshots(op)
+	if err != nil {
+		return err
+	}
+
+	srcSnapshotDir, err := GetVolumeSnapshotDir(d.name, volType, volName)
+	if err != nil {
+		return err
+	}
+
+	targetSnapshotDir, err := GetVolumeSnapshotDir(d.name, volType, newName)
+	if err != nil {
+		return err
+	}
+
+	// The old path is built from volName and the new one from newName.
+	oldVolPath := GetVolumeMountPath(d.name, volType, volName)
+	newVolPath := GetVolumeMountPath(d.name, volType, newName)
+
+	// Undo steps, run last-to-first unless the rename completes.
+	var reverts []func()
+	success := false
+	defer func() {
+		if success {
+			return
+		}
+
+		for i := len(reverts) - 1; i >= 0; i-- {
+			reverts[i]()
+		}
+	}()
+
+	// Move the snapshot directory once, before touching any snapshot.
+	// Renaming it per snapshot fails from the second snapshot onwards
+	// because the source no longer exists.
+	if shared.PathExists(srcSnapshotDir) {
+		err = os.Rename(srcSnapshotDir, targetSnapshotDir)
+		if err != nil {
+			return err
+		}
+
+		reverts = append(reverts, func() { os.Rename(targetSnapshotDir, srcSnapshotDir) })
+	} else {
+		// Create new snapshots directory.
+		err = os.MkdirAll(targetSnapshotDir, 0711)
+		if err != nil {
+			return err
+		}
+
+		reverts = append(reverts, func() { os.RemoveAll(targetSnapshotDir) })
+	}
+
+	// Re-point the snapshot symlinks at the new volume path.
+	for _, snapshot := range snapshots {
+		_, snapName, _ := shared.ContainerGetParentAndSnapshotName(snapshot.name)
+
+		oldCephSnapPath := filepath.Join(oldVolPath, ".snap", snapName)
+		newCephSnapPath := filepath.Join(newVolPath, ".snap", snapName)
+		newPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(newName, snapName))
+
+		// Presumably the snapshot symlink lives inside the directory that
+		// was just moved, so a link to the old ".snap" entry may already
+		// sit at newPath; drop it so the new link can be created.
+		err = os.Remove(newPath)
+		if err != nil && !os.IsNotExist(err) {
+			return err
+		}
+		hadLink := err == nil
+
+		// Register the undo before creating the link, so a removed link
+		// is restored even if the creation itself fails.
+		reverts = append(reverts, func() {
+			os.Remove(newPath)
+			if hadLink {
+				os.Symlink(oldCephSnapPath, newPath)
+			}
+		})
+
+		err = os.Symlink(newCephSnapPath, newPath)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Finally move the volume itself.
+	err = os.Rename(oldVolPath, newVolPath)
+	if err != nil {
+		return err
+	}
+
+	success = true
+	return nil
+}
+
+// MountVolume performs no work for custom volumes and reports that nothing
+// was mounted. Any other volume type is rejected.
+func (d *cephfs) MountVolume(volType VolumeType, volName string, op *operations.Operation) (bool, error) {
+	if volType == VolumeTypeCustom {
+		return false, nil
+	}
+
+	return false, fmt.Errorf("Volume type not supported")
+}
+
+// MountVolumeSnapshot performs no work for custom volume snapshots and
+// reports that nothing was mounted. Any other volume type is rejected.
+func (d *cephfs) MountVolumeSnapshot(volType VolumeType, VolName, snapshotName string, op *operations.Operation) (bool, error) {
+	if volType == VolumeTypeCustom {
+		return false, nil
+	}
+
+	return false, fmt.Errorf("Volume type not supported")
+}
+
+// UnmountVolume performs no work for custom volumes and reports that nothing
+// was unmounted. Any other volume type is rejected.
+func (d *cephfs) UnmountVolume(volType VolumeType, volName string, op *operations.Operation) (bool, error) {
+	if volType == VolumeTypeCustom {
+		return false, nil
+	}
+
+	return false, fmt.Errorf("Volume type not supported")
+}
+
+// UnmountVolumeSnapshot performs no work for custom volume snapshots and
+// reports that nothing was unmounted. Any other volume type is rejected.
+func (d *cephfs) UnmountVolumeSnapshot(volType VolumeType, volName, snapshotName string, op *operations.Operation) (bool, error) {
+	if volType == VolumeTypeCustom {
+		return false, nil
+	}
+
+	return false, fmt.Errorf("Volume type not supported")
+}
+
+// CreateVolumeSnapshot snapshots a custom volume by creating a directory
+// under the volume's ".snap" directory, then symlinks the snapshot's mount
+// path to that directory.
+func (d *cephfs) CreateVolumeSnapshot(volType VolumeType, volName string, newSnapshotName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	// Create the snapshot
+	volPath := GetVolumeMountPath(d.name, volType, volName)
+	snapDir := filepath.Join(volPath, ".snap", newSnapshotName)
+
+	if err := os.Mkdir(snapDir, 0711); err != nil {
+		return err
+	}
+
+	// Make sure the symlink's parent directory exists.
+	linkPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, newSnapshotName))
+
+	if err := os.MkdirAll(filepath.Dir(linkPath), 0711); err != nil {
+		return err
+	}
+
+	return os.Symlink(snapDir, linkPath)
+}
+
+// DeleteVolumeSnapshot removes a custom volume snapshot: the entry under the
+// volume's ".snap" directory, the project quota on the snapshot's mount path,
+// and the mount path itself.
+func (d *cephfs) DeleteVolumeSnapshot(volType VolumeType, volName string, snapshotName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	// Remove the snapshot from the volume's ".snap" directory.
+	volPath := GetVolumeMountPath(d.name, volType, volName)
+	snapDir := filepath.Join(volPath, ".snap", snapshotName)
+
+	if err := os.RemoveAll(snapDir); err != nil {
+		return err
+	}
+
+	// The parent volume's ID identifies the project quota to drop.
+	id, err := d.getVolID(volType, volName)
+	if err != nil {
+		return err
+	}
+
+	// Mount path of the snapshot.
+	linkPath := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapshotName))
+
+	// Remove the project quota.
+	if err := d.deleteQuota(linkPath, id); err != nil {
+		return err
+	}
+
+	return os.RemoveAll(linkPath)
+}
+
+// RenameVolumeSnapshot renames a custom volume snapshot under the volume's
+// ".snap" directory, then replaces the old snapshot symlink with one for the
+// new name.
+func (d *cephfs) RenameVolumeSnapshot(volType VolumeType, volName string, snapshotName string, newSnapshotName string, op *operations.Operation) error {
+	if volType != VolumeTypeCustom {
+		return fmt.Errorf("Volume type not supported")
+	}
+
+	snapRoot := filepath.Join(GetVolumeMountPath(d.name, volType, volName), ".snap")
+	oldSnapDir := filepath.Join(snapRoot, snapshotName)
+	newSnapDir := filepath.Join(snapRoot, newSnapshotName)
+
+	if err := os.Rename(oldSnapDir, newSnapDir); err != nil {
+		return err
+	}
+
+	// Re-generate the snapshot symlink
+	oldLink := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, snapshotName))
+	if err := os.Remove(oldLink); err != nil {
+		return err
+	}
+
+	newLink := GetVolumeMountPath(d.name, volType, GetSnapshotVolumeName(volName, newSnapshotName))
+
+	return os.Symlink(newSnapDir, newLink)
+}
+
+// VolumeSnapshots lists the snapshot names of a custom volume: every entry
+// of the volume's snapshot directory that os.Stat reports as a directory. A
+// missing snapshot directory yields an empty list.
+func (d *cephfs) VolumeSnapshots(volType VolumeType, volName string, op *operations.Operation) ([]string, error) {
+	if volType != VolumeTypeCustom {
+		return nil, fmt.Errorf("Volume type not supported")
+	}
+
+	dir, err := GetVolumeSnapshotDir(d.name, volType, volName)
+	if err != nil {
+		return nil, err
+	}
+
+	names := []string{}
+
+	entries, err := ioutil.ReadDir(dir)
+	switch {
+	case err == nil:
+		// Directory read fine; inspect its entries below.
+	case os.IsNotExist(err):
+		// If the snapshots directory doesn't exist, there are no snapshots.
+		return names, nil
+	default:
+		return nil, err
+	}
+
+	for _, entry := range entries {
+		// os.Stat follows symlinks, so a symlink to a directory counts.
+		info, err := os.Stat(filepath.Join(dir, entry.Name()))
+		if err != nil {
+			return nil, err
+		}
+
+		if info.IsDir() {
+			names = append(names, entry.Name())
+		}
+	}
+
+	return names, nil
+}
+
+// MigrateVolume sends a custom filesystem volume over conn using rsync. The
+// snapshots listed in volSrcArgs are sent first, in order, followed by the
+// volume itself. Only the rsync migration type is accepted.
+func (d *cephfs) MigrateVolume(vol Volume, conn io.ReadWriteCloser, volSrcArgs migration.VolumeSourceArgs, op *operations.Operation) error {
+	switch {
+	case vol.volType != VolumeTypeCustom:
+		return fmt.Errorf("Volume type not supported")
+	case vol.contentType != ContentTypeFS:
+		return fmt.Errorf("Content type not supported")
+	case volSrcArgs.MigrationType.FSType != migration.MigrationFSType_RSYNC:
+		return fmt.Errorf("Migration type not supported")
+	}
+
+	bwlimit := d.config["rsync.bwlimit"]
+	features := volSrcArgs.MigrationType.Features
+
+	for _, snapName := range volSrcArgs.Snapshots {
+		snapshot, err := vol.NewSnapshot(snapName)
+		if err != nil {
+			return err
+		}
+
+		// Send snapshot to recipient (ensure local snapshot volume is mounted if needed).
+		err = snapshot.MountTask(func(mountPath string, op *operations.Operation) error {
+			var tracker *ioprogress.ProgressTracker
+			if volSrcArgs.TrackProgress {
+				tracker = migration.ProgressTracker(op, "fs_progress", snapshot.name)
+			}
+
+			return rsync.Send(snapshot.name, shared.AddSlash(mountPath), conn, tracker, features, bwlimit, d.state.OS.ExecPath)
+		}, op)
+		if err != nil {
+			return err
+		}
+	}
+
+	// Send volume to recipient (ensure local volume is mounted if needed).
+	return vol.MountTask(func(mountPath string, op *operations.Operation) error {
+		var tracker *ioprogress.ProgressTracker
+		if volSrcArgs.TrackProgress {
+			tracker = migration.ProgressTracker(op, "fs_progress", vol.name)
+		}
+
+		return rsync.Send(vol.name, shared.AddSlash(mountPath), conn, tracker, features, bwlimit, d.state.OS.ExecPath)
+	}, op)
+}
+
+// CreateVolumeFromMigration currently performs no work and always returns nil.
+// NOTE(review): looks like a placeholder — nothing is read from conn; confirm
+// whether receiving a migrated volume is meant to be implemented here.
+func (d *cephfs) CreateVolumeFromMigration(vol Volume, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, op *operations.Operation) error {
+ return nil
+}
+
+// fsExists reports whether "ceph fs get <fsName>", run as client.<userName>
+// against the given cluster, succeeds.
+func (d *cephfs) fsExists(clusterName string, userName string, fsName string) bool {
+	client := fmt.Sprintf("client.%s", userName)
+	_, err := shared.RunCommand("ceph", "--name", client, "--cluster", clusterName, "fs", "get", fsName)
+
+	return err == nil
+}
+
+// getConfig returns the monitor addresses and the user's secret for a CEPH
+// cluster.
+//
+// The monitors come from the first "mon_host" line of
+// /etc/ceph/<clusterName>.conf (a comma-separated list). The secret comes
+// from the first "key" line of /etc/ceph/<clusterName>.client.<userName>.keyring.
+// An error is returned if either file cannot be read or lacks its entry.
+func (d *cephfs) getConfig(clusterName string, userName string) ([]string, string, error) {
+	// Parse the CEPH configuration
+	cephConf, err := os.Open(fmt.Sprintf("/etc/ceph/%s.conf", clusterName))
+	if err != nil {
+		return nil, "", err
+	}
+	defer cephConf.Close()
+
+	cephMon := []string{}
+
+	scan := bufio.NewScanner(cephConf)
+	for scan.Scan() {
+		line := strings.TrimSpace(scan.Text())
+		if line == "" {
+			continue
+		}
+
+		if strings.HasPrefix(line, "mon_host") {
+			fields := strings.SplitN(line, "=", 2)
+			if len(fields) < 2 {
+				continue
+			}
+
+			for _, server := range strings.Split(fields[1], ",") {
+				cephMon = append(cephMon, strings.TrimSpace(server))
+			}
+			break
+		}
+	}
+
+	// Report a read failure as such, not as a missing mon_host entry.
+	err = scan.Err()
+	if err != nil {
+		return nil, "", err
+	}
+
+	if len(cephMon) == 0 {
+		return nil, "", fmt.Errorf("Couldn't find a CEPH mon")
+	}
+
+	// Parse the CEPH keyring
+	cephKeyring, err := os.Open(fmt.Sprintf("/etc/ceph/%v.client.%v.keyring", clusterName, userName))
+	if err != nil {
+		return nil, "", err
+	}
+	defer cephKeyring.Close()
+
+	var cephSecret string
+
+	scan = bufio.NewScanner(cephKeyring)
+	for scan.Scan() {
+		line := strings.TrimSpace(scan.Text())
+		if line == "" {
+			continue
+		}
+
+		if strings.HasPrefix(line, "key") {
+			// Split once only: the secret itself may contain "=".
+			fields := strings.SplitN(line, "=", 2)
+			if len(fields) < 2 {
+				continue
+			}
+
+			cephSecret = strings.TrimSpace(fields[1])
+			break
+		}
+	}
+
+	err = scan.Err()
+	if err != nil {
+		return nil, "", err
+	}
+
+	if cephSecret == "" {
+		return nil, "", fmt.Errorf("Couldn't find a keyring entry")
+	}
+
+	return cephMon, cephSecret, nil
+}
+
+// initQuota assigns the quota project derived from volID to path. It fails
+// when volID is zero and does nothing when quota support cannot be confirmed
+// for the path.
+func (d *cephfs) initQuota(path string, volID int64) error {
+	if volID == 0 {
+		return fmt.Errorf("Missing volume ID")
+	}
+
+	supported, err := quota.Supported(path)
+	if err != nil || !supported {
+		// The underlying filesystem doesn't support project quotas; skip.
+		return nil
+	}
+
+	return quota.SetProject(path, d.quotaProjectID(volID))
+}
+
+// setQuota applies a size limit to the quota project derived from volID. An
+// empty or "0" size falls back to the pool's "volume.size" setting. It fails
+// when volID is zero or the size cannot be parsed, and does nothing when
+// quota support cannot be confirmed for the path.
+func (d *cephfs) setQuota(path string, volID int64, size string) error {
+	if volID == 0 {
+		return fmt.Errorf("Missing volume ID")
+	}
+
+	// Fall back to the pool default when the volume has no size of its own.
+	switch size {
+	case "", "0":
+		size = d.config["volume.size"]
+	}
+
+	limit, err := units.ParseByteSizeString(size)
+	if err != nil {
+		return err
+	}
+
+	supported, err := quota.Supported(path)
+	if err != nil || !supported {
+		// The underlying filesystem doesn't support project quotas; skip.
+		return nil
+	}
+
+	return quota.SetProjectQuota(path, d.quotaProjectID(volID), limit)
+}
+
+// deleteQuota resets path to project 0 and sets the limit of the quota
+// project derived from volID to 0. It fails when volID is zero and does
+// nothing when quota support cannot be confirmed for the path.
+func (d *cephfs) deleteQuota(path string, volID int64) error {
+	if volID == 0 {
+		return fmt.Errorf("Missing volume ID")
+	}
+
+	supported, err := quota.Supported(path)
+	if err != nil || !supported {
+		// The underlying filesystem doesn't support project quotas; skip.
+		return nil
+	}
+
+	// Detach the path from its project first, then clear the project's limit.
+	if err := quota.SetProject(path, 0); err != nil {
+		return err
+	}
+
+	return quota.SetProjectQuota(path, d.quotaProjectID(volID), 0)
+}
+
+// quotaProjectID generates a project quota ID from a volume ID.
+func (d *cephfs) quotaProjectID(volID int64) uint32 {
+ return uint32(volID + 10000)
+}
diff --git a/lxd/storage/drivers/load.go b/lxd/storage/drivers/load.go
index a10fdfd01f..cffa6439a6 100644
--- a/lxd/storage/drivers/load.go
+++ b/lxd/storage/drivers/load.go
@@ -5,7 +5,8 @@ import (
)
var drivers = map[string]func() driver{
- "dir": func() driver { return &dir{} },
+ "dir": func() driver { return &dir{} },
+ "cephfs": func() driver { return &cephfs{} },
}
// Load returns a Driver for an existing low-level storage pool.
More information about the lxc-devel
mailing list