[lxc-devel] [lxd/master] lxd/storage/cephfs: Initial support

stgraber on Github lxc-bot at linuxcontainers.org
Mon May 13 14:05:42 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 354 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190513/b0ad4dd6/attachment-0001.bin>
-------------- next part --------------
From a4c7b64add8084f13cb55fe494887a0743118448 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Sun, 12 May 2019 22:44:45 +0200
Subject: [PATCH] lxd/storage/cephfs: Initial support
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 doc/storage.md                |   8 +
 lxd/storage.go                |  25 +-
 lxd/storage_cephfs.go         | 956 ++++++++++++++++++++++++++++++++++
 lxd/storage_pools_config.go   |  10 +-
 lxd/storage_pools_utils.go    |   2 +-
 lxd/storage_volumes_config.go |  11 +-
 6 files changed, 1006 insertions(+), 6 deletions(-)
 create mode 100644 lxd/storage_cephfs.go

diff --git a/doc/storage.md b/doc/storage.md
index 2b4f287133..e00c3289f5 100644
--- a/doc/storage.md
+++ b/doc/storage.md
@@ -16,6 +16,9 @@ ceph.osd.pg\_num                | string    | ceph driver
 ceph.osd.pool\_name             | string    | ceph driver                       | name of the pool           | storage\_driver\_ceph              | Name of the osd storage pool.
 ceph.rbd.clone\_copy            | string    | ceph driver                       | true                       | storage\_driver\_ceph              | Whether to use RBD lightweight clones rather than full dataset copies.
 ceph.user.name                  | string    | ceph driver                       | admin                      | storage\_ceph\_user\_name          | The ceph user to use when creating storage pools and volumes.
+cephfs.cluster\_name            | string    | cephfs driver                     | ceph                       | storage\_driver\_cephfs            | Name of the ceph cluster in which to create new storage pools.
+cephfs.path                     | string    | cephfs driver                     | /                          | storage\_driver\_cephfs            | The base path for the CEPHFS mount.
+cephfs.user.name                | string    | cephfs driver                     | admin                      | storage\_driver\_cephfs            | The ceph user to use when creating storage pools and volumes.
 lvm.thinpool\_name              | string    | lvm driver                        | LXDThinPool                | storage                            | Thin pool where images and containers are created.
 lvm.use\_thinpool               | bool      | lvm driver                        | true                       | storage\_lvm\_use\_thinpool        | Whether the storage pool uses a thinpool for logical volumes.
 lvm.vg\_name                    | string    | lvm driver                        | name of the pool           | storage                            | Name of the volume group to create.
@@ -228,6 +231,11 @@ lxc storage create pool1 ceph ceph.osd.pool\_name=my-osd
 lxc storage create pool1 ceph source=my-already-existing-osd
 ```
 
+### CEPHFS
+
+ - Can only be used for custom storage volumes
+ - Supports snapshots if enabled on the server side
+
 ### Btrfs
 
  - Uses a subvolume per container, image and snapshot, creating btrfs snapshots when creating a new object.
diff --git a/lxd/storage.go b/lxd/storage.go
index f8c7e70c45..e67d3ee9db 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -83,13 +83,14 @@ type storageType int
 const (
 	storageTypeBtrfs storageType = iota
 	storageTypeCeph
+	storageTypeCephFs
 	storageTypeDir
 	storageTypeLvm
 	storageTypeMock
 	storageTypeZfs
 )
 
-var supportedStoragePoolDrivers = []string{"btrfs", "ceph", "dir", "lvm", "zfs"}
+var supportedStoragePoolDrivers = []string{"btrfs", "ceph", "cephfs", "dir", "lvm", "zfs"}
 
 func storageTypeToString(sType storageType) (string, error) {
 	switch sType {
@@ -97,6 +98,8 @@ func storageTypeToString(sType storageType) (string, error) {
 		return "btrfs", nil
 	case storageTypeCeph:
 		return "ceph", nil
+	case storageTypeCephFs:
+		return "cephfs", nil
 	case storageTypeDir:
 		return "dir", nil
 	case storageTypeLvm:
@@ -116,6 +119,8 @@ func storageStringToType(sName string) (storageType, error) {
 		return storageTypeBtrfs, nil
 	case "ceph":
 		return storageTypeCeph, nil
+	case "cephfs":
+		return storageTypeCephFs, nil
 	case "dir":
 		return storageTypeDir, nil
 	case "lvm":
@@ -266,6 +271,13 @@ func storageCoreInit(driver string) (storage, error) {
 			return nil, err
 		}
 		return &ceph, nil
+	case storageTypeCephFs:
+		cephfs := storageCephFs{}
+		err = cephfs.StorageCoreInit()
+		if err != nil {
+			return nil, err
+		}
+		return &cephfs, nil
 	case storageTypeLvm:
 		lvm := storageLvm{}
 		err = lvm.StorageCoreInit()
@@ -356,6 +368,17 @@ func storageInit(s *state.State, project, poolName, volumeName string, volumeTyp
 			return nil, err
 		}
 		return &ceph, nil
+	case storageTypeCephFs:
+		cephfs := storageCephFs{}
+		cephfs.poolID = poolID
+		cephfs.pool = pool
+		cephfs.volume = volume
+		cephfs.s = s
+		err = cephfs.StoragePoolInit()
+		if err != nil {
+			return nil, err
+		}
+		return &cephfs, nil
 	case storageTypeLvm:
 		lvm := storageLvm{}
 		lvm.poolID = poolID
diff --git a/lxd/storage_cephfs.go b/lxd/storage_cephfs.go
new file mode 100644
index 0000000000..66cc52dc48
--- /dev/null
+++ b/lxd/storage_cephfs.go
@@ -0,0 +1,956 @@
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"syscall"
+
+	"github.com/gorilla/websocket"
+	"github.com/pkg/errors"
+
+	"github.com/lxc/lxd/lxd/migration"
+	"github.com/lxc/lxd/lxd/state"
+	"github.com/lxc/lxd/shared"
+	"github.com/lxc/lxd/shared/api"
+	"github.com/lxc/lxd/shared/ioprogress"
+	"github.com/lxc/lxd/shared/logger"
+)
+
+// storageCephFs implements the storage backend interface on top of a
+// CEPHFS (CEPH filesystem) mount. It only supports custom storage volumes;
+// all container and image operations are rejected.
+type storageCephFs struct {
+	ClusterName string // Name of the ceph cluster (defaults to "ceph")
+	FsName      string // CEPH filesystem name, optionally "name/path"
+	UserName    string // CEPH user for all cluster operations (defaults to "admin")
+	storageShared
+}
+
+// StorageCoreInit sets the driver type fields and detects the installed
+// CEPH tooling version (cached in the package-level cephVersion variable
+// so the probe only runs once per daemon).
+func (s *storageCephFs) StorageCoreInit() error {
+	// This is the CEPHFS driver, not the plain CEPH (RBD) one, so the
+	// storage type must be storageTypeCephFs for the name lookup below
+	// (and callers of GetStorageTypeName) to report "cephfs".
+	s.sType = storageTypeCephFs
+	typeName, err := storageTypeToString(s.sType)
+	if err != nil {
+		return err
+	}
+	s.sTypeName = typeName
+
+	// Reuse the cached version string if another driver already probed it.
+	if cephVersion != "" {
+		s.sTypeVersion = cephVersion
+		return nil
+	}
+
+	msg, err := shared.RunCommand("rbd", "--version")
+	if err != nil {
+		return fmt.Errorf("Error getting CEPH version: %s", err)
+	}
+	s.sTypeVersion = strings.TrimSpace(msg)
+	cephVersion = s.sTypeVersion
+
+	return nil
+}
+
+// StoragePoolInit initializes the driver and loads the pool-level
+// configuration (cluster name, CEPH user and filesystem path), applying
+// the documented defaults for unset keys.
+func (s *storageCephFs) StoragePoolInit() error {
+	var err error
+
+	err = s.StorageCoreInit()
+	if err != nil {
+		return errors.Wrap(err, "Storage pool init")
+	}
+
+	// set cluster name
+	if s.pool.Config["cephfs.cluster_name"] != "" {
+		s.ClusterName = s.pool.Config["cephfs.cluster_name"]
+	} else {
+		s.ClusterName = "ceph"
+	}
+
+	// set ceph user name
+	if s.pool.Config["cephfs.user.name"] != "" {
+		s.UserName = s.pool.Config["cephfs.user.name"]
+	} else {
+		s.UserName = "admin"
+	}
+
+	// set the filesystem name/path. "cephfs.path" is the key registered in
+	// storagePoolConfigKeys ("ceph.fs.name" was never registered and so
+	// could never pass config validation).
+	if s.pool.Config["cephfs.path"] != "" {
+		s.FsName = s.pool.Config["cephfs.path"]
+	}
+
+	return nil
+}
+
+// StoragePoolCheck validates that the pool is usable at daemon startup;
+// there is nothing to verify for CEPHFS at the moment.
+func (s *storageCephFs) StoragePoolCheck() error {
+	return nil
+}
+
+// StoragePoolCreate validates the pool configuration, checks that the
+// requested CEPH filesystem exists and that the target path within it is
+// empty, then creates the local mountpoint for the pool.
+func (s *storageCephFs) StoragePoolCreate() error {
+	logger.Infof(`Creating CEPHFS storage pool "%s" in cluster "%s"`, s.pool.Name, s.ClusterName)
+
+	// Setup config
+	s.pool.Config["volatile.initial_source"] = s.pool.Config["source"]
+
+	if s.pool.Config["source"] == "" {
+		return fmt.Errorf("A ceph fs name OR name/path source is required")
+	}
+
+	// Use the registered "cephfs.*" config keys throughout (the "ceph.*"
+	// variants belong to the RBD driver and aren't valid for this one).
+	if s.pool.Config["cephfs.path"] != "" && s.pool.Config["cephfs.path"] != s.pool.Config["source"] {
+		return fmt.Errorf("cephfs.path must match the source")
+	}
+
+	if s.pool.Config["cephfs.cluster_name"] == "" {
+		s.pool.Config["cephfs.cluster_name"] = "ceph"
+	}
+
+	// Default the CEPH user to "admin" when unset (the original condition
+	// was inverted and clobbered any user-supplied value).
+	if s.pool.Config["cephfs.user.name"] == "" {
+		s.pool.Config["cephfs.user.name"] = "admin"
+	}
+
+	s.pool.Config["cephfs.path"] = s.pool.Config["source"]
+	s.FsName = s.pool.Config["source"]
+
+	// Parse the namespace / path ("fsname" or "fsname/sub/dir")
+	fields := strings.SplitN(s.FsName, "/", 2)
+	fsName := fields[0]
+	fsPath := "/"
+	if len(fields) > 1 {
+		fsPath = fields[1]
+	}
+
+	// Check that the filesystem exists
+	if !cephFsExists(s.ClusterName, s.UserName, fsName) {
+		return fmt.Errorf("The requested '%v' CEPH fs doesn't exist", fsName)
+	}
+
+	// Create a private scratch directory to temporarily mount the
+	// filesystem root so the target path can be created and inspected.
+	mountPath, err := ioutil.TempDir("", "lxd_cephfs_")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(mountPath)
+
+	err = os.Chmod(mountPath, 0700)
+	if err != nil {
+		return err
+	}
+
+	mountPoint := filepath.Join(mountPath, "mount")
+	err = os.Mkdir(mountPoint, 0700)
+	if err != nil {
+		return err
+	}
+
+	// Get the credentials and host
+	monAddress, userSecret, err := cephFsConfig(s.ClusterName, s.UserName)
+	if err != nil {
+		return err
+	}
+
+	uri := fmt.Sprintf("%s:/", monAddress)
+	err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", s.UserName, userSecret, fsName))
+	if err != nil {
+		return err
+	}
+	defer tryUnmount(mountPoint, syscall.MNT_DETACH)
+
+	// Create the target path if needed and check that it is empty
+	// (filepath.Join handles the separator; plain concatenation dropped it).
+	err = os.MkdirAll(filepath.Join(mountPoint, fsPath), 0755)
+	if err != nil {
+		return err
+	}
+
+	ok, _ := shared.PathIsEmpty(filepath.Join(mountPoint, fsPath))
+	if !ok {
+		return fmt.Errorf("Only empty CEPH fs paths can be used as a LXD storage pool")
+	}
+
+	// Create the mountpoint for the storage pool.
+	poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+	err = os.MkdirAll(poolMntPoint, 0711)
+	if err != nil {
+		logger.Errorf(`Failed to create mountpoint "%s" for CEPH fs storage pool "%s" in cluster "%s": %s`, poolMntPoint, s.FsName, s.ClusterName, err)
+		return err
+	}
+	logger.Debugf(`Created mountpoint "%s" for CEPH fs storage pool "%s" in cluster "%s"`, poolMntPoint, s.FsName, s.ClusterName)
+	logger.Infof(`Created CEPH fs storage pool "%s" in cluster "%s"`, s.pool.Name, s.ClusterName)
+
+	return nil
+}
+
+// StoragePoolDelete removes the local mountpoint for the pool.
+// NOTE(review): the pool's content on the CEPHFS side is not removed —
+// the mount/delete/umount steps below are still unimplemented; confirm
+// whether leaving remote data behind is intentional.
+func (s *storageCephFs) StoragePoolDelete() error {
+	logger.Infof(`Deleting CEPH fs storage pool "%s" in cluster "%s"`, s.pool.Name, s.ClusterName)
+
+	// Mount the storage pool
+	// Delete the content
+	// Umount the storage pool
+
+	// Delete the mountpoint for the storage pool
+	poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+	if shared.PathExists(poolMntPoint) {
+		err := os.RemoveAll(poolMntPoint)
+		if err != nil {
+			logger.Errorf(`Failed to delete mountpoint "%s" for CEPH fs storage pool "%s" in cluster "%s": %s`, poolMntPoint, s.FsName, s.ClusterName, err)
+			return err
+		}
+		logger.Debugf(`Deleted mountpoint "%s" for CEPH fs storage pool "%s" in cluster "%s"`, poolMntPoint, s.FsName, s.ClusterName)
+	}
+
+	logger.Infof(`Deleted CEPH fs storage pool "%s" in cluster "%s"`, s.pool.Name, s.ClusterName)
+	return nil
+}
+
+// StoragePoolMount mounts the CEPHFS filesystem backing this pool on its
+// mountpoint, serializing against concurrent mount attempts through the
+// shared operation map. It returns true when this call performed the mount.
+func (s *storageCephFs) StoragePoolMount() (bool, error) {
+	logger.Debugf("Mounting CEPHFS storage pool \"%s\"", s.pool.Name)
+
+	poolMountLockID := getPoolMountLockID(s.pool.Name)
+	lxdStorageMapLock.Lock()
+	if waitChannel, ok := lxdStorageOngoingOperationMap[poolMountLockID]; ok {
+		lxdStorageMapLock.Unlock()
+		if _, ok := <-waitChannel; ok {
+			logger.Warnf("Received value over semaphore, this should not have happened")
+		}
+		// Give the benefit of the doubt and assume that the other
+		// thread actually succeeded in mounting the storage pool.
+		return false, nil
+	}
+
+	lxdStorageOngoingOperationMap[poolMountLockID] = make(chan bool)
+	lxdStorageMapLock.Unlock()
+
+	removeLockFromMap := func() {
+		lxdStorageMapLock.Lock()
+		if waitChannel, ok := lxdStorageOngoingOperationMap[poolMountLockID]; ok {
+			close(waitChannel)
+			delete(lxdStorageOngoingOperationMap, poolMountLockID)
+		}
+		lxdStorageMapLock.Unlock()
+	}
+	defer removeLockFromMap()
+
+	// Check if already mounted
+	poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+	if shared.IsMountPoint(poolMntPoint) {
+		return false, nil
+	}
+
+	// Parse the namespace / path ("fsname" or "fsname/sub/dir")
+	fields := strings.SplitN(s.FsName, "/", 2)
+	fsName := fields[0]
+	fsPath := "/"
+	if len(fields) > 1 {
+		// The kernel client expects an absolute path after the colon, so
+		// re-add the leading slash that SplitN consumed.
+		fsPath = "/" + fields[1]
+	}
+
+	// Get the credentials and host
+	monAddress, secret, err := cephFsConfig(s.ClusterName, s.UserName)
+	if err != nil {
+		return false, err
+	}
+
+	// Do the actual mount
+	uri := fmt.Sprintf("%s:%s", monAddress, fsPath)
+	err = tryMount(uri, poolMntPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", s.UserName, secret, fsName))
+	if err != nil {
+		return false, err
+	}
+
+	logger.Debugf("Mounted CEPHFS storage pool \"%s\"", s.pool.Name)
+
+	return true, nil
+}
+
+// StoragePoolUmount unmounts the CEPHFS filesystem backing the pool,
+// serializing against concurrent unmount attempts through the shared
+// operation map. It returns true when this call performed the unmount.
+func (s *storageCephFs) StoragePoolUmount() (bool, error) {
+	source := s.pool.Config["source"]
+	if source == "" {
+		return false, fmt.Errorf("no \"source\" property found for the storage pool")
+	}
+	// If the source is the mountpoint itself there is nothing to unmount
+	// (presumably a pre-mounted path — TODO confirm this case is reachable
+	// for cephfs, where source is normally "fsname[/path]").
+	cleanSource := filepath.Clean(source)
+	poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+	if cleanSource == poolMntPoint {
+		return true, nil
+	}
+
+	logger.Debugf("Unmounting CEPHFS storage pool \"%s\"", s.pool.Name)
+
+	poolUmountLockID := getPoolUmountLockID(s.pool.Name)
+	lxdStorageMapLock.Lock()
+	if waitChannel, ok := lxdStorageOngoingOperationMap[poolUmountLockID]; ok {
+		lxdStorageMapLock.Unlock()
+		if _, ok := <-waitChannel; ok {
+			logger.Warnf("Received value over semaphore, this should not have happened")
+		}
+		// Give the benefit of the doubt and assume that the other
+		// thread actually succeeded in unmounting the storage pool.
+		return false, nil
+	}
+
+	lxdStorageOngoingOperationMap[poolUmountLockID] = make(chan bool)
+	lxdStorageMapLock.Unlock()
+
+	removeLockFromMap := func() {
+		lxdStorageMapLock.Lock()
+		if waitChannel, ok := lxdStorageOngoingOperationMap[poolUmountLockID]; ok {
+			close(waitChannel)
+			delete(lxdStorageOngoingOperationMap, poolUmountLockID)
+		}
+		lxdStorageMapLock.Unlock()
+	}
+
+	defer removeLockFromMap()
+
+	// Nothing to do when the pool is not mounted.
+	if !shared.IsMountPoint(poolMntPoint) {
+		return false, nil
+	}
+
+	err := syscall.Unmount(poolMntPoint, 0)
+	if err != nil {
+		return false, err
+	}
+
+	logger.Debugf("Unmounted CEPHFS pool \"%s\"", s.pool.Name)
+	return true, nil
+}
+
+// GetStoragePoolWritable returns the user-modifiable part of the pool config.
+func (s *storageCephFs) GetStoragePoolWritable() api.StoragePoolPut {
+	return s.pool.Writable()
+}
+
+// GetStoragePoolVolumeWritable returns the user-modifiable part of the volume config.
+func (s *storageCephFs) GetStoragePoolVolumeWritable() api.StorageVolumePut {
+	return s.volume.Writable()
+}
+
+// SetStoragePoolWritable replaces the pool's user-modifiable config.
+func (s *storageCephFs) SetStoragePoolWritable(writable *api.StoragePoolPut) {
+	s.pool.StoragePoolPut = *writable
+}
+
+// SetStoragePoolVolumeWritable replaces the volume's user-modifiable config.
+func (s *storageCephFs) SetStoragePoolVolumeWritable(writable *api.StorageVolumePut) {
+	s.volume.StorageVolumePut = *writable
+}
+
+// GetContainerPoolInfo returns the pool ID and the pool name (the LXD pool
+// name is returned for both name slots on this driver).
+func (s *storageCephFs) GetContainerPoolInfo() (int64, string, string) {
+	return s.poolID, s.pool.Name, s.pool.Name
+}
+
+// StoragePoolUpdate applies configuration changes to the pool, rejecting
+// any key that is not changeable at runtime.
+func (s *storageCephFs) StoragePoolUpdate(writable *api.StoragePoolPut, changedConfig []string) error {
+	logger.Infof(`Updating CEPHFS storage pool "%s"`, s.pool.Name)
+
+	if _, err := s.StoragePoolMount(); err != nil {
+		return err
+	}
+
+	// Collect every changed key that is not runtime-changeable.
+	allowed := changeableStoragePoolProperties["cephfs"]
+	rejected := []string{}
+	for _, key := range changedConfig {
+		if shared.StringInSlice(key, allowed) {
+			continue
+		}
+		rejected = append(rejected, key)
+	}
+
+	if len(rejected) > 0 {
+		return updateStoragePoolError(rejected, "cephfs")
+	}
+
+	// "rsync.bwlimit" requires no on-disk modifications.
+
+	logger.Infof(`Updated CEPHFS storage pool "%s"`, s.pool.Name)
+	return nil
+}
+
+// Functions dealing with storage volumes.
+
+// StoragePoolVolumeCreate creates the on-disk directory backing a custom
+// storage volume (or a volume snapshot) on this pool.
+func (s *storageCephFs) StoragePoolVolumeCreate() error {
+	logger.Infof("Creating CEPHFS storage volume \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+
+	if _, err := s.StoragePoolMount(); err != nil {
+		return err
+	}
+
+	if s.pool.Config["source"] == "" {
+		return fmt.Errorf("no \"source\" property found for the storage pool")
+	}
+
+	// Snapshots live under a dedicated snapshots directory tree.
+	var volumePath string
+	if shared.IsSnapshot(s.volume.Name) {
+		volumePath = getStoragePoolVolumeSnapshotMountPoint(s.pool.Name, s.volume.Name)
+	} else {
+		volumePath = getStoragePoolVolumeMountPoint(s.pool.Name, s.volume.Name)
+	}
+
+	if err := os.MkdirAll(volumePath, 0711); err != nil {
+		return err
+	}
+
+	logger.Infof("Created CEPHFS storage volume \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+	return nil
+}
+
+// StoragePoolVolumeDelete removes a custom storage volume's directory and
+// its database entry.
+func (s *storageCephFs) StoragePoolVolumeDelete() error {
+	logger.Infof("Deleting CEPHFS storage volume \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+
+	source := s.pool.Config["source"]
+	if source == "" {
+		return fmt.Errorf("no \"source\" property found for the storage pool")
+	}
+
+	// Deleting a volume that is already gone is a no-op.
+	storageVolumePath := getStoragePoolVolumeMountPoint(s.pool.Name, s.volume.Name)
+	if !shared.PathExists(storageVolumePath) {
+		return nil
+	}
+
+	err := os.RemoveAll(storageVolumePath)
+	if err != nil {
+		return err
+	}
+
+	// NOTE(review): a failed database deletion is only logged and the call
+	// still reports success — confirm that is intentional.
+	err = s.s.Cluster.StoragePoolVolumeDelete(
+		"default",
+		s.volume.Name,
+		storagePoolVolumeTypeCustom,
+		s.poolID)
+	if err != nil {
+		logger.Errorf(`Failed to delete database entry for CEPHFS storage volume "%s" on storage pool "%s"`,
+			s.volume.Name, s.pool.Name)
+	}
+
+	logger.Infof("Deleted CEPHFS storage volume \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+	return nil
+}
+
+// StoragePoolVolumeMount is a no-op: volumes are plain directories on the
+// already-mounted pool.
+func (s *storageCephFs) StoragePoolVolumeMount() (bool, error) {
+	return true, nil
+}
+
+// StoragePoolVolumeUmount is a no-op for the same reason: volumes are plain
+// directories on the pool mount.
+func (s *storageCephFs) StoragePoolVolumeUmount() (bool, error) {
+	return true, nil
+}
+
+// StoragePoolVolumeUpdate applies configuration changes to a custom volume.
+// When writable.Restore is set it instead restores the volume's content
+// from the named snapshot via rsync.
+func (s *storageCephFs) StoragePoolVolumeUpdate(writable *api.StorageVolumePut, changedConfig []string) error {
+	if writable.Restore == "" {
+		logger.Infof(`Updating CEPHFS storage volume "%s"`, s.volume.Name)
+	}
+
+	_, err := s.StoragePoolMount()
+	if err != nil {
+		return err
+	}
+
+	if writable.Restore != "" {
+		logger.Infof(`Restoring CEPHFS storage volume "%s" from snapshot "%s"`,
+			s.volume.Name, writable.Restore)
+
+		sourcePath := getStoragePoolVolumeSnapshotMountPoint(s.pool.Name,
+			fmt.Sprintf("%s/%s", s.volume.Name, writable.Restore))
+		targetPath := getStoragePoolVolumeMountPoint(s.pool.Name, s.volume.Name)
+
+		// Restore using rsync
+		bwlimit := s.pool.Config["rsync.bwlimit"]
+		output, err := rsyncLocalCopy(sourcePath, targetPath, bwlimit)
+		if err != nil {
+			return fmt.Errorf("failed to rsync container: %s: %s", string(output), err)
+		}
+
+		logger.Infof(`Restored CEPHFS storage volume "%s" from snapshot "%s"`,
+			s.volume.Name, writable.Restore)
+		return nil
+	}
+
+	// Reject any changed key that is not runtime-changeable.
+	changeable := changeableStoragePoolVolumeProperties["cephfs"]
+	unchangeable := []string{}
+	for _, change := range changedConfig {
+		if !shared.StringInSlice(change, changeable) {
+			unchangeable = append(unchangeable, change)
+		}
+	}
+
+	if len(unchangeable) > 0 {
+		return updateStoragePoolVolumeError(unchangeable, "cephfs")
+	}
+
+	logger.Infof(`Updated CEPHFS storage volume "%s"`, s.volume.Name)
+	return nil
+}
+
+// StoragePoolVolumeRename renames a custom storage volume on disk and in
+// the database, refusing when the volume is still attached to containers.
+func (s *storageCephFs) StoragePoolVolumeRename(newName string) error {
+	// Note: the closing quote after the final %s was missing in both log
+	// format strings.
+	logger.Infof(`Renaming CEPHFS storage volume on storage pool "%s" from "%s" to "%s"`,
+		s.pool.Name, s.volume.Name, newName)
+
+	_, err := s.StoragePoolMount()
+	if err != nil {
+		return err
+	}
+
+	// A volume that is attached to containers cannot be renamed safely.
+	usedBy, err := storagePoolVolumeUsedByContainersGet(s.s, "default", s.volume.Name, storagePoolVolumeTypeNameCustom)
+	if err != nil {
+		return err
+	}
+	if len(usedBy) > 0 {
+		return fmt.Errorf(`CEPHFS storage volume "%s" on storage pool "%s" is attached to containers`,
+			s.volume.Name, s.pool.Name)
+	}
+
+	oldPath := getStoragePoolVolumeMountPoint(s.pool.Name, s.volume.Name)
+	newPath := getStoragePoolVolumeMountPoint(s.pool.Name, newName)
+	err = os.Rename(oldPath, newPath)
+	if err != nil {
+		return err
+	}
+
+	logger.Infof(`Renamed CEPHFS storage volume on storage pool "%s" from "%s" to "%s"`,
+		s.pool.Name, s.volume.Name, newName)
+
+	return s.s.Cluster.StoragePoolVolumeRename("default", s.volume.Name, newName,
+		storagePoolVolumeTypeCustom, s.poolID)
+}
+
+// ContainerStorageReady reports whether the container's mountpoint has any
+// content (an empty directory means the storage was never populated).
+func (s *storageCephFs) ContainerStorageReady(container container) bool {
+	containerMntPoint := getContainerMountPoint(container.Project(), s.pool.Name, container.Name())
+	ok, _ := shared.PathIsEmpty(containerMntPoint)
+	return !ok
+}
+
+// ContainerCreate creates an empty container mountpoint on the pool and
+// applies the container's "create" templates.
+// NOTE(review): every other Container* method on this driver rejects
+// containers outright — confirm whether container creation should be
+// supported here at all.
+func (s *storageCephFs) ContainerCreate(container container) error {
+	logger.Debugf("Creating empty CEPHFS storage volume for container \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+
+	_, err := s.StoragePoolMount()
+	if err != nil {
+		return err
+	}
+
+	source := s.pool.Config["source"]
+	if source == "" {
+		return fmt.Errorf("no \"source\" property found for the storage pool")
+	}
+
+	containerMntPoint := getContainerMountPoint(container.Project(), s.pool.Name, container.Name())
+	err = createContainerMountpoint(containerMntPoint, container.Path(), container.IsPrivileged())
+	if err != nil {
+		return err
+	}
+	// Clean up the mountpoint again if anything below fails.
+	revert := true
+	defer func() {
+		if !revert {
+			return
+		}
+		deleteContainerMountpoint(containerMntPoint, container.Path(), s.GetStorageTypeName())
+	}()
+
+	err = container.TemplateApply("create")
+	if err != nil {
+		return errors.Wrap(err, "Apply template")
+	}
+
+	revert = false
+
+	logger.Debugf("Created empty CEPHFS storage volume for container \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+	return nil
+}
+
+// ContainerCreateFromImage creates a container volume by unpacking the
+// given image into a fresh mountpoint and applying "create" templates.
+// NOTE(review): the revert path calls s.ContainerDelete, which always
+// returns an error on this driver — failure cleanup is effectively a
+// no-op; confirm intent.
+func (s *storageCephFs) ContainerCreateFromImage(container container, imageFingerprint string, tracker *ioprogress.ProgressTracker) error {
+	logger.Debugf("Creating CEPHFS storage volume for container \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+
+	_, err := s.StoragePoolMount()
+	if err != nil {
+		return err
+	}
+
+	source := s.pool.Config["source"]
+	if source == "" {
+		return fmt.Errorf("no \"source\" property found for the storage pool")
+	}
+
+	privileged := container.IsPrivileged()
+	containerName := container.Name()
+	containerMntPoint := getContainerMountPoint(container.Project(), s.pool.Name, containerName)
+	err = createContainerMountpoint(containerMntPoint, container.Path(), privileged)
+	if err != nil {
+		return errors.Wrap(err, "Create container mount point")
+	}
+	revert := true
+	defer func() {
+		if !revert {
+			return
+		}
+		s.ContainerDelete(container)
+	}()
+
+	imagePath := shared.VarPath("images", imageFingerprint)
+	err = unpackImage(imagePath, containerMntPoint, storageTypeCephFs, s.s.OS.RunningInUserNS, nil)
+	if err != nil {
+		return errors.Wrap(err, "Unpack image")
+	}
+
+	err = container.TemplateApply("create")
+	if err != nil {
+		return errors.Wrap(err, "Apply template")
+	}
+
+	revert = false
+
+	logger.Debugf("Created CEPHFS storage volume for container \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+	return nil
+}
+
+// CEPHFS pools can only back custom storage volumes; the container and
+// image operations below are therefore explicitly unsupported.
+
+func (s *storageCephFs) ContainerCanRestore(container container, sourceContainer container) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerDelete(container container) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerCopy(target container, source container, containerOnly bool) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerRefresh(target container, source container, snapshots []container) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerMount(c container) (bool, error) {
+	return false, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerUmount(c container, path string) (bool, error) {
+	return false, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerRename(container container, newName string) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerRestore(container container, sourceContainer container) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerGetUsage(c container) (int64, error) {
+	return -1, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotCreate(snapshotContainer container, sourceContainer container) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotCreateEmpty(snapshotContainer container) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotDelete(snapshotContainer container) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotRename(snapshotContainer container, newName string) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotStart(container container) (bool, error) {
+	return false, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotStop(container container) (bool, error) {
+	return false, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerBackupCreate(backup backup, source container) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerBackupLoad(info backupInfo, data io.ReadSeeker, tarArgs []string) error {
+	return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ImageCreate(fingerprint string, tracker *ioprogress.ProgressTracker) error {
+	return fmt.Errorf("CEPHfs cannot be used for images")
+}
+
+func (s *storageCephFs) ImageDelete(fingerprint string) error {
+	return fmt.Errorf("CEPHfs cannot be used for images")
+}
+
+func (s *storageCephFs) ImageMount(fingerprint string) (bool, error) {
+	return false, fmt.Errorf("CEPHfs cannot be used for images")
+}
+
+func (s *storageCephFs) ImageUmount(fingerprint string) (bool, error) {
+	return false, fmt.Errorf("CEPHfs cannot be used for images")
+}
+
+// MigrationType reports that CEPHFS volumes migrate via rsync.
+func (s *storageCephFs) MigrationType() migration.MigrationFSType {
+	return migration.MigrationFSType_RSYNC
+}
+
+// PreservesInodes is false: rsync-based migration doesn't keep inode numbers.
+func (s *storageCephFs) PreservesInodes() bool {
+	return false
+}
+
+// MigrationSource returns an rsync-based migration source driver.
+func (s *storageCephFs) MigrationSource(args MigrationSourceArgs) (MigrationStorageSourceDriver, error) {
+	return rsyncMigrationSource(args)
+}
+
+// MigrationSink receives an rsync-based migration stream.
+func (s *storageCephFs) MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+	return rsyncMigrationSink(conn, op, args)
+}
+
+// StorageEntitySetQuota is not implemented yet for CEPHFS.
+func (s *storageCephFs) StorageEntitySetQuota(volumeType int, size int64, data interface{}) error {
+	// FIXME this may be possible to do with
+	// setfattr -n ceph.quota.max_bytes -v 100000000 /some/dir     # 100 MB
+	return fmt.Errorf("TODO")
+}
+
+// StoragePoolResources returns space usage information for the pool,
+// making sure it is mounted first.
+func (s *storageCephFs) StoragePoolResources() (*api.ResourcesStoragePool, error) {
+	if _, err := s.StoragePoolMount(); err != nil {
+		return nil, err
+	}
+
+	return storageResource(getStoragePoolMountPoint(s.pool.Name))
+}
+
+// StoragePoolVolumeCopy copies a custom volume (and, unless VolumeOnly is
+// set, all of its snapshots) from a source pool into this pool via rsync.
+func (s *storageCephFs) StoragePoolVolumeCopy(source *api.StorageVolumeSource) error {
+	logger.Infof("Copying CEPHFS storage volume \"%s\" on storage pool \"%s\" as \"%s\" to storage pool \"%s\"", source.Name, source.Pool, s.volume.Name, s.pool.Name)
+	successMsg := fmt.Sprintf("Copied CEPHFS storage volume \"%s\" on storage pool \"%s\" as \"%s\" to storage pool \"%s\"", source.Name, source.Pool, s.volume.Name, s.pool.Name)
+
+	// When copying across pools, make sure the source pool is mounted
+	// (and unmount it again if this call was the one that mounted it).
+	if s.pool.Name != source.Pool {
+		// setup storage for the source volume
+		srcStorage, err := storagePoolVolumeInit(s.s, "default", source.Pool, source.Name, storagePoolVolumeTypeCustom)
+		if err != nil {
+			logger.Errorf("Failed to initialize CEPHFS storage volume \"%s\" on storage pool \"%s\": %s", s.volume.Name, s.pool.Name, err)
+			return err
+		}
+
+		ourMount, err := srcStorage.StoragePoolMount()
+		if err != nil {
+			logger.Errorf("Failed to mount CEPHFS storage volume \"%s\" on storage pool \"%s\": %s", s.volume.Name, s.pool.Name, err)
+			return err
+		}
+		if ourMount {
+			defer srcStorage.StoragePoolUmount()
+		}
+	}
+
+	err := s.copyVolume(source.Pool, source.Name, s.volume.Name)
+	if err != nil {
+		return err
+	}
+
+	if source.VolumeOnly {
+		logger.Infof(successMsg)
+		return nil
+	}
+
+	// Copy each snapshot of the source volume as well.
+	snapshots, err := storagePoolVolumeSnapshotsGet(s.s, source.Pool, source.Name, storagePoolVolumeTypeCustom)
+	if err != nil {
+		return err
+	}
+
+	for _, snap := range snapshots {
+		_, snapOnlyName, _ := containerGetParentAndSnapshotName(snap)
+		err = s.copyVolumeSnapshot(source.Pool, snap, fmt.Sprintf("%s/%s", s.volume.Name, snapOnlyName))
+		if err != nil {
+			return err
+		}
+	}
+
+	logger.Infof(successMsg)
+	return nil
+}
+
+// StorageMigrationSource migrates a storage volume off this pool via rsync.
+func (s *storageCephFs) StorageMigrationSource(args MigrationSourceArgs) (MigrationStorageSourceDriver, error) {
+	return rsyncStorageMigrationSource(args)
+}
+
+// StorageMigrationSink receives a storage volume migration stream via rsync.
+func (s *storageCephFs) StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+	return rsyncStorageMigrationSink(conn, op, args)
+}
+
+// GetStoragePool returns the pool definition.
+func (s *storageCephFs) GetStoragePool() *api.StoragePool {
+	return s.pool
+}
+
+// GetStoragePoolVolume returns the volume definition.
+func (s *storageCephFs) GetStoragePoolVolume() *api.StorageVolume {
+	return s.volume
+}
+
+// GetState returns the daemon state.
+func (s *storageCephFs) GetState() *state.State {
+	return s.s
+}
+
+// StoragePoolVolumeSnapshotCreate is not implemented yet for CEPHFS.
+func (s *storageCephFs) StoragePoolVolumeSnapshotCreate(target *api.StorageVolumeSnapshotsPost) error {
+	return fmt.Errorf("TODO")
+}
+
+// StoragePoolVolumeSnapshotDelete is not implemented yet for CEPHFS.
+func (s *storageCephFs) StoragePoolVolumeSnapshotDelete() error {
+	return fmt.Errorf("TODO")
+}
+
+// StoragePoolVolumeSnapshotRename renames a volume snapshot on disk and in
+// the database. newName may be either a full "volume/snapshot" name or just
+// the new snapshot suffix.
+func (s *storageCephFs) StoragePoolVolumeSnapshotRename(newName string) error {
+	logger.Infof("Renaming CEPHFS storage volume on storage pool \"%s\" from \"%s\" to \"%s\"", s.pool.Name, s.volume.Name, newName)
+	var fullSnapshotName string
+
+	if shared.IsSnapshot(newName) {
+		// When renaming volume snapshots, newName will contain the full snapshot name
+		fullSnapshotName = newName
+	} else {
+		sourceName, _, ok := containerGetParentAndSnapshotName(s.volume.Name)
+		if !ok {
+			return fmt.Errorf("Not a snapshot name")
+		}
+
+		fullSnapshotName = fmt.Sprintf("%s%s%s", sourceName, shared.SnapshotDelimiter, newName)
+	}
+
+	oldPath := getStoragePoolVolumeSnapshotMountPoint(s.pool.Name, s.volume.Name)
+	newPath := getStoragePoolVolumeSnapshotMountPoint(s.pool.Name, fullSnapshotName)
+
+	// NOTE(review): this creates newPath itself before renaming onto it;
+	// rename(2) only replaces an existing directory when it is empty.
+	// Creating only the parent directory may be the intent — confirm.
+	if !shared.PathExists(newPath) {
+		err := os.MkdirAll(newPath, customDirMode)
+		if err != nil {
+			return err
+		}
+	}
+
+	err := os.Rename(oldPath, newPath)
+	if err != nil {
+		return err
+	}
+
+	logger.Infof("Renamed CEPHFS storage volume on storage pool \"%s\" from \"%s\" to \"%s\"", s.pool.Name, s.volume.Name, newName)
+	return s.s.Cluster.StoragePoolVolumeRename("default", s.volume.Name, fullSnapshotName, storagePoolVolumeTypeCustom, s.poolID)
+}
+
+func (s *storageCephFs) copyVolume(sourcePool string, source string, target string) error {
+	var srcMountPoint string
+
+	if shared.IsSnapshot(source) {
+		srcMountPoint = getStoragePoolVolumeSnapshotMountPoint(sourcePool, source)
+	} else {
+		srcMountPoint = getStoragePoolVolumeMountPoint(sourcePool, source)
+	}
+
+	dstMountPoint := getStoragePoolVolumeMountPoint(s.pool.Name, target)
+
+	err := os.MkdirAll(dstMountPoint, 0711)
+	if err != nil {
+		return err
+	}
+
+	bwlimit := s.pool.Config["rsync.bwlimit"]
+
+	_, err = rsyncLocalCopy(srcMountPoint, dstMountPoint, bwlimit)
+	if err != nil {
+		os.RemoveAll(dstMountPoint)
+		logger.Errorf("Failed to rsync into CEPHFS storage volume \"%s\" on storage pool \"%s\": %s", s.volume.Name, s.pool.Name, err)
+		return err
+	}
+
+	return nil
+}
+
+// copyVolumeSnapshot rsyncs a volume snapshot from sourcePool into the
+// target snapshot on this pool.
+func (s *storageCephFs) copyVolumeSnapshot(sourcePool string, source string, target string) error {
+	srcMountPoint := getStoragePoolVolumeSnapshotMountPoint(sourcePool, source)
+	dstMountPoint := getStoragePoolVolumeSnapshotMountPoint(s.pool.Name, target)
+
+	if err := os.MkdirAll(dstMountPoint, 0711); err != nil {
+		return err
+	}
+
+	if _, err := rsyncLocalCopy(srcMountPoint, dstMountPoint, s.pool.Config["rsync.bwlimit"]); err != nil {
+		// Don't leave a partial snapshot copy behind.
+		os.RemoveAll(dstMountPoint)
+		logger.Errorf("Failed to rsync into CEPHFS storage volume \"%s\" on storage pool \"%s\": %s", target, s.pool.Name, err)
+		return err
+	}
+
+	return nil
+}
+
+// cephFsExists reports whether the named CEPH filesystem exists in the
+// given cluster, checked via "ceph fs get".
+func cephFsExists(clusterName string, userName string, fsName string) bool {
+	_, err := shared.RunCommand("ceph", "--name", fmt.Sprintf("client.%s", userName), "--cluster", clusterName, "fs", "get", fsName)
+	return err == nil
+}
+
+// cephFsConfig parses /etc/ceph/<cluster>.conf and the user's keyring file,
+// returning the monitor host address and the user's secret key.
+func cephFsConfig(clusterName string, userName string) (string, string, error) {
+	// Parse the CEPH configuration
+	cephConf, err := os.Open(fmt.Sprintf("/etc/ceph/%s.conf", clusterName))
+	if err != nil {
+		return "", "", err
+	}
+	defer cephConf.Close()
+
+	var cephMon string
+
+	scan := bufio.NewScanner(cephConf)
+	for scan.Scan() {
+		line := strings.TrimSpace(scan.Text())
+		if line == "" {
+			continue
+		}
+
+		// Look for the first "mon_host = <address>" entry.
+		if strings.HasPrefix(line, "mon_host") {
+			fields := strings.SplitN(line, "=", 2)
+			if len(fields) < 2 {
+				continue
+			}
+
+			cephMon = strings.TrimSpace(fields[1])
+			break
+		}
+	}
+
+	if cephMon == "" {
+		return "", "", fmt.Errorf("Couldn't find a CEPH mon")
+	}
+
+	// Parse the CEPH keyring
+	cephKeyring, err := os.Open(fmt.Sprintf("/etc/ceph/%v.client.%v.keyring", clusterName, userName))
+	if err != nil {
+		return "", "", err
+	}
+	defer cephKeyring.Close()
+
+	var cephSecret string
+
+	scan = bufio.NewScanner(cephKeyring)
+	for scan.Scan() {
+		line := strings.TrimSpace(scan.Text())
+		if line == "" {
+			continue
+		}
+
+		// Look for the "key = <secret>" entry.
+		if strings.HasPrefix(line, "key") {
+			fields := strings.SplitN(line, "=", 2)
+			if len(fields) < 2 {
+				continue
+			}
+
+			cephSecret = strings.TrimSpace(fields[1])
+			break
+		}
+	}
+
+	if cephSecret == "" {
+		return "", "", fmt.Errorf("Couldn't find a keyring entry")
+	}
+
+	return cephMon, cephSecret, nil
+}
diff --git a/lxd/storage_pools_config.go b/lxd/storage_pools_config.go
index 228b2abb45..4e654ec01c 100644
--- a/lxd/storage_pools_config.go
+++ b/lxd/storage_pools_config.go
@@ -24,6 +24,9 @@ var changeableStoragePoolProperties = map[string][]string{
 		"volume.block.mount_options",
 		"volume.size"},
 
+	"cephfs": {
+		"rsync.bwlimit"},
+
 	"dir": {
 		"rsync.bwlimit"},
 
@@ -64,6 +67,11 @@ var storagePoolConfigKeys = map[string]func(value string) error{
 	"ceph.rbd.clone_copy": shared.IsBool,
 	"ceph.user.name":      shared.IsAny,
 
+	// valid drivers: cephfs
+	"cephfs.cluster_name": shared.IsAny,
+	"cephfs.path":         shared.IsAny,
+	"cephfs.user.name":    shared.IsAny,
+
 	// valid drivers: lvm
 	"lvm.thinpool_name": shared.IsAny,
 	"lvm.use_thinpool":  shared.IsBool,
@@ -236,7 +244,7 @@ func storagePoolFillDefault(name string, driver string, config map[string]string
 		}
 	}
 
-	if driver == "btrfs" || driver == "ceph" || driver == "lvm" || driver == "zfs" {
+	if driver == "btrfs" || driver == "ceph" || driver == "cephfs" || driver == "lvm" || driver == "zfs" {
 		if config["volume.size"] != "" {
 			_, err := shared.ParseByteSizeString(config["volume.size"])
 			if err != nil {
diff --git a/lxd/storage_pools_utils.go b/lxd/storage_pools_utils.go
index e2dd28144b..5b52dc5aa9 100644
--- a/lxd/storage_pools_utils.go
+++ b/lxd/storage_pools_utils.go
@@ -11,7 +11,7 @@ import (
 	"github.com/lxc/lxd/shared/version"
 )
 
-var supportedPoolTypes = []string{"btrfs", "ceph", "dir", "lvm", "zfs"}
+var supportedPoolTypes = []string{"btrfs", "ceph", "cephfs", "dir", "lvm", "zfs"}
 
 func storagePoolUpdate(state *state.State, name, newDescription string, newConfig map[string]string, withDB bool) error {
 	s, err := storagePoolInit(state, name)
diff --git a/lxd/storage_volumes_config.go b/lxd/storage_volumes_config.go
index f45309063c..d834a2efe4 100644
--- a/lxd/storage_volumes_config.go
+++ b/lxd/storage_volumes_config.go
@@ -59,6 +59,11 @@ var changeableStoragePoolVolumeProperties = map[string][]string{
 		"security.unmapped",
 		"size"},
 
+	"cephfs": {
+		"security.unmapped",
+		"size",
+	},
+
 	"dir": {
 		"security.unmapped",
 	},
@@ -75,7 +80,7 @@ var changeableStoragePoolVolumeProperties = map[string][]string{
 		"zfs.use_refquota"},
 }
 
-// btrfs, ceph, dir, lvm, zfs
+// btrfs, ceph, cephfs, dir, lvm, zfs
 var storageVolumeConfigKeys = map[string]func(value string) ([]string, error){
 	"block.filesystem": func(value string) ([]string, error) {
 		err := shared.IsOneOf(value, []string{"btrfs", "ext4", "xfs"})
@@ -93,7 +98,7 @@ var storageVolumeConfigKeys = map[string]func(value string) ([]string, error){
 	},
 	"size": func(value string) ([]string, error) {
 		if value == "" {
-			return []string{"btrfs", "ceph", "lvm", "zfs"}, nil
+			return []string{"btrfs", "ceph", "cephfs", "lvm", "zfs"}, nil
 		}
 
 		_, err := shared.ParseByteSizeString(value)
@@ -101,7 +106,7 @@ var storageVolumeConfigKeys = map[string]func(value string) ([]string, error){
 			return nil, err
 		}
 
-		return []string{"btrfs", "ceph", "lvm", "zfs"}, nil
+		return []string{"btrfs", "ceph", "cephfs", "lvm", "zfs"}, nil
 	},
 	"volatile.idmap.last": func(value string) ([]string, error) {
 		return supportedPoolTypes, shared.IsAny(value)


More information about the lxc-devel mailing list