[lxc-devel] [lxd/master] lxd/storage/cephfs: Initial support
stgraber on Github
lxc-bot at linuxcontainers.org
Mon May 13 14:05:42 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 354 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190513/b0ad4dd6/attachment-0001.bin>
-------------- next part --------------
From a4c7b64add8084f13cb55fe494887a0743118448 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Sun, 12 May 2019 22:44:45 +0200
Subject: [PATCH] lxd/storage/cephfs: Initial support
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
doc/storage.md | 8 +
lxd/storage.go | 25 +-
lxd/storage_cephfs.go | 956 ++++++++++++++++++++++++++++++++++
lxd/storage_pools_config.go | 10 +-
lxd/storage_pools_utils.go | 2 +-
lxd/storage_volumes_config.go | 11 +-
6 files changed, 1006 insertions(+), 6 deletions(-)
create mode 100644 lxd/storage_cephfs.go
diff --git a/doc/storage.md b/doc/storage.md
index 2b4f287133..e00c3289f5 100644
--- a/doc/storage.md
+++ b/doc/storage.md
@@ -16,6 +16,9 @@ ceph.osd.pg\_num | string | ceph driver
ceph.osd.pool\_name | string | ceph driver | name of the pool | storage\_driver\_ceph | Name of the osd storage pool.
ceph.rbd.clone\_copy | string | ceph driver | true | storage\_driver\_ceph | Whether to use RBD lightweight clones rather than full dataset copies.
ceph.user.name | string | ceph driver | admin | storage\_ceph\_user\_name | The ceph user to use when creating storage pools and volumes.
+cephfs.cluster\_name | string | cephfs driver | ceph | storage\_driver\_cephfs | Name of the ceph cluster in which to create new storage pools.
+cephfs.path | string | cephfs driver | / | storage\_driver\_cephfs | The base path for the CEPH fs mount
+cephfs.user.name | string | cephfs driver | admin | storage\_driver\_cephfs | The ceph user to use when creating storage pools and volumes.
lvm.thinpool\_name | string | lvm driver | LXDThinPool | storage | Thin pool where images and containers are created.
lvm.use\_thinpool | bool | lvm driver | true | storage\_lvm\_use\_thinpool | Whether the storage pool uses a thinpool for logical volumes.
lvm.vg\_name | string | lvm driver | name of the pool | storage | Name of the volume group to create.
@@ -228,6 +231,11 @@ lxc storage create pool1 ceph ceph.osd.pool\_name=my-osd
lxc storage create pool1 ceph source=my-already-existing-osd
```
+### CEPHFS
+
+ - Can only be used for custom storage volumes
+ - Supports snapshots if enabled on the server side
+
### Btrfs
- Uses a subvolume per container, image and snapshot, creating btrfs snapshots when creating a new object.
diff --git a/lxd/storage.go b/lxd/storage.go
index f8c7e70c45..e67d3ee9db 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -83,13 +83,14 @@ type storageType int
const (
storageTypeBtrfs storageType = iota
storageTypeCeph
+ storageTypeCephFs
storageTypeDir
storageTypeLvm
storageTypeMock
storageTypeZfs
)
-var supportedStoragePoolDrivers = []string{"btrfs", "ceph", "dir", "lvm", "zfs"}
+var supportedStoragePoolDrivers = []string{"btrfs", "ceph", "cephfs", "dir", "lvm", "zfs"}
func storageTypeToString(sType storageType) (string, error) {
switch sType {
@@ -97,6 +98,8 @@ func storageTypeToString(sType storageType) (string, error) {
return "btrfs", nil
case storageTypeCeph:
return "ceph", nil
+ case storageTypeCephFs:
+ return "cephfs", nil
case storageTypeDir:
return "dir", nil
case storageTypeLvm:
@@ -116,6 +119,8 @@ func storageStringToType(sName string) (storageType, error) {
return storageTypeBtrfs, nil
case "ceph":
return storageTypeCeph, nil
+ case "cephfs":
+ return storageTypeCephFs, nil
case "dir":
return storageTypeDir, nil
case "lvm":
@@ -266,6 +271,13 @@ func storageCoreInit(driver string) (storage, error) {
return nil, err
}
return &ceph, nil
+ case storageTypeCephFs:
+ cephfs := storageCephFs{}
+ err = cephfs.StorageCoreInit()
+ if err != nil {
+ return nil, err
+ }
+ return &cephfs, nil
case storageTypeLvm:
lvm := storageLvm{}
err = lvm.StorageCoreInit()
@@ -356,6 +368,17 @@ func storageInit(s *state.State, project, poolName, volumeName string, volumeTyp
return nil, err
}
return &ceph, nil
+ case storageTypeCephFs:
+ cephfs := storageCephFs{}
+ cephfs.poolID = poolID
+ cephfs.pool = pool
+ cephfs.volume = volume
+ cephfs.s = s
+ err = cephfs.StoragePoolInit()
+ if err != nil {
+ return nil, err
+ }
+ return &cephfs, nil
case storageTypeLvm:
lvm := storageLvm{}
lvm.poolID = poolID
diff --git a/lxd/storage_cephfs.go b/lxd/storage_cephfs.go
new file mode 100644
index 0000000000..66cc52dc48
--- /dev/null
+++ b/lxd/storage_cephfs.go
@@ -0,0 +1,956 @@
+package main
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+ "syscall"
+
+ "github.com/gorilla/websocket"
+ "github.com/pkg/errors"
+
+ "github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/state"
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/ioprogress"
+ "github.com/lxc/lxd/shared/logger"
+)
+
+type storageCephFs struct {
+ ClusterName string
+ FsName string
+ UserName string
+ storageShared
+}
+
+func (s *storageCephFs) StorageCoreInit() error {
+ s.sType = storageTypeCeph
+ typeName, err := storageTypeToString(s.sType)
+ if err != nil {
+ return err
+ }
+ s.sTypeName = typeName
+
+ if cephVersion != "" {
+ s.sTypeVersion = cephVersion
+ return nil
+ }
+
+ msg, err := shared.RunCommand("rbd", "--version")
+ if err != nil {
+ return fmt.Errorf("Error getting CEPH version: %s", err)
+ }
+ s.sTypeVersion = strings.TrimSpace(msg)
+ cephVersion = s.sTypeVersion
+
+ return nil
+}
+
+func (s *storageCephFs) StoragePoolInit() error {
+ var err error
+
+ err = s.StorageCoreInit()
+ if err != nil {
+ return errors.Wrap(err, "Storage pool init")
+ }
+
+ // set cluster name
+ if s.pool.Config["cephfs.cluster_name"] != "" {
+ s.ClusterName = s.pool.Config["cephfs.cluster_name"]
+ } else {
+ s.ClusterName = "ceph"
+ }
+
+ // set ceph user name
+ if s.pool.Config["cephfs.user.name"] != "" {
+ s.UserName = s.pool.Config["cephfs.user.name"]
+ } else {
+ s.UserName = "admin"
+ }
+
+ // set osd pool name
+ if s.pool.Config["ceph.fs.name"] != "" {
+ s.FsName = s.pool.Config["ceph.fs.name"]
+ }
+
+ return nil
+}
+
+// Initialize a full storage interface.
+func (s *storageCephFs) StoragePoolCheck() error {
+ return nil
+}
+
+func (s *storageCephFs) StoragePoolCreate() error {
+ logger.Infof(`Creating CEPHFS storage pool "%s" in cluster "%s"`, s.pool.Name, s.ClusterName)
+
+ // Setup config
+ s.pool.Config["volatile.initial_source"] = s.pool.Config["source"]
+
+ if s.pool.Config["source"] == "" {
+ return fmt.Errorf("A ceph fs name OR name/path source is required")
+ }
+
+ if s.pool.Config["ceph.fs.name"] != "" && s.pool.Config["ceph.fs.name"] != s.pool.Config["source"] {
+ return fmt.Errorf("ceph.fs.name must match the source")
+ }
+
+ if s.pool.Config["ceph.cluster_name"] == "" {
+ s.pool.Config["ceph.cluster_name"] = "ceph"
+ }
+
+ if s.pool.Config["cephfs.user.name"] != "" {
+ s.pool.Config["cephfs.user.name"] = "admin"
+ }
+
+ s.pool.Config["ceph.fs.name"] = s.pool.Config["source"]
+ s.FsName = s.pool.Config["source"]
+
+ // Parse the namespace / path
+ fields := strings.SplitN(s.FsName, "/", 2)
+ fsName := fields[0]
+ fsPath := "/"
+ if len(fields) > 1 {
+ fsPath = fields[1]
+ }
+
+ // Check that the filesystem exists
+ if !cephFsExists(s.ClusterName, s.UserName, fsName) {
+ return fmt.Errorf("The requested '%v' CEPH fs doesn't exist", fsName)
+ }
+
+ // Create the path if needed
+ mountPath, err := ioutil.TempDir("", "lxd_cephfs_")
+ if err != nil {
+ return err
+ }
+ defer os.RemoveAll(mountPath)
+
+ err = os.Chmod(mountPath, 0700)
+ if err != nil {
+ return err
+ }
+
+ mountPoint := filepath.Join(mountPath, "mount")
+ err = os.Mkdir(mountPoint, 0700)
+ if err != nil {
+ return err
+ }
+
+ // Get the credentials and host
+ monAddress, userSecret, err := cephFsConfig(s.ClusterName, s.UserName)
+ if err != nil {
+ return err
+ }
+
+ uri := fmt.Sprintf("%s:/", monAddress)
+ err = tryMount(uri, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", s.UserName, userSecret, fsName))
+ if err != nil {
+ return err
+ }
+ defer tryUnmount(mountPoint, syscall.MNT_DETACH)
+
+ // Check that the existing path is empty
+ err = os.MkdirAll(fmt.Sprintf("%s%s", mountPoint, fsPath), 0755)
+ if err != nil {
+ return err
+ }
+
+ ok, _ := shared.PathIsEmpty(fmt.Sprintf("%s%s", mountPoint, fsPath))
+ if !ok {
+ return fmt.Errorf("Only empty CEPH fs paths can be used as a LXD storage pool")
+ }
+
+ // Create the mountpoint for the storage pool.
+ poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+ err = os.MkdirAll(poolMntPoint, 0711)
+ if err != nil {
+ logger.Errorf(`Failed to create mountpoint "%s" for CEPH fs storage pool "%s" in cluster "%s": %s`, poolMntPoint, s.FsName, s.ClusterName, err)
+ return err
+ }
+ logger.Debugf(`Created mountpoint "%s" for CEPH fs storage pool "%s" in cluster "%s"`, poolMntPoint, s.FsName, s.ClusterName)
+ logger.Infof(`Created CEPH fs storage pool "%s" in cluster "%s"`, s.pool.Name, s.ClusterName)
+
+ return nil
+}
+
+func (s *storageCephFs) StoragePoolDelete() error {
+ logger.Infof(`Deleting CEPH fs storage pool "%s" in cluster "%s"`, s.pool.Name, s.ClusterName)
+
+ // Mount the storage pool
+ // Delete the content
+ // Umount the storage pool
+
+ // Delete the mountpoint for the storage pool
+ poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+ if shared.PathExists(poolMntPoint) {
+ err := os.RemoveAll(poolMntPoint)
+ if err != nil {
+ logger.Errorf(`Failed to delete mountpoint "%s" for CEPH fs storage pool "%s" in cluster "%s": %s`, poolMntPoint, s.FsName, s.ClusterName, err)
+ return err
+ }
+ logger.Debugf(`Deleted mountpoint "%s" for CEPH fs storage pool "%s" in cluster "%s"`, poolMntPoint, s.FsName, s.ClusterName)
+ }
+
+ logger.Infof(`Deleted CEPH fs storage pool "%s" in cluster "%s"`, s.pool.Name, s.ClusterName)
+ return nil
+}
+
+func (s *storageCephFs) StoragePoolMount() (bool, error) {
+ logger.Debugf("Mounting CEPHFS storage pool \"%s\"", s.pool.Name)
+
+ poolMountLockID := getPoolMountLockID(s.pool.Name)
+ lxdStorageMapLock.Lock()
+ if waitChannel, ok := lxdStorageOngoingOperationMap[poolMountLockID]; ok {
+ lxdStorageMapLock.Unlock()
+ if _, ok := <-waitChannel; ok {
+ logger.Warnf("Received value over semaphore, this should not have happened")
+ }
+ // Give the benefit of the doubt and assume that the other
+ // thread actually succeeded in mounting the storage pool.
+ return false, nil
+ }
+
+ lxdStorageOngoingOperationMap[poolMountLockID] = make(chan bool)
+ lxdStorageMapLock.Unlock()
+
+ removeLockFromMap := func() {
+ lxdStorageMapLock.Lock()
+ if waitChannel, ok := lxdStorageOngoingOperationMap[poolMountLockID]; ok {
+ close(waitChannel)
+ delete(lxdStorageOngoingOperationMap, poolMountLockID)
+ }
+ lxdStorageMapLock.Unlock()
+ }
+ defer removeLockFromMap()
+
+ // Check if already mounted
+ poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+ if shared.IsMountPoint(poolMntPoint) {
+ return false, nil
+ }
+
+ // Parse the namespace / path
+ fields := strings.SplitN(s.FsName, "/", 2)
+ fsName := fields[0]
+ fsPath := "/"
+ if len(fields) > 1 {
+ fsPath = fields[1]
+ }
+ logger.Errorf("s.FsName=%v fields=%v", s.FsName, fields)
+
+ // Get the credentials and host
+ monAddress, secret, err := cephFsConfig(s.ClusterName, s.UserName)
+ if err != nil {
+ return false, err
+ }
+
+ // Do the actual mount
+ uri := fmt.Sprintf("%s:%s", monAddress, fsPath)
+ err = tryMount(uri, poolMntPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", s.UserName, secret, fsName))
+ if err != nil {
+ return false, err
+ }
+
+ logger.Debugf("Mounted CEPHFS storage pool \"%s\"", s.pool.Name)
+
+ return true, nil
+}
+
+func (s *storageCephFs) StoragePoolUmount() (bool, error) {
+ source := s.pool.Config["source"]
+ if source == "" {
+ return false, fmt.Errorf("no \"source\" property found for the storage pool")
+ }
+ cleanSource := filepath.Clean(source)
+ poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+ if cleanSource == poolMntPoint {
+ return true, nil
+ }
+
+ logger.Debugf("Unmounting CEPHFS storage pool \"%s\"", s.pool.Name)
+
+ poolUmountLockID := getPoolUmountLockID(s.pool.Name)
+ lxdStorageMapLock.Lock()
+ if waitChannel, ok := lxdStorageOngoingOperationMap[poolUmountLockID]; ok {
+ lxdStorageMapLock.Unlock()
+ if _, ok := <-waitChannel; ok {
+ logger.Warnf("Received value over semaphore, this should not have happened")
+ }
+ // Give the benefit of the doubt and assume that the other
+ // thread actually succeeded in unmounting the storage pool.
+ return false, nil
+ }
+
+ lxdStorageOngoingOperationMap[poolUmountLockID] = make(chan bool)
+ lxdStorageMapLock.Unlock()
+
+ removeLockFromMap := func() {
+ lxdStorageMapLock.Lock()
+ if waitChannel, ok := lxdStorageOngoingOperationMap[poolUmountLockID]; ok {
+ close(waitChannel)
+ delete(lxdStorageOngoingOperationMap, poolUmountLockID)
+ }
+ lxdStorageMapLock.Unlock()
+ }
+
+ defer removeLockFromMap()
+
+ if !shared.IsMountPoint(poolMntPoint) {
+ return false, nil
+ }
+
+ err := syscall.Unmount(poolMntPoint, 0)
+ if err != nil {
+ return false, err
+ }
+
+ logger.Debugf("Unmounted CEPHFS pool \"%s\"", s.pool.Name)
+ return true, nil
+}
+
+func (s *storageCephFs) GetStoragePoolWritable() api.StoragePoolPut {
+ return s.pool.Writable()
+}
+
+func (s *storageCephFs) GetStoragePoolVolumeWritable() api.StorageVolumePut {
+ return s.volume.Writable()
+}
+
+func (s *storageCephFs) SetStoragePoolWritable(writable *api.StoragePoolPut) {
+ s.pool.StoragePoolPut = *writable
+}
+
+func (s *storageCephFs) SetStoragePoolVolumeWritable(writable *api.StorageVolumePut) {
+ s.volume.StorageVolumePut = *writable
+}
+
+func (s *storageCephFs) GetContainerPoolInfo() (int64, string, string) {
+ return s.poolID, s.pool.Name, s.pool.Name
+}
+
+func (s *storageCephFs) StoragePoolUpdate(writable *api.StoragePoolPut, changedConfig []string) error {
+ logger.Infof(`Updating CEPHFS storage pool "%s"`, s.pool.Name)
+
+ _, err := s.StoragePoolMount()
+ if err != nil {
+ return err
+ }
+
+ changeable := changeableStoragePoolProperties["cephfs"]
+ unchangeable := []string{}
+ for _, change := range changedConfig {
+ if !shared.StringInSlice(change, changeable) {
+ unchangeable = append(unchangeable, change)
+ }
+ }
+
+ if len(unchangeable) > 0 {
+ return updateStoragePoolError(unchangeable, "cephfs")
+ }
+
+ // "rsync.bwlimit" requires no on-disk modifications.
+
+ logger.Infof(`Updated CEPHFS storage pool "%s"`, s.pool.Name)
+ return nil
+}
+
+// Functions dealing with storage pools.
+func (s *storageCephFs) StoragePoolVolumeCreate() error {
+ logger.Infof("Creating CEPHFS storage volume \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+
+ _, err := s.StoragePoolMount()
+ if err != nil {
+ return err
+ }
+
+ source := s.pool.Config["source"]
+ if source == "" {
+ return fmt.Errorf("no \"source\" property found for the storage pool")
+ }
+
+ isSnapshot := shared.IsSnapshot(s.volume.Name)
+
+ var storageVolumePath string
+
+ if isSnapshot {
+ storageVolumePath = getStoragePoolVolumeSnapshotMountPoint(s.pool.Name, s.volume.Name)
+ } else {
+ storageVolumePath = getStoragePoolVolumeMountPoint(s.pool.Name, s.volume.Name)
+ }
+
+ err = os.MkdirAll(storageVolumePath, 0711)
+ if err != nil {
+ return err
+ }
+
+ logger.Infof("Created CEPHFS storage volume \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+ return nil
+}
+
+func (s *storageCephFs) StoragePoolVolumeDelete() error {
+ logger.Infof("Deleting CEPHFS storage volume \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+
+ source := s.pool.Config["source"]
+ if source == "" {
+ return fmt.Errorf("no \"source\" property found for the storage pool")
+ }
+
+ storageVolumePath := getStoragePoolVolumeMountPoint(s.pool.Name, s.volume.Name)
+ if !shared.PathExists(storageVolumePath) {
+ return nil
+ }
+
+ err := os.RemoveAll(storageVolumePath)
+ if err != nil {
+ return err
+ }
+
+ err = s.s.Cluster.StoragePoolVolumeDelete(
+ "default",
+ s.volume.Name,
+ storagePoolVolumeTypeCustom,
+ s.poolID)
+ if err != nil {
+ logger.Errorf(`Failed to delete database entry for CEPHFS storage volume "%s" on storage pool "%s"`,
+ s.volume.Name, s.pool.Name)
+ }
+
+ logger.Infof("Deleted CEPHFS storage volume \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+ return nil
+}
+
+func (s *storageCephFs) StoragePoolVolumeMount() (bool, error) {
+ return true, nil
+}
+
+func (s *storageCephFs) StoragePoolVolumeUmount() (bool, error) {
+ return true, nil
+}
+
+func (s *storageCephFs) StoragePoolVolumeUpdate(writable *api.StorageVolumePut, changedConfig []string) error {
+ if writable.Restore == "" {
+ logger.Infof(`Updating CEPHFS storage volume "%s"`, s.volume.Name)
+ }
+
+ _, err := s.StoragePoolMount()
+ if err != nil {
+ return err
+ }
+
+ if writable.Restore != "" {
+ logger.Infof(`Restoring CEPHFS storage volume "%s" from snapshot "%s"`,
+ s.volume.Name, writable.Restore)
+
+ sourcePath := getStoragePoolVolumeSnapshotMountPoint(s.pool.Name,
+ fmt.Sprintf("%s/%s", s.volume.Name, writable.Restore))
+ targetPath := getStoragePoolVolumeMountPoint(s.pool.Name, s.volume.Name)
+
+ // Restore using rsync
+ bwlimit := s.pool.Config["rsync.bwlimit"]
+ output, err := rsyncLocalCopy(sourcePath, targetPath, bwlimit)
+ if err != nil {
+ return fmt.Errorf("failed to rsync container: %s: %s", string(output), err)
+ }
+
+ logger.Infof(`Restored CEPHFS storage volume "%s" from snapshot "%s"`,
+ s.volume.Name, writable.Restore)
+ return nil
+ }
+
+ changeable := changeableStoragePoolVolumeProperties["cephfs"]
+ unchangeable := []string{}
+ for _, change := range changedConfig {
+ if !shared.StringInSlice(change, changeable) {
+ unchangeable = append(unchangeable, change)
+ }
+ }
+
+ if len(unchangeable) > 0 {
+ return updateStoragePoolVolumeError(unchangeable, "cephfs")
+ }
+
+ logger.Infof(`Updated CEPHFS storage volume "%s"`, s.volume.Name)
+ return nil
+}
+
+func (s *storageCephFs) StoragePoolVolumeRename(newName string) error {
+ logger.Infof(`Renaming CEPHFS storage volume on storage pool "%s" from "%s" to "%s`,
+ s.pool.Name, s.volume.Name, newName)
+
+ _, err := s.StoragePoolMount()
+ if err != nil {
+ return err
+ }
+
+ usedBy, err := storagePoolVolumeUsedByContainersGet(s.s, "default", s.volume.Name, storagePoolVolumeTypeNameCustom)
+ if err != nil {
+ return err
+ }
+ if len(usedBy) > 0 {
+ return fmt.Errorf(`CEPHFS storage volume "%s" on storage pool "%s" is attached to containers`,
+ s.volume.Name, s.pool.Name)
+ }
+
+ oldPath := getStoragePoolVolumeMountPoint(s.pool.Name, s.volume.Name)
+ newPath := getStoragePoolVolumeMountPoint(s.pool.Name, newName)
+ err = os.Rename(oldPath, newPath)
+ if err != nil {
+ return err
+ }
+
+ logger.Infof(`Renamed CEPHFS storage volume on storage pool "%s" from "%s" to "%s`,
+ s.pool.Name, s.volume.Name, newName)
+
+ return s.s.Cluster.StoragePoolVolumeRename("default", s.volume.Name, newName,
+ storagePoolVolumeTypeCustom, s.poolID)
+}
+
+func (s *storageCephFs) ContainerStorageReady(container container) bool {
+ containerMntPoint := getContainerMountPoint(container.Project(), s.pool.Name, container.Name())
+ ok, _ := shared.PathIsEmpty(containerMntPoint)
+ return !ok
+}
+
+func (s *storageCephFs) ContainerCreate(container container) error {
+ logger.Debugf("Creating empty CEPHFS storage volume for container \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+
+ _, err := s.StoragePoolMount()
+ if err != nil {
+ return err
+ }
+
+ source := s.pool.Config["source"]
+ if source == "" {
+ return fmt.Errorf("no \"source\" property found for the storage pool")
+ }
+
+ containerMntPoint := getContainerMountPoint(container.Project(), s.pool.Name, container.Name())
+ err = createContainerMountpoint(containerMntPoint, container.Path(), container.IsPrivileged())
+ if err != nil {
+ return err
+ }
+ revert := true
+ defer func() {
+ if !revert {
+ return
+ }
+ deleteContainerMountpoint(containerMntPoint, container.Path(), s.GetStorageTypeName())
+ }()
+
+ err = container.TemplateApply("create")
+ if err != nil {
+ return errors.Wrap(err, "Apply template")
+ }
+
+ revert = false
+
+ logger.Debugf("Created empty CEPHFS storage volume for container \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+ return nil
+}
+
+func (s *storageCephFs) ContainerCreateFromImage(container container, imageFingerprint string, tracker *ioprogress.ProgressTracker) error {
+ logger.Debugf("Creating CEPHFS storage volume for container \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+
+ _, err := s.StoragePoolMount()
+ if err != nil {
+ return err
+ }
+
+ source := s.pool.Config["source"]
+ if source == "" {
+ return fmt.Errorf("no \"source\" property found for the storage pool")
+ }
+
+ privileged := container.IsPrivileged()
+ containerName := container.Name()
+ containerMntPoint := getContainerMountPoint(container.Project(), s.pool.Name, containerName)
+ err = createContainerMountpoint(containerMntPoint, container.Path(), privileged)
+ if err != nil {
+ return errors.Wrap(err, "Create container mount point")
+ }
+ revert := true
+ defer func() {
+ if !revert {
+ return
+ }
+ s.ContainerDelete(container)
+ }()
+
+ imagePath := shared.VarPath("images", imageFingerprint)
+ err = unpackImage(imagePath, containerMntPoint, storageTypeCephFs, s.s.OS.RunningInUserNS, nil)
+ if err != nil {
+ return errors.Wrap(err, "Unpack image")
+ }
+
+ err = container.TemplateApply("create")
+ if err != nil {
+ return errors.Wrap(err, "Apply template")
+ }
+
+ revert = false
+
+ logger.Debugf("Created CEPHFS storage volume for container \"%s\" on storage pool \"%s\"", s.volume.Name, s.pool.Name)
+ return nil
+}
+
+func (s *storageCephFs) ContainerCanRestore(container container, sourceContainer container) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerDelete(container container) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerCopy(target container, source container, containerOnly bool) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerRefresh(target container, source container, snapshots []container) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerMount(c container) (bool, error) {
+ return false, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerUmount(c container, path string) (bool, error) {
+ return false, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerRename(container container, newName string) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerRestore(container container, sourceContainer container) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerGetUsage(c container) (int64, error) {
+ return -1, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotCreate(snapshotContainer container, sourceContainer container) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotCreateEmpty(snapshotContainer container) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotDelete(snapshotContainer container) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotRename(snapshotContainer container, newName string) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotStart(container container) (bool, error) {
+ return false, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerSnapshotStop(container container) (bool, error) {
+ return false, fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerBackupCreate(backup backup, source container) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ContainerBackupLoad(info backupInfo, data io.ReadSeeker, tarArgs []string) error {
+ return fmt.Errorf("CEPHfs cannot be used for containers")
+}
+
+func (s *storageCephFs) ImageCreate(fingerprint string, tracker *ioprogress.ProgressTracker) error {
+ return fmt.Errorf("CEPHfs cannot be used for images")
+}
+
+func (s *storageCephFs) ImageDelete(fingerprint string) error {
+ return fmt.Errorf("CEPHfs cannot be used for images")
+}
+
+func (s *storageCephFs) ImageMount(fingerprint string) (bool, error) {
+ return false, fmt.Errorf("CEPHfs cannot be used for images")
+}
+
+func (s *storageCephFs) ImageUmount(fingerprint string) (bool, error) {
+ return false, fmt.Errorf("CEPHfs cannot be used for images")
+}
+
+func (s *storageCephFs) MigrationType() migration.MigrationFSType {
+ return migration.MigrationFSType_RSYNC
+}
+
+func (s *storageCephFs) PreservesInodes() bool {
+ return false
+}
+
+func (s *storageCephFs) MigrationSource(args MigrationSourceArgs) (MigrationStorageSourceDriver, error) {
+ return rsyncMigrationSource(args)
+}
+
+func (s *storageCephFs) MigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+ return rsyncMigrationSink(conn, op, args)
+}
+
+func (s *storageCephFs) StorageEntitySetQuota(volumeType int, size int64, data interface{}) error {
+ // FIXME this may be possible to do with
+ // setfattr -n ceph.quota.max_bytes -v 100000000 /some/dir # 100 MB
+ return fmt.Errorf("TODO")
+}
+
+func (s *storageCephFs) StoragePoolResources() (*api.ResourcesStoragePool, error) {
+ _, err := s.StoragePoolMount()
+ if err != nil {
+ return nil, err
+ }
+
+ poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+
+ return storageResource(poolMntPoint)
+}
+
+func (s *storageCephFs) StoragePoolVolumeCopy(source *api.StorageVolumeSource) error {
+ logger.Infof("Copying CEPHFS storage volume \"%s\" on storage pool \"%s\" as \"%s\" to storage pool \"%s\"", source.Name, source.Pool, s.volume.Name, s.pool.Name)
+ successMsg := fmt.Sprintf("Copied CEPHFS storage volume \"%s\" on storage pool \"%s\" as \"%s\" to storage pool \"%s\"", source.Name, source.Pool, s.volume.Name, s.pool.Name)
+
+ if s.pool.Name != source.Pool {
+ // setup storage for the source volume
+ srcStorage, err := storagePoolVolumeInit(s.s, "default", source.Pool, source.Name, storagePoolVolumeTypeCustom)
+ if err != nil {
+ logger.Errorf("Failed to initialize CEPHFS storage volume \"%s\" on storage pool \"%s\": %s", s.volume.Name, s.pool.Name, err)
+ return err
+ }
+
+ ourMount, err := srcStorage.StoragePoolMount()
+ if err != nil {
+ logger.Errorf("Failed to mount CEPHFS storage volume \"%s\" on storage pool \"%s\": %s", s.volume.Name, s.pool.Name, err)
+ return err
+ }
+ if ourMount {
+ defer srcStorage.StoragePoolUmount()
+ }
+ }
+
+ err := s.copyVolume(source.Pool, source.Name, s.volume.Name)
+ if err != nil {
+ return err
+ }
+
+ if source.VolumeOnly {
+ logger.Infof(successMsg)
+ return nil
+ }
+
+ snapshots, err := storagePoolVolumeSnapshotsGet(s.s, source.Pool, source.Name, storagePoolVolumeTypeCustom)
+ if err != nil {
+ return err
+ }
+
+ for _, snap := range snapshots {
+ _, snapOnlyName, _ := containerGetParentAndSnapshotName(snap)
+ err = s.copyVolumeSnapshot(source.Pool, snap, fmt.Sprintf("%s/%s", s.volume.Name, snapOnlyName))
+ if err != nil {
+ return err
+ }
+ }
+
+ logger.Infof(successMsg)
+ return nil
+}
+
+func (s *storageCephFs) StorageMigrationSource(args MigrationSourceArgs) (MigrationStorageSourceDriver, error) {
+ return rsyncStorageMigrationSource(args)
+}
+
+func (s *storageCephFs) StorageMigrationSink(conn *websocket.Conn, op *operation, args MigrationSinkArgs) error {
+ return rsyncStorageMigrationSink(conn, op, args)
+}
+
+func (s *storageCephFs) GetStoragePool() *api.StoragePool {
+ return s.pool
+}
+
+func (s *storageCephFs) GetStoragePoolVolume() *api.StorageVolume {
+ return s.volume
+}
+
+func (s *storageCephFs) GetState() *state.State {
+ return s.s
+}
+
+func (s *storageCephFs) StoragePoolVolumeSnapshotCreate(target *api.StorageVolumeSnapshotsPost) error {
+ return fmt.Errorf("TODO")
+}
+
+func (s *storageCephFs) StoragePoolVolumeSnapshotDelete() error {
+ return fmt.Errorf("TODO")
+}
+
+func (s *storageCephFs) StoragePoolVolumeSnapshotRename(newName string) error {
+ logger.Infof("Renaming CEPHFS storage volume on storage pool \"%s\" from \"%s\" to \"%s\"", s.pool.Name, s.volume.Name, newName)
+ var fullSnapshotName string
+
+ if shared.IsSnapshot(newName) {
+ // When renaming volume snapshots, newName will contain the full snapshot name
+ fullSnapshotName = newName
+ } else {
+ sourceName, _, ok := containerGetParentAndSnapshotName(s.volume.Name)
+ if !ok {
+ return fmt.Errorf("Not a snapshot name")
+ }
+
+ fullSnapshotName = fmt.Sprintf("%s%s%s", sourceName, shared.SnapshotDelimiter, newName)
+ }
+
+ oldPath := getStoragePoolVolumeSnapshotMountPoint(s.pool.Name, s.volume.Name)
+ newPath := getStoragePoolVolumeSnapshotMountPoint(s.pool.Name, fullSnapshotName)
+
+ if !shared.PathExists(newPath) {
+ err := os.MkdirAll(newPath, customDirMode)
+ if err != nil {
+ return err
+ }
+ }
+
+ err := os.Rename(oldPath, newPath)
+ if err != nil {
+ return err
+ }
+
+ logger.Infof("Renamed CEPHFS storage volume on storage pool \"%s\" from \"%s\" to \"%s\"", s.pool.Name, s.volume.Name, newName)
+ return s.s.Cluster.StoragePoolVolumeRename("default", s.volume.Name, fullSnapshotName, storagePoolVolumeTypeCustom, s.poolID)
+}
+
+func (s *storageCephFs) copyVolume(sourcePool string, source string, target string) error {
+ var srcMountPoint string
+
+ if shared.IsSnapshot(source) {
+ srcMountPoint = getStoragePoolVolumeSnapshotMountPoint(sourcePool, source)
+ } else {
+ srcMountPoint = getStoragePoolVolumeMountPoint(sourcePool, source)
+ }
+
+ dstMountPoint := getStoragePoolVolumeMountPoint(s.pool.Name, target)
+
+ err := os.MkdirAll(dstMountPoint, 0711)
+ if err != nil {
+ return err
+ }
+
+ bwlimit := s.pool.Config["rsync.bwlimit"]
+
+ _, err = rsyncLocalCopy(srcMountPoint, dstMountPoint, bwlimit)
+ if err != nil {
+ os.RemoveAll(dstMountPoint)
+ logger.Errorf("Failed to rsync into CEPHFS storage volume \"%s\" on storage pool \"%s\": %s", s.volume.Name, s.pool.Name, err)
+ return err
+ }
+
+ return nil
+}
+
+func (s *storageCephFs) copyVolumeSnapshot(sourcePool string, source string, target string) error {
+ srcMountPoint := getStoragePoolVolumeSnapshotMountPoint(sourcePool, source)
+ dstMountPoint := getStoragePoolVolumeSnapshotMountPoint(s.pool.Name, target)
+
+ err := os.MkdirAll(dstMountPoint, 0711)
+ if err != nil {
+ return err
+ }
+
+ bwlimit := s.pool.Config["rsync.bwlimit"]
+
+ _, err = rsyncLocalCopy(srcMountPoint, dstMountPoint, bwlimit)
+ if err != nil {
+ os.RemoveAll(dstMountPoint)
+ logger.Errorf("Failed to rsync into CEPHFS storage volume \"%s\" on storage pool \"%s\": %s", target, s.pool.Name, err)
+ return err
+ }
+
+ return nil
+}
+
+func cephFsExists(clusterName string, userName string, fsName string) bool {
+ _, err := shared.RunCommand("ceph", "--name", fmt.Sprintf("client.%s", userName), "--cluster", clusterName, "fs", "get", fsName)
+ if err != nil {
+ return false
+ }
+
+ return true
+}
+
+func cephFsConfig(clusterName string, userName string) (string, string, error) {
+ // Parse the CEPH configuration
+ cephConf, err := os.Open(fmt.Sprintf("/etc/ceph/%s.conf", clusterName))
+ if err != nil {
+ return "", "", err
+ }
+
+ var cephMon string
+
+ scan := bufio.NewScanner(cephConf)
+ for scan.Scan() {
+ line := scan.Text()
+ line = strings.TrimSpace(line)
+
+ if line == "" {
+ continue
+ }
+
+ if strings.HasPrefix(line, "mon_host") {
+ fields := strings.SplitN(line, "=", 2)
+ if len(fields) < 2 {
+ continue
+ }
+
+ cephMon = strings.TrimSpace(fields[1])
+ break
+ }
+ }
+
+ if cephMon == "" {
+ return "", "", fmt.Errorf("Couldn't find a CPEH mon")
+ }
+
+ // Parse the CEPH keyring
+ cephKeyring, err := os.Open(fmt.Sprintf("/etc/ceph/%v.client.%v.keyring", clusterName, userName))
+ if err != nil {
+ return "", "", err
+ }
+
+ var cephSecret string
+
+ scan = bufio.NewScanner(cephKeyring)
+ for scan.Scan() {
+ line := scan.Text()
+ line = strings.TrimSpace(line)
+
+ if line == "" {
+ continue
+ }
+
+ if strings.HasPrefix(line, "key") {
+ fields := strings.SplitN(line, "=", 2)
+ if len(fields) < 2 {
+ continue
+ }
+
+ cephSecret = strings.TrimSpace(fields[1])
+ break
+ }
+ }
+
+ if cephSecret == "" {
+ return "", "", fmt.Errorf("Couldn't find a keyring entry")
+ }
+
+
+ return cephMon, cephSecret, nil
+}
diff --git a/lxd/storage_pools_config.go b/lxd/storage_pools_config.go
index 228b2abb45..4e654ec01c 100644
--- a/lxd/storage_pools_config.go
+++ b/lxd/storage_pools_config.go
@@ -24,6 +24,9 @@ var changeableStoragePoolProperties = map[string][]string{
"volume.block.mount_options",
"volume.size"},
+ "cephfs": {
+ "rsync.bwlimit"},
+
"dir": {
"rsync.bwlimit"},
@@ -64,6 +67,11 @@ var storagePoolConfigKeys = map[string]func(value string) error{
"ceph.rbd.clone_copy": shared.IsBool,
"ceph.user.name": shared.IsAny,
+ // valid drivers: cephfs
+ "cephfs.cluster_name": shared.IsAny,
+ "cephfs.path": shared.IsAny,
+ "cephfs.user.name": shared.IsAny,
+
// valid drivers: lvm
"lvm.thinpool_name": shared.IsAny,
"lvm.use_thinpool": shared.IsBool,
@@ -236,7 +244,7 @@ func storagePoolFillDefault(name string, driver string, config map[string]string
}
}
- if driver == "btrfs" || driver == "ceph" || driver == "lvm" || driver == "zfs" {
+ if driver == "btrfs" || driver == "ceph" || driver == "cephfs" || driver == "lvm" || driver == "zfs" {
if config["volume.size"] != "" {
_, err := shared.ParseByteSizeString(config["volume.size"])
if err != nil {
diff --git a/lxd/storage_pools_utils.go b/lxd/storage_pools_utils.go
index e2dd28144b..5b52dc5aa9 100644
--- a/lxd/storage_pools_utils.go
+++ b/lxd/storage_pools_utils.go
@@ -11,7 +11,7 @@ import (
"github.com/lxc/lxd/shared/version"
)
-var supportedPoolTypes = []string{"btrfs", "ceph", "dir", "lvm", "zfs"}
+var supportedPoolTypes = []string{"btrfs", "ceph", "cephfs", "dir", "lvm", "zfs"}
func storagePoolUpdate(state *state.State, name, newDescription string, newConfig map[string]string, withDB bool) error {
s, err := storagePoolInit(state, name)
diff --git a/lxd/storage_volumes_config.go b/lxd/storage_volumes_config.go
index f45309063c..d834a2efe4 100644
--- a/lxd/storage_volumes_config.go
+++ b/lxd/storage_volumes_config.go
@@ -59,6 +59,11 @@ var changeableStoragePoolVolumeProperties = map[string][]string{
"security.unmapped",
"size"},
+ "cephfs": {
+ "security.unmapped",
+ "size",
+ },
+
"dir": {
"security.unmapped",
},
@@ -75,7 +80,7 @@ var changeableStoragePoolVolumeProperties = map[string][]string{
"zfs.use_refquota"},
}
-// btrfs, ceph, dir, lvm, zfs
+// btrfs, ceph, cephfs, dir, lvm, zfs
var storageVolumeConfigKeys = map[string]func(value string) ([]string, error){
"block.filesystem": func(value string) ([]string, error) {
err := shared.IsOneOf(value, []string{"btrfs", "ext4", "xfs"})
@@ -93,7 +98,7 @@ var storageVolumeConfigKeys = map[string]func(value string) ([]string, error){
},
"size": func(value string) ([]string, error) {
if value == "" {
- return []string{"btrfs", "ceph", "lvm", "zfs"}, nil
+ return []string{"btrfs", "ceph", "cephfs", "lvm", "zfs"}, nil
}
_, err := shared.ParseByteSizeString(value)
@@ -101,7 +106,7 @@ var storageVolumeConfigKeys = map[string]func(value string) ([]string, error){
return nil, err
}
- return []string{"btrfs", "ceph", "lvm", "zfs"}, nil
+ return []string{"btrfs", "ceph", "cephfs", "lvm", "zfs"}, nil
},
"volatile.idmap.last": func(value string) ([]string, error) {
return supportedPoolTypes, shared.IsAny(value)
More information about the lxc-devel
mailing list