[lxc-devel] [lxd/master] ceph: add ceph.user.name

brauner on Github lxc-bot at linuxcontainers.org
Wed Jul 26 22:26:56 UTC 2017


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 381 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20170726/39a687c2/attachment.bin>
-------------- next part --------------
From fd1dea8b0004375c9275c6a4a886d8b936154d8a Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 27 Jul 2017 00:17:42 +0200
Subject: [PATCH] ceph: add ceph.user.name

Closes #3597.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 doc/api-extensions.md         |   3 +
 doc/storage.md                |   1 +
 lxd/api_1.0.go                |   1 +
 lxd/storage_ceph.go           | 277 ++++++++++++++++++------------------------
 lxd/storage_ceph_migration.go |  24 ++--
 lxd/storage_ceph_utils.go     | 149 ++++++++++++++---------
 lxd/storage_pools_config.go   |   1 +
 7 files changed, 223 insertions(+), 233 deletions(-)

diff --git a/doc/api-extensions.md b/doc/api-extensions.md
index 9a2f0363d..a1a8a6f62 100644
--- a/doc/api-extensions.md
+++ b/doc/api-extensions.md
@@ -315,3 +315,6 @@ This enables migrating stateful container snapshots to new containers.
 
 ## storage\_driver\_ceph
 This adds a ceph storage driver.
+
+## storage\_ceph\_user\_name
+This adds the ability to specify the ceph user via the ceph.user.name storage pool configuration key.
diff --git a/doc/storage.md b/doc/storage.md
index 6424d3b96..ef4edd2ec 100644
--- a/doc/storage.md
+++ b/doc/storage.md
@@ -14,6 +14,7 @@ ceph.cluster\_name              | string    | ceph driver
 ceph.osd.pool\_name             | string    | ceph driver                       | name of the pool           | storage\_driver\_ceph          | Name of the osd storage pool.
 ceph.osd.pg\_num                | string    | ceph driver                       | 32                         | storage\_driver\_ceph          | Number of placement groups for the osd storage pool.
 ceph.rbd.clone\_copy            | string    | ceph driver                       | true                       | storage\_driver\_ceph          | Whether to use RBD lightweight clones rather than full dataset copies.
+ceph.user.name                  | string    | ceph driver                       | admin                      | storage\_ceph\_user\_name      | The ceph user to use when creating storage pools and volumes.
 lvm.thinpool\_name              | string    | lvm driver                        | LXDPool                    | storage                        | Thin pool where images and containers are created.
 lvm.use\_thinpool               | bool      | lvm driver                        | true                       | storage\_lvm\_use\_thinpool    | Whether the storage pool uses a thinpool for logical volumes.
 lvm.vg\_name                    | string    | lvm driver                        | name of the pool           | storage                        | Name of the volume group to create.
diff --git a/lxd/api_1.0.go b/lxd/api_1.0.go
index 8b61b2a98..53b8d1718 100644
--- a/lxd/api_1.0.go
+++ b/lxd/api_1.0.go
@@ -119,6 +119,7 @@ func api10Get(d *Daemon, r *http.Request) Response {
 			"container_edit_metadata",
 			"container_snapshot_stateful_migration",
 			"storage_driver_ceph",
+			"storage_ceph_user_name",
 		},
 		APIStatus:  "stable",
 		APIVersion: version.APIVersion,
diff --git a/lxd/storage_ceph.go b/lxd/storage_ceph.go
index a4cb4d947..6505aa2fb 100644
--- a/lxd/storage_ceph.go
+++ b/lxd/storage_ceph.go
@@ -14,6 +14,7 @@ import (
 type storageCeph struct {
 	ClusterName string
 	OSDPoolName string
+	UserName    string
 	PGNum       string
 	storageShared
 }
@@ -55,6 +56,13 @@ func (s *storageCeph) StoragePoolInit() error {
 		s.OSDPoolName = s.pool.Config["ceph.osd.pool_name"]
 	}
 
+	// set ceph user name
+	if s.pool.Config["ceph.user.name"] != "" {
+		s.UserName = s.pool.Config["ceph.user.name"]
+	} else {
+		s.UserName = "admin"
+	}
+
 	// set default placement group number
 	if s.pool.Config["ceph.osd.pg_num"] != "" {
 		_, err = shared.ParseByteSizeString(s.pool.Config["ceph.osd.pg_num"])
@@ -103,7 +111,7 @@ func (s *storageCeph) StoragePoolCreate() error {
 		s.OSDPoolName = s.pool.Name
 	}
 
-	if !cephOSDPoolExists(s.ClusterName, s.OSDPoolName) {
+	if !cephOSDPoolExists(s.ClusterName, s.OSDPoolName, s.UserName) {
 		logger.Debugf(`CEPH OSD storage pool "%s" does not exist`, s.OSDPoolName)
 
 		// create new osd pool
@@ -124,7 +132,8 @@ func (s *storageCeph) StoragePoolCreate() error {
 				return
 			}
 
-			err := cephOSDPoolDestroy(s.ClusterName, s.OSDPoolName)
+			err := cephOSDPoolDestroy(s.ClusterName, s.OSDPoolName,
+				s.UserName)
 			if err != nil {
 				logger.Warnf(`Failed to delete ceph storage `+
 					`pool "%s" in cluster "%s": %s`,
@@ -210,19 +219,16 @@ func (s *storageCeph) StoragePoolCreate() error {
 		}
 	}()
 
-	ok := cephRBDVolumeExists(s.ClusterName, s.OSDPoolName, s.pool.Name, "lxd")
+	ok := cephRBDVolumeExists(s.ClusterName, s.OSDPoolName, s.pool.Name,
+		"lxd", s.UserName)
 	s.pool.Config["volatile.pool.pristine"] = "false"
 	if !ok {
 		s.pool.Config["volatile.pool.pristine"] = "true"
 		// Create dummy storage volume. Other LXD instances will use
 		// this to detect whether this osd pool is already in use by
 		// another LXD instance.
-		err = cephRBDVolumeCreate(
-			s.ClusterName,
-			s.OSDPoolName,
-			s.pool.Name,
-			"lxd",
-			"0")
+		err = cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName,
+			s.pool.Name, "lxd", "0", s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to create RBD storage volume `+
 				`"%s" on storage pool "%s": %s`, s.pool.Name,
@@ -246,7 +252,7 @@ func (s *storageCeph) StoragePoolDelete() error {
 		s.pool.Name, s.ClusterName)
 
 	// test if pool exists
-	if !cephOSDPoolExists(s.ClusterName, s.OSDPoolName) {
+	if !cephOSDPoolExists(s.ClusterName, s.OSDPoolName, s.UserName) {
 		msg := fmt.Sprintf(`CEPH osd storage pool "%s" does not exist `+
 			`in cluster "%s"`, s.OSDPoolName, s.ClusterName)
 		logger.Errorf(msg)
@@ -261,7 +267,8 @@ func (s *storageCeph) StoragePoolDelete() error {
 			s.OSDPoolName, s.ClusterName)
 
 		// Delete the osd pool.
-		err := cephOSDPoolDestroy(s.ClusterName, s.OSDPoolName)
+		err := cephOSDPoolDestroy(s.ClusterName, s.OSDPoolName,
+			s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to delete CEPH OSD storage pool `+
 				`"%s" in cluster "%s": %s`, s.pool.Name,
@@ -337,12 +344,8 @@ func (s *storageCeph) StoragePoolVolumeCreate() error {
 		`storage pool "%s"`, RBDSize, s.volume.Name, s.pool.Name)
 
 	// create volume
-	err = cephRBDVolumeCreate(
-		s.ClusterName,
-		s.OSDPoolName,
-		s.volume.Name,
-		storagePoolVolumeTypeNameCustom,
-		RBDSize)
+	err = cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName, s.volume.Name,
+		storagePoolVolumeTypeNameCustom, RBDSize, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to create RBD storage volume "%s" on `+
 			`storage pool "%s": %s`, s.volume.Name, s.pool.Name, err)
@@ -357,7 +360,7 @@ func (s *storageCeph) StoragePoolVolumeCreate() error {
 		}
 
 		err := cephRBDVolumeDelete(s.ClusterName, s.OSDPoolName,
-			s.volume.Name, storagePoolVolumeTypeNameCustom)
+			s.volume.Name, storagePoolVolumeTypeNameCustom, s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to delete RBD storage volume `+
 				`"%s" on storage pool "%s": %s`, s.volume.Name,
@@ -365,11 +368,8 @@ func (s *storageCeph) StoragePoolVolumeCreate() error {
 		}
 	}()
 
-	err = cephRBDVolumeMap(
-		s.ClusterName,
-		s.OSDPoolName,
-		s.volume.Name,
-		storagePoolVolumeTypeNameCustom)
+	err = cephRBDVolumeMap(s.ClusterName, s.OSDPoolName, s.volume.Name,
+		storagePoolVolumeTypeNameCustom, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to map RBD storage volume for "%s" on `+
 			`storage pool "%s": %s`, s.volume.Name, s.pool.Name, err)
@@ -384,7 +384,7 @@ func (s *storageCeph) StoragePoolVolumeCreate() error {
 		}
 
 		err := cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName,
-			s.volume.Name, storagePoolVolumeTypeNameCustom)
+			s.volume.Name, storagePoolVolumeTypeNameCustom, s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to unmap RBD storage volume `+
 				`"%s" on storage pool "%s": %s`, s.volume.Name,
@@ -468,11 +468,8 @@ func (s *storageCeph) StoragePoolVolumeDelete() error {
 	}
 
 	// unmap
-	err := cephRBDVolumeUnmap(
-		s.ClusterName,
-		s.OSDPoolName,
-		s.volume.Name,
-		storagePoolVolumeTypeNameCustom)
+	err := cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName, s.volume.Name,
+		storagePoolVolumeTypeNameCustom, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to unmap RBD storage volume "%s" on `+
 			`storage pool "%s": %s`, s.volume.Name, s.pool.Name, err)
@@ -482,11 +479,8 @@ func (s *storageCeph) StoragePoolVolumeDelete() error {
 		s.volume.Name, s.pool.Name)
 
 	// delete
-	err = cephRBDVolumeDelete(
-		s.ClusterName,
-		s.OSDPoolName,
-		s.volume.Name,
-		storagePoolVolumeTypeNameCustom)
+	err = cephRBDVolumeDelete(s.ClusterName, s.OSDPoolName, s.volume.Name,
+		storagePoolVolumeTypeNameCustom, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to delete RBD storage volume "%s" on `+
 			`storage pool "%s": %s`, s.volume.Name, s.pool.Name, err)
@@ -652,11 +646,8 @@ func (s *storageCeph) ContainerStorageReady(name string) bool {
 	logger.Debugf(`Checking if RBD storage volume for container "%s" `+
 		`on storage pool "%s" is ready`, name, s.pool.Name)
 
-	ok := cephRBDVolumeExists(
-		s.ClusterName,
-		s.OSDPoolName,
-		name,
-		storagePoolVolumeTypeNameContainer)
+	ok := cephRBDVolumeExists(s.ClusterName, s.OSDPoolName, name,
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if !ok {
 		logger.Debugf(`RBD storage volume for container "%s" `+
 			`on storage pool "%s" does not exist`, name, s.pool.Name)
@@ -689,12 +680,8 @@ func (s *storageCeph) ContainerCreate(container container) error {
 		s.pool.Name)
 
 	// create volume
-	err = cephRBDVolumeCreate(
-		s.ClusterName,
-		s.OSDPoolName,
-		containerName,
-		storagePoolVolumeTypeNameContainer,
-		RBDSize)
+	err = cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName, containerName,
+		storagePoolVolumeTypeNameContainer, RBDSize, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to create RBD storage volume for `+
 			`container "%s" on storage pool "%s": %s`,
@@ -710,7 +697,8 @@ func (s *storageCeph) ContainerCreate(container container) error {
 		}
 
 		err := cephRBDVolumeDelete(s.ClusterName, s.OSDPoolName,
-			containerName, storagePoolVolumeTypeNameContainer)
+			containerName, storagePoolVolumeTypeNameContainer,
+			s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to delete RBD storage volume for `+
 				`container "%s" on storage pool "%s": %s`,
@@ -718,11 +706,8 @@ func (s *storageCeph) ContainerCreate(container container) error {
 		}
 	}()
 
-	err = cephRBDVolumeMap(
-		s.ClusterName,
-		s.OSDPoolName,
-		containerName,
-		storagePoolVolumeTypeNameContainer)
+	err = cephRBDVolumeMap(s.ClusterName, s.OSDPoolName, containerName,
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to map RBD storage volume for `+
 			`container "%s" on storage pool "%s": %s`,
@@ -738,7 +723,8 @@ func (s *storageCeph) ContainerCreate(container container) error {
 		}
 
 		err := cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName,
-			containerName, storagePoolVolumeTypeNameContainer)
+			containerName, storagePoolVolumeTypeNameContainer,
+			s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to unmap RBD storage volume `+
 				`for container "%s" on storage pool "%s": %s`,
@@ -846,7 +832,7 @@ func (s *storageCeph) ContainerCreateFromImage(container container, fingerprint
 
 		var imgerr error
 		if !cephRBDVolumeExists(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage) {
+			fingerprint, storagePoolVolumeTypeNameImage, s.UserName) {
 			imgerr = s.ImageCreate(fingerprint)
 		}
 
@@ -864,7 +850,7 @@ func (s *storageCeph) ContainerCreateFromImage(container container, fingerprint
 
 	err := cephRBDCloneCreate(s.ClusterName, s.OSDPoolName, fingerprint,
 		storagePoolVolumeTypeNameImage, "readonly", s.OSDPoolName,
-		containerName, storagePoolVolumeTypeNameContainer)
+		containerName, storagePoolVolumeTypeNameContainer, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to clone new RBD storage volume for `+
 			`container "%s": %s`, containerName, err)
@@ -879,7 +865,8 @@ func (s *storageCeph) ContainerCreateFromImage(container container, fingerprint
 		}
 
 		err := cephRBDVolumeDelete(s.ClusterName, s.OSDPoolName,
-			containerName, storagePoolVolumeTypeNameContainer)
+			containerName, storagePoolVolumeTypeNameContainer,
+			s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to delete RBD storage volume `+
 				`for container "%s": %s`, containerName, err)
@@ -887,7 +874,7 @@ func (s *storageCeph) ContainerCreateFromImage(container container, fingerprint
 	}()
 
 	err = cephRBDVolumeMap(s.ClusterName, s.OSDPoolName, containerName,
-		storagePoolVolumeTypeNameContainer)
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to map RBD storage volume for container `+
 			`"%s"`, containerName)
@@ -902,7 +889,8 @@ func (s *storageCeph) ContainerCreateFromImage(container container, fingerprint
 		}
 
 		err := cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName,
-			containerName, storagePoolVolumeTypeNameContainer)
+			containerName, storagePoolVolumeTypeNameContainer,
+			s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to unmap RBD storage volume `+
 				`for container "%s": %s`, containerName, err)
@@ -1011,7 +999,7 @@ func (s *storageCeph) ContainerDelete(container container) error {
 
 	// delete
 	ret := cephContainerDelete(s.ClusterName, s.OSDPoolName, containerName,
-		storagePoolVolumeTypeNameContainer)
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if ret < 0 {
 		msg := fmt.Sprintf(`Failed to delete RBD storage volume for `+
 			`container "%s" on storage pool "%s"`, containerName, s.pool.Name)
@@ -1136,12 +1124,9 @@ func (s *storageCeph) ContainerCopy(target container, source container,
 		}()
 
 		// create empty dummy volume
-		err = cephRBDVolumeCreate(
-			s.ClusterName,
-			s.OSDPoolName,
-			targetContainerName,
-			storagePoolVolumeTypeNameContainer,
-			"0")
+		err = cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName,
+			targetContainerName, storagePoolVolumeTypeNameContainer,
+			"0", s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to create RBD storage volume "%s" on `+
 				`storage pool "%s": %s`, targetContainerName,
@@ -1158,7 +1143,7 @@ func (s *storageCeph) ContainerCopy(target container, source container,
 
 			err := cephRBDVolumeDelete(s.ClusterName, s.OSDPoolName,
 				targetContainerName,
-				storagePoolVolumeTypeNameContainer)
+				storagePoolVolumeTypeNameContainer, s.UserName)
 			if err != nil {
 				logger.Warnf(`Failed to delete RBD storage `+
 					`volume "%s" on storage pool "%s": %s`,
@@ -1209,7 +1194,7 @@ func (s *storageCeph) ContainerCopy(target container, source container,
 				err := cephRBDSnapshotDelete(s.ClusterName,
 					s.OSDPoolName, targetContainerName,
 					storagePoolVolumeTypeNameContainer,
-					snapOnlyName)
+					snapOnlyName, s.UserName)
 				if err != nil {
 					logger.Warnf(`Failed to delete RBD `+
 						`container storage for `+
@@ -1305,11 +1290,9 @@ func (s *storageCeph) ContainerCopy(target container, source container,
 		// registered one above for the dummy volume we created.
 
 		// map the container's volume
-		err = cephRBDVolumeMap(
-			s.ClusterName,
-			s.OSDPoolName,
-			targetContainerName,
-			storagePoolVolumeTypeNameContainer)
+		err = cephRBDVolumeMap(s.ClusterName, s.OSDPoolName,
+			targetContainerName, storagePoolVolumeTypeNameContainer,
+			s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to map RBD storage volume for `+
 				`container "%s" on storage pool "%s": %s`,
@@ -1462,11 +1445,8 @@ func (s *storageCeph) ContainerRename(c container, newName string) error {
 	}
 
 	// unmap
-	err = cephRBDVolumeUnmap(
-		s.ClusterName,
-		s.OSDPoolName,
-		oldName,
-		storagePoolVolumeTypeNameContainer)
+	err = cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName, oldName,
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to unmap RBD storage volume for `+
 			`container "%s" on storage pool "%s": %s`, oldName,
@@ -1482,19 +1462,15 @@ func (s *storageCeph) ContainerRename(c container, newName string) error {
 		}
 
 		err := cephRBDVolumeMap(s.ClusterName, s.OSDPoolName, oldName,
-			storagePoolVolumeTypeNameContainer)
+			storagePoolVolumeTypeNameContainer, s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to Map RBD storage volume `+
 				`for container "%s": %s`, oldName, err)
 		}
 	}()
 
-	err = cephRBDVolumeRename(
-		s.ClusterName,
-		s.OSDPoolName,
-		storagePoolVolumeTypeNameContainer,
-		oldName,
-		newName)
+	err = cephRBDVolumeRename(s.ClusterName, s.OSDPoolName,
+		storagePoolVolumeTypeNameContainer, oldName, newName, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to rename RBD storage volume for `+
 			`container "%s" on storage pool "%s": %s`,
@@ -1510,7 +1486,8 @@ func (s *storageCeph) ContainerRename(c container, newName string) error {
 		}
 
 		err = cephRBDVolumeRename(s.ClusterName, s.OSDPoolName,
-			storagePoolVolumeTypeNameContainer, newName, oldName)
+			storagePoolVolumeTypeNameContainer, newName, oldName,
+			s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to rename RBD storage volume for `+
 				`container "%s" on storage pool "%s": %s`,
@@ -1519,11 +1496,8 @@ func (s *storageCeph) ContainerRename(c container, newName string) error {
 	}()
 
 	// map
-	err = cephRBDVolumeMap(
-		s.ClusterName,
-		s.OSDPoolName,
-		newName,
-		storagePoolVolumeTypeNameContainer)
+	err = cephRBDVolumeMap(s.ClusterName, s.OSDPoolName, newName,
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to map RBD storage volume for `+
 			`container "%s" on storage pool "%s": %s`, newName,
@@ -1539,7 +1513,7 @@ func (s *storageCeph) ContainerRename(c container, newName string) error {
 		}
 
 		err := cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName, newName,
-			storagePoolVolumeTypeNameContainer)
+			storagePoolVolumeTypeNameContainer, s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to unmap RBD storage volume `+
 				`for container "%s": %s`, newName, err)
@@ -1655,12 +1629,9 @@ func (s *storageCeph) ContainerRestore(target container, source container) error
 
 	sourceContainerOnlyName, sourceSnapshotOnlyName, _ := containerGetParentAndSnapshotName(sourceName)
 	prefixedSourceSnapOnlyName := fmt.Sprintf("snapshot_%s", sourceSnapshotOnlyName)
-	err = cephRBDVolumeRestore(
-		s.ClusterName,
-		s.OSDPoolName,
-		sourceContainerOnlyName,
-		storagePoolVolumeTypeNameContainer,
-		prefixedSourceSnapOnlyName)
+	err = cephRBDVolumeRestore(s.ClusterName, s.OSDPoolName,
+		sourceContainerOnlyName, storagePoolVolumeTypeNameContainer,
+		prefixedSourceSnapOnlyName, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to restore RBD storage volume for `+
 			`container "%s" from "%s": %s`,
@@ -1690,7 +1661,7 @@ func (s *storageCeph) ContainerSnapshotCreate(snapshotContainer container,
 	targetSnapshotName := fmt.Sprintf("snapshot_%s", targetSnapshotOnlyName)
 	err := cephRBDSnapshotCreate(s.ClusterName, s.OSDPoolName,
 		sourceContainerName, storagePoolVolumeTypeNameContainer,
-		targetSnapshotName)
+		targetSnapshotName, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to create snapshot for RBD storage `+
 			`volume for image "%s" on storage pool "%s": %s`,
@@ -1707,7 +1678,7 @@ func (s *storageCeph) ContainerSnapshotCreate(snapshotContainer container,
 
 		err := cephRBDSnapshotDelete(s.ClusterName, s.OSDPoolName,
 			sourceContainerName, storagePoolVolumeTypeNameContainer,
-			targetSnapshotName)
+			targetSnapshotName, s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to delete RBD `+
 				`container storage for `+
@@ -1757,7 +1728,7 @@ func (s *storageCeph) ContainerSnapshotDelete(snapshotContainer container) error
 	snapshotName := fmt.Sprintf("snapshot_%s", sourceContainerSnapOnlyName)
 	ret := cephContainerSnapshotDelete(s.ClusterName, s.OSDPoolName,
 		sourceContainerName, storagePoolVolumeTypeNameContainer,
-		snapshotName)
+		snapshotName, s.UserName)
 	if ret < 0 {
 		msg := fmt.Sprintf(`Failed to delete RBD storage volume for `+
 			`snapshot "%s" on storage pool "%s"`,
@@ -1844,13 +1815,9 @@ func (s *storageCeph) ContainerSnapshotRename(c container, newName string) error
 	oldSnapOnlyName := fmt.Sprintf("snapshot_%s", snapOnlyName)
 	_, newSnapOnlyName, _ := containerGetParentAndSnapshotName(newName)
 	newSnapOnlyName = fmt.Sprintf("snapshot_%s", newSnapOnlyName)
-	err := cephRBDVolumeSnapshotRename(
-		s.ClusterName,
-		s.OSDPoolName,
-		containerOnlyName,
-		storagePoolVolumeTypeNameContainer,
-		oldSnapOnlyName,
-		newSnapOnlyName)
+	err := cephRBDVolumeSnapshotRename(s.ClusterName, s.OSDPoolName,
+		containerOnlyName, storagePoolVolumeTypeNameContainer, oldSnapOnlyName,
+		newSnapOnlyName, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to rename RBD storage volume for `+
 			`snapshot "%s" from "%s" to "%s": %s`, oldName, oldName,
@@ -1865,7 +1832,7 @@ func (s *storageCeph) ContainerSnapshotRename(c container, newName string) error
 
 		err := cephRBDVolumeSnapshotRename(s.ClusterName, s.OSDPoolName,
 			containerOnlyName, storagePoolVolumeTypeNameContainer,
-			newSnapOnlyName, oldSnapOnlyName)
+			newSnapOnlyName, oldSnapOnlyName, s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to rename RBD storage `+
 				`volume for container "%s" on storage `+
@@ -1906,12 +1873,9 @@ func (s *storageCeph) ContainerSnapshotStart(c container) (bool, error) {
 
 	// protect
 	prefixedSnapOnlyName := fmt.Sprintf("snapshot_%s", snapOnlyName)
-	err := cephRBDSnapshotProtect(
-		s.ClusterName,
-		s.OSDPoolName,
-		containerOnlyName,
-		storagePoolVolumeTypeNameContainer,
-		prefixedSnapOnlyName)
+	err := cephRBDSnapshotProtect(s.ClusterName, s.OSDPoolName,
+		containerOnlyName, storagePoolVolumeTypeNameContainer,
+		prefixedSnapOnlyName, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to protect snapshot of RBD storage `+
 			`volume for container "%s" on storage pool "%s": %s`,
@@ -1928,7 +1892,7 @@ func (s *storageCeph) ContainerSnapshotStart(c container) (bool, error) {
 
 		err := cephRBDSnapshotUnprotect(s.ClusterName, s.OSDPoolName,
 			containerOnlyName, storagePoolVolumeTypeNameContainer,
-			prefixedSnapOnlyName)
+			prefixedSnapOnlyName, s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to unprotect snapshot of RBD `+
 				`storage volume for container "%s" on storage `+
@@ -1938,15 +1902,10 @@ func (s *storageCeph) ContainerSnapshotStart(c container) (bool, error) {
 
 	cloneName := fmt.Sprintf("%s_%s_start_clone", containerOnlyName, snapOnlyName)
 	// clone
-	err = cephRBDCloneCreate(
-		s.ClusterName,
-		s.OSDPoolName,
-		containerOnlyName,
-		storagePoolVolumeTypeNameContainer,
-		prefixedSnapOnlyName,
-		s.OSDPoolName,
-		cloneName,
-		"snapshots")
+	err = cephRBDCloneCreate(s.ClusterName, s.OSDPoolName,
+		containerOnlyName, storagePoolVolumeTypeNameContainer,
+		prefixedSnapOnlyName, s.OSDPoolName, cloneName, "snapshots",
+		s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to create clone of RBD storage volume `+
 			`for container "%s" on storage pool "%s": %s`,
@@ -1963,7 +1922,7 @@ func (s *storageCeph) ContainerSnapshotStart(c container) (bool, error) {
 
 		// delete
 		err = cephRBDVolumeDelete(s.ClusterName, s.OSDPoolName,
-			cloneName, "snapshots")
+			cloneName, "snapshots", s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to delete clone of RBD storage `+
 				`volume for container "%s" on storage pool `+
@@ -1972,11 +1931,8 @@ func (s *storageCeph) ContainerSnapshotStart(c container) (bool, error) {
 	}()
 
 	// map
-	err = cephRBDVolumeMap(
-		s.ClusterName,
-		s.OSDPoolName,
-		cloneName,
-		"snapshots")
+	err = cephRBDVolumeMap(s.ClusterName, s.OSDPoolName, cloneName,
+		"snapshots", s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to map RBD storage volume for `+
 			`container "%s" on storage pool "%s": %s`,
@@ -1992,7 +1948,7 @@ func (s *storageCeph) ContainerSnapshotStart(c container) (bool, error) {
 		}
 
 		err := cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName,
-			cloneName, "snapshots")
+			cloneName, "snapshots", s.UserName)
 		if err != nil {
 			logger.Warnf(`Failed to unmap RBD storage volume for `+
 				`container "%s" on storage pool "%s": %s`,
@@ -2043,11 +1999,8 @@ func (s *storageCeph) ContainerSnapshotStop(c container) (bool, error) {
 	containerOnlyName, snapOnlyName, _ := containerGetParentAndSnapshotName(containerName)
 	cloneName := fmt.Sprintf("%s_%s_start_clone", containerOnlyName, snapOnlyName)
 	// unmap
-	err = cephRBDVolumeUnmap(
-		s.ClusterName,
-		s.OSDPoolName,
-		cloneName,
-		"snapshots")
+	err = cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName, cloneName,
+		"snapshots", s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to unmap RBD storage volume for `+
 			`container "%s" on storage pool "%s": %s`,
@@ -2058,11 +2011,8 @@ func (s *storageCeph) ContainerSnapshotStop(c container) (bool, error) {
 		`storage pool "%s"`, containerName, s.pool.Name)
 
 	// delete
-	err = cephRBDVolumeDelete(
-		s.ClusterName,
-		s.OSDPoolName,
-		cloneName,
-		"snapshots")
+	err = cephRBDVolumeDelete(s.ClusterName, s.OSDPoolName, cloneName,
+		"snapshots", s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to delete clone of RBD storage volume `+
 			`for container "%s" on storage pool "%s": %s`,
@@ -2124,7 +2074,7 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 
 	prefixedType := fmt.Sprintf("zombie_%s", storagePoolVolumeTypeNameImage)
 	ok := cephRBDVolumeExists(s.ClusterName, s.OSDPoolName, fingerprint,
-		prefixedType)
+		prefixedType, s.UserName)
 	if !ok {
 		logger.Debugf(`RBD storage volume for image "%s" on storage `+
 			`pool "%s" does not exist`, fingerprint, s.pool.Name)
@@ -2143,7 +2093,8 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 
 		// create volume
 		err = cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage, RBDSize)
+			fingerprint, storagePoolVolumeTypeNameImage, RBDSize,
+			s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to create RBD storage volume `+
 				`for image "%s" on storage pool "%s": %s`,
@@ -2159,7 +2110,8 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 			}
 
 			err := cephRBDVolumeDelete(s.ClusterName, s.OSDPoolName,
-				fingerprint, storagePoolVolumeTypeNameImage)
+				fingerprint, storagePoolVolumeTypeNameImage,
+				s.UserName)
 			if err != nil {
 				logger.Warnf(`Failed to delete RBD storage `+
 					`volume for image "%s" on storage `+
@@ -2169,7 +2121,7 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 		}()
 
 		err = cephRBDVolumeMap(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage)
+			fingerprint, storagePoolVolumeTypeNameImage, s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to map RBD storage volume for `+
 				`image "%s" on storage pool "%s": %s`,
@@ -2185,7 +2137,8 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 			}
 
 			err := cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName,
-				fingerprint, storagePoolVolumeTypeNameImage)
+				fingerprint, storagePoolVolumeTypeNameImage,
+				s.UserName)
 			if err != nil {
 				logger.Warnf(`Failed to unmap RBD storage `+
 					`volume for image "%s" on storage `+
@@ -2238,7 +2191,7 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 
 		// unmap
 		err = cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage)
+			fingerprint, storagePoolVolumeTypeNameImage, s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to unmap RBD storage volume for `+
 				`image "%s" on storage pool "%s": %s`,
@@ -2250,7 +2203,8 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 
 		// make snapshot of volume
 		err = cephRBDSnapshotCreate(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage, "readonly")
+			fingerprint, storagePoolVolumeTypeNameImage, "readonly",
+			s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to create snapshot for RBD `+
 				`storage volume for image "%s" on storage `+
@@ -2268,7 +2222,8 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 
 			err := cephRBDSnapshotDelete(s.ClusterName,
 				s.OSDPoolName, fingerprint,
-				storagePoolVolumeTypeNameImage, "readonly")
+				storagePoolVolumeTypeNameImage, "readonly",
+				s.UserName)
 			if err != nil {
 				logger.Warnf(`Failed to delete snapshot for `+
 					`RBD storage volume for image "%s" on `+
@@ -2279,7 +2234,8 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 
 		// protect volume so we can create clones of it
 		err = cephRBDSnapshotProtect(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage, "readonly")
+			fingerprint, storagePoolVolumeTypeNameImage, "readonly",
+			s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to protect snapshot for RBD `+
 				`storage volume for image "%s" on storage `+
@@ -2298,7 +2254,8 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 
 			err := cephRBDSnapshotUnprotect(s.ClusterName,
 				s.OSDPoolName, fingerprint,
-				storagePoolVolumeTypeNameImage, "readonly")
+				storagePoolVolumeTypeNameImage, "readonly",
+				s.UserName)
 			if err != nil {
 				logger.Warnf(`Failed to unprotect snapshot for `+
 					`RBD storage volume for image "%s" on `+
@@ -2312,7 +2269,7 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 
 		// unmark deleted
 		err := cephRBDVolumeUnmarkDeleted(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage)
+			fingerprint, storagePoolVolumeTypeNameImage, s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to unmark RBD storage volume `+
 				`for image "%s" on storage pool "%s" as
@@ -2329,7 +2286,7 @@ func (s *storageCeph) ImageCreate(fingerprint string) error {
 
 			err := cephRBDVolumeMarkDeleted(s.ClusterName,
 				s.OSDPoolName, fingerprint,
-				storagePoolVolumeTypeNameImage)
+				storagePoolVolumeTypeNameImage, s.UserName)
 			if err != nil {
 				logger.Warnf(`Failed to mark RBD storage `+
 					`volume for image "%s" on storage `+
@@ -2366,7 +2323,8 @@ func (s *storageCeph) ImageDelete(fingerprint string) error {
 
 	// check if image has dependent snapshots
 	_, err := cephRBDSnapshotListClones(s.ClusterName, s.OSDPoolName,
-		fingerprint, storagePoolVolumeTypeNameImage, "readonly")
+		fingerprint, storagePoolVolumeTypeNameImage, "readonly",
+		s.UserName)
 	if err != nil {
 		if err != NoSuchObjectError {
 			logger.Errorf(`Failed to list clones of RBD storage `+
@@ -2380,7 +2338,8 @@ func (s *storageCeph) ImageDelete(fingerprint string) error {
 
 		// unprotect snapshot
 		err = cephRBDSnapshotUnprotect(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage, "readonly")
+			fingerprint, storagePoolVolumeTypeNameImage, "readonly",
+			s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to unprotect snapshot for RBD `+
 				`storage volume for image "%s" on storage `+
@@ -2393,7 +2352,7 @@ func (s *storageCeph) ImageDelete(fingerprint string) error {
 
 		// delete snapshots
 		err = cephRBDSnapshotsPurge(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage)
+			fingerprint, storagePoolVolumeTypeNameImage, s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to delete snapshot for RBD `+
 				`storage volume for image "%s" on storage `+
@@ -2406,7 +2365,7 @@ func (s *storageCeph) ImageDelete(fingerprint string) error {
 
 		// unmap
 		err = cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage)
+			fingerprint, storagePoolVolumeTypeNameImage, s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to unmap RBD storage volume for `+
 				`image "%s" on storage pool "%s": %s`,
@@ -2418,7 +2377,7 @@ func (s *storageCeph) ImageDelete(fingerprint string) error {
 
 		// delete volume
 		err = cephRBDVolumeDelete(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage)
+			fingerprint, storagePoolVolumeTypeNameImage, s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to delete RBD storage volume `+
 				`for image "%s" on storage pool "%s": %s`,
@@ -2430,7 +2389,7 @@ func (s *storageCeph) ImageDelete(fingerprint string) error {
 	} else {
 		// unmap
 		err = cephRBDVolumeUnmap(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage)
+			fingerprint, storagePoolVolumeTypeNameImage, s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to unmap RBD storage volume for `+
 				`image "%s" on storage pool "%s": %s`,
@@ -2442,7 +2401,7 @@ func (s *storageCeph) ImageDelete(fingerprint string) error {
 
 		// mark deleted
 		err := cephRBDVolumeMarkDeleted(s.ClusterName, s.OSDPoolName,
-			fingerprint, storagePoolVolumeTypeNameImage)
+			fingerprint, storagePoolVolumeTypeNameImage, s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to mark RBD storage volume for `+
 				`image "%s" on storage pool "%s" as zombie: %s`,
diff --git a/lxd/storage_ceph_migration.go b/lxd/storage_ceph_migration.go
index 125b4ce37..f15bd5075 100644
--- a/lxd/storage_ceph_migration.go
+++ b/lxd/storage_ceph_migration.go
@@ -156,11 +156,9 @@ func (s *storageCeph) MigrationSource(c container, containerOnly bool) (Migratio
 	// List all the snapshots in order of reverse creation. The idea here is
 	// that we send the oldest to newest snapshot, hopefully saving on xfer
 	// costs. Then, after all that, we send the container itself.
-	snapshots, err := cephRBDVolumeListSnapshots(
-		s.ClusterName,
-		s.OSDPoolName,
-		containerName,
-		storagePoolVolumeTypeNameContainer)
+	snapshots, err := cephRBDVolumeListSnapshots(s.ClusterName,
+		s.OSDPoolName, containerName,
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if err != nil {
 		if err != NoSuchObjectError {
 			logger.Errorf(`Failed to list snapshots for RBD storage volume "%s" on storage pool "%s": %s`, containerName, s.pool.Name, err)
@@ -218,17 +216,11 @@ func (s *storageCeph) MigrationSink(live bool, c container, snapshots []*Snapsho
 	// set to the correct cluster name for that LXD instance. Yeah, I think
 	// that's actually correct.
 	containerName := c.Name()
-	if !cephRBDVolumeExists(
-		s.ClusterName,
-		s.OSDPoolName,
-		containerName,
-		storagePoolVolumeTypeNameContainer) {
-
-		err := cephRBDVolumeCreate(
-			s.ClusterName,
-			s.OSDPoolName,
-			containerName,
-			storagePoolVolumeTypeNameContainer, "0")
+	if !cephRBDVolumeExists(s.ClusterName, s.OSDPoolName, containerName,
+		storagePoolVolumeTypeNameContainer, s.UserName) {
+		err := cephRBDVolumeCreate(s.ClusterName, s.OSDPoolName,
+			containerName, storagePoolVolumeTypeNameContainer, "0",
+			s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to create RBD storage volume "%s" for cluster "%s" in OSD pool "%s" on storage pool "%s": %s`, containerName, s.ClusterName, s.OSDPoolName, s.pool.Name, err)
 			return err
diff --git a/lxd/storage_ceph_utils.go b/lxd/storage_ceph_utils.go
index 2375f46ce..96785436c 100644
--- a/lxd/storage_ceph_utils.go
+++ b/lxd/storage_ceph_utils.go
@@ -15,9 +15,10 @@ import (
 )
 
 // cephOSDPoolExists checks whether a given OSD pool exists.
-func cephOSDPoolExists(ClusterName string, poolName string) bool {
+func cephOSDPoolExists(ClusterName string, poolName string, userName string) bool {
 	_, err := shared.RunCommand(
 		"ceph",
+		"--name", fmt.Sprintf("client.%s", userName),
 		"--cluster", ClusterName,
 		"osd",
 		"pool",
@@ -38,8 +39,9 @@ func cephOSDPoolExists(ClusterName string, poolName string) bool {
 //   command will still exit 0. This means that if the caller wants to be sure
 //   that this call actually deleted an OSD pool it needs to check for the
 //   existence of the pool first.
-func cephOSDPoolDestroy(clusterName string, poolName string) error {
+func cephOSDPoolDestroy(clusterName string, poolName string, userName string) error {
 	_, err := shared.RunCommand("ceph",
+		"--name", fmt.Sprintf("client.%s", userName),
 		"--cluster", clusterName,
 		"osd",
 		"pool",
@@ -72,9 +74,10 @@ func getRBDDevPath(poolName string, volumeType string, volumeName string) string
 // library and the kernel module are minimized. Otherwise random panics might
 // occur.
 func cephRBDVolumeCreate(clusterName string, poolName string, volumeName string,
-	volumeType string, size string) error {
+	volumeType string, size string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--image-feature", "layering,",
 		"--cluster", clusterName,
 		"--pool", poolName,
@@ -86,9 +89,10 @@ func cephRBDVolumeCreate(clusterName string, poolName string, volumeName string,
 
 // cephRBDVolumeExists checks whether a given RBD storage volume exists.
 func cephRBDVolumeExists(clusterName string, poolName string, volumeName string,
-	volumeType string) bool {
+	volumeType string, userName string) bool {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"image-meta",
@@ -106,9 +110,10 @@ func cephRBDVolumeExists(clusterName string, poolName string, volumeName string,
 //   to be sure that this call actually deleted an RBD storage volume it needs
 //   to check for the existence of the pool first.
 func cephRBDVolumeDelete(clusterName string, poolName string, volumeName string,
-	volumeType string) error {
+	volumeType string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"rm",
@@ -124,9 +129,10 @@ func cephRBDVolumeDelete(clusterName string, poolName string, volumeName string,
 // This will ensure that the RBD storage volume is accessible as a block device
 // in the /dev directory and is therefore necessary in order to mount it.
 func cephRBDVolumeMap(clusterName string, poolName string, volumeName string,
-	volumeType string) error {
+	volumeType string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"map",
@@ -152,9 +158,10 @@ func cephRBDVolumeMap(clusterName string, poolName string, volumeName string,
 // cephRBDVolumeUnmap unmaps a given RBD storage volume
 // This is a precondition in order to delete an RBD storage volume can.
 func cephRBDVolumeUnmap(clusterName string, poolName string, volumeName string,
-	volumeType string) error {
+	volumeType string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"unmap",
@@ -180,9 +187,11 @@ func cephRBDVolumeUnmap(clusterName string, poolName string, volumeName string,
 // cephRBDVolumeSnapshotUnmap unmaps a given RBD snapshot
 // This is a precondition in order to delete an RBD snapshot can.
 func cephRBDVolumeSnapshotUnmap(clusterName string, poolName string,
-	volumeName string, volumeType string, snapshotName string) error {
+	volumeName string, volumeType string, snapshotName string,
+	userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"unmap",
@@ -208,9 +217,11 @@ func cephRBDVolumeSnapshotUnmap(clusterName string, poolName string,
 // cephRBDSnapshotCreate creates a read-write snapshot of a given RBD storage
 // volume
 func cephRBDSnapshotCreate(clusterName string, poolName string,
-	volumeName string, volumeType string, snapshotName string) error {
+	volumeName string, volumeType string, snapshotName string,
+	userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"snap",
@@ -227,9 +238,10 @@ func cephRBDSnapshotCreate(clusterName string, poolName string,
 // cephRBDSnapshotsPurge deletes all snapshot of a given RBD storage volume
 // Note that this will only succeed if none of the snapshots are protected.
 func cephRBDSnapshotsPurge(clusterName string, poolName string,
-	volumeName string, volumeType string) error {
+	volumeName string, volumeType string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"snap",
@@ -245,9 +257,11 @@ func cephRBDSnapshotsPurge(clusterName string, poolName string,
 // cephRBDSnapshotProtect protects a given snapshot from being deleted
 // This is a precondition to be able to create RBD clones from a given snapshot.
 func cephRBDSnapshotProtect(clusterName string, poolName string,
-	volumeName string, volumeType string, snapshotName string) error {
+	volumeName string, volumeType string, snapshotName string,
+	userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"snap",
@@ -276,9 +290,11 @@ func cephRBDSnapshotProtect(clusterName string, poolName string,
 // - This is a precondition to be able to delete an RBD snapshot.
 // - This command will only succeed if the snapshot does not have any clones.
 func cephRBDSnapshotUnprotect(clusterName string, poolName string,
-	volumeName string, volumeType string, snapshotName string) error {
+	volumeName string, volumeType string, snapshotName string,
+	userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"snap",
@@ -307,9 +323,11 @@ func cephRBDSnapshotUnprotect(clusterName string, poolName string,
 func cephRBDCloneCreate(sourceClusterName string, sourcePoolName string,
 	sourceVolumeName string, sourceVolumeType string,
 	sourceSnapshotName string, targetPoolName string,
-	targetVolumeName string, targetVolumeType string) error {
+	targetVolumeName string, targetVolumeType string,
+	userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", sourceClusterName,
 		"clone",
 		fmt.Sprintf("%s/%s_%s@%s", sourcePoolName, sourceVolumeType,
@@ -326,9 +344,10 @@ func cephRBDCloneCreate(sourceClusterName string, sourcePoolName string,
 // cephRBDSnapshotListClones list all clones of an RBD snapshot
 func cephRBDSnapshotListClones(clusterName string, poolName string,
 	volumeName string, volumeType string,
-	snapshotName string) ([]string, error) {
+	snapshotName string, userName string) ([]string, error) {
 	msg, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"children",
@@ -356,9 +375,10 @@ func cephRBDSnapshotListClones(clusterName string, poolName string,
 // creating a sparse copy of a container or when LXD updated an image and the
 // image still has dependent container clones.
 func cephRBDVolumeMarkDeleted(clusterName string, poolName string,
-	volumeName string, volumeType string) error {
+	volumeName string, volumeType string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"mv",
 		fmt.Sprintf("%s/%s_%s", poolName, volumeType, volumeName),
@@ -381,9 +401,10 @@ func cephRBDVolumeMarkDeleted(clusterName string, poolName string,
 //   the pool but is marked as "zombie" it will unmark it as a zombie instead of
 //   creating another storage volume for the image.
 func cephRBDVolumeUnmarkDeleted(clusterName string, poolName string,
-	volumeName string, volumeType string) error {
+	volumeName string, volumeType string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"mv",
 		fmt.Sprintf("%s/zombie_%s_%s", poolName, volumeType, volumeName),
@@ -401,9 +422,10 @@ func cephRBDVolumeUnmarkDeleted(clusterName string, poolName string,
 // under its original name and the callers maps it under its new name the image
 // will be mapped twice. This will prevent it from being deleted.
 func cephRBDVolumeRename(clusterName string, poolName string, volumeType string,
-	oldVolumeName string, newVolumeName string) error {
+	oldVolumeName string, newVolumeName string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"mv",
 		fmt.Sprintf("%s/%s_%s", poolName, volumeType, oldVolumeName),
@@ -423,9 +445,10 @@ func cephRBDVolumeRename(clusterName string, poolName string, volumeType string,
 // mapped twice. This will prevent it from being deleted.
 func cephRBDVolumeSnapshotRename(clusterName string, poolName string,
 	volumeName string, volumeType string, oldSnapshotName string,
-	newSnapshotName string) error {
+	newSnapshotName string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"snap",
 		"rename",
@@ -449,9 +472,10 @@ func cephRBDVolumeSnapshotRename(clusterName string, poolName string,
 //   The caller will usually want to parse this according to its needs. This
 //   helper library provides two small functions to do this but see below.
 func cephRBDVolumeGetParent(clusterName string, poolName string,
-	volumeName string, volumeType string) (string, error) {
+	volumeName string, volumeType string, userName string) (string, error) {
 	msg, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"info",
@@ -483,9 +507,11 @@ func cephRBDVolumeGetParent(clusterName string, poolName string,
 // This requires that the snapshot does not have any clones and is unmapped and
 // unprotected.
 func cephRBDSnapshotDelete(clusterName string, poolName string,
-	volumeName string, volumeType string, snapshotName string) error {
+	volumeName string, volumeType string, snapshotName string,
+	userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"snap",
@@ -504,9 +530,10 @@ func cephRBDSnapshotDelete(clusterName string, poolName string,
 // operations is similar to creating an empty RBD storage volume and rsyncing
 // the contents of the source RBD storage volume into it.
 func cephRBDVolumeCopy(clusterName string, oldVolumeName string,
-	newVolumeName string) error {
+	newVolumeName string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"cp",
 		oldVolumeName,
@@ -525,9 +552,11 @@ func cephRBDVolumeCopy(clusterName string, oldVolumeName string,
 // this will only return
 // <rbd-snapshot-name>
 func cephRBDVolumeListSnapshots(clusterName string, poolName string,
-	volumeName string, volumeType string) ([]string, error) {
+	volumeName string, volumeType string,
+	userName string) ([]string, error) {
 	msg, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--format", "json",
 		"--cluster", clusterName,
 		"--pool", poolName,
@@ -569,9 +598,10 @@ func cephRBDVolumeListSnapshots(clusterName string, poolName string,
 // cephRBDVolumeRestore restores an RBD storage volume to the state of one of
 // its snapshots
 func cephRBDVolumeRestore(clusterName string, poolName string, volumeName string,
-	volumeType string, snapshotName string) error {
+	volumeType string, snapshotName string, userName string) error {
 	_, err := shared.RunCommand(
 		"rbd",
+		"--id", userName,
 		"--cluster", clusterName,
 		"--pool", poolName,
 		"snap",
@@ -656,7 +686,8 @@ func (s *storageCeph) copyWithoutSnapshotsFull(target container,
 			sourceSnapshotOnlyName)
 	}
 
-	err := cephRBDVolumeCopy(s.ClusterName, oldVolumeName, newVolumeName)
+	err := cephRBDVolumeCopy(s.ClusterName, oldVolumeName, newVolumeName,
+		s.UserName)
 	if err != nil {
 		logger.Debugf(`Failed to create full RBD copy "%s" -> `+
 			`"%s": %s`, source.Name(), target.Name(), err)
@@ -664,7 +695,7 @@ func (s *storageCeph) copyWithoutSnapshotsFull(target container,
 	}
 
 	err = cephRBDVolumeMap(s.ClusterName, s.OSDPoolName, targetContainerName,
-		storagePoolVolumeTypeNameContainer)
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to map RBD storage volume for image `+
 			`"%s" on storage pool "%s": %s`, targetContainerName,
@@ -726,7 +757,7 @@ func (s *storageCeph) copyWithoutSnapshotsSparse(target container,
 		// create snapshot
 		err := cephRBDSnapshotCreate(s.ClusterName, s.OSDPoolName,
 			sourceContainerName, storagePoolVolumeTypeNameContainer,
-			snapshotName)
+			snapshotName, s.UserName)
 		if err != nil {
 			logger.Errorf(`Failed to create snapshot for RBD `+
 				`storage volume for image "%s" on storage `+
@@ -739,7 +770,7 @@ func (s *storageCeph) copyWithoutSnapshotsSparse(target container,
 	// protect volume so we can create clones of it
 	err := cephRBDSnapshotProtect(s.ClusterName, s.OSDPoolName,
 		sourceContainerOnlyName, storagePoolVolumeTypeNameContainer,
-		snapshotName)
+		snapshotName, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to protect snapshot for RBD storage `+
 			`volume for image "%s" on storage pool "%s": %s`,
@@ -750,7 +781,7 @@ func (s *storageCeph) copyWithoutSnapshotsSparse(target container,
 	err = cephRBDCloneCreate(s.ClusterName, s.OSDPoolName,
 		sourceContainerOnlyName, storagePoolVolumeTypeNameContainer,
 		snapshotName, s.OSDPoolName, targetContainerName,
-		storagePoolVolumeTypeNameContainer)
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to clone new RBD storage volume for `+
 			`container "%s": %s`, targetContainerName, err)
@@ -758,7 +789,7 @@ func (s *storageCeph) copyWithoutSnapshotsSparse(target container,
 	}
 
 	err = cephRBDVolumeMap(s.ClusterName, s.OSDPoolName, targetContainerName,
-		storagePoolVolumeTypeNameContainer)
+		storagePoolVolumeTypeNameContainer, s.UserName)
 	if err != nil {
 		logger.Errorf(`Failed to map RBD storage volume for image `+
 			`"%s" on storage pool "%s": %s`, targetContainerName,
@@ -807,6 +838,7 @@ func (s *storageCeph) copyWithSnapshots(sourceVolumeName string,
 
 	args := []string{
 		"export-diff",
+		"--id", s.UserName,
 		"--cluster", s.ClusterName,
 		sourceVolumeName,
 	}
@@ -821,6 +853,7 @@ func (s *storageCeph) copyWithSnapshots(sourceVolumeName string,
 	rbdSendCmd := exec.Command("rbd", args...)
 	rbdRecvCmd := exec.Command(
 		"rbd",
+		"--id", s.UserName,
 		"import-diff",
 		"--cluster", s.ClusterName,
 		"-",
@@ -869,11 +902,11 @@ func (s *storageCeph) copyWithSnapshots(sourceVolumeName string,
 //   entities that were kept around because of dependency relations but are not
 //   deletable.
 func cephContainerDelete(clusterName string, poolName string, volumeName string,
-	volumeType string) int {
+	volumeType string, userName string) int {
 	logEntry := fmt.Sprintf("%s/%s_%s", poolName, volumeType, volumeName)
 
 	snaps, err := cephRBDVolumeListSnapshots(clusterName, poolName,
-		volumeName, volumeType)
+		volumeName, volumeType, userName)
 	if err == nil {
 		var zombies int
 		for _, snap := range snaps {
@@ -881,7 +914,7 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 				volumeType, volumeName, snap)
 
 			ret := cephContainerSnapshotDelete(clusterName,
-				poolName, volumeName, volumeType, snap)
+				poolName, volumeName, volumeType, snap, userName)
 			if ret < 0 {
 				logger.Errorf(`Failed to delete RBD storage `+
 					`volume "%s"`, logEntry)
@@ -899,7 +932,7 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 		if zombies > 0 {
 			// unmap
 			err = cephRBDVolumeUnmap(clusterName, poolName,
-				volumeName, volumeType)
+				volumeName, volumeType, userName)
 			if err != nil {
 				logger.Errorf(`Failed to unmap RBD storage `+
 					`volume "%s": %s`, logEntry, err)
@@ -915,7 +948,7 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 			}
 
 			err := cephRBDVolumeMarkDeleted(clusterName, poolName,
-				volumeName, volumeType)
+				volumeName, volumeType, userName)
 			if err != nil {
 				logger.Errorf(`Failed to mark RBD storage `+
 					`volume "%s" as zombie: %s`, logEntry,
@@ -935,7 +968,7 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 		}
 
 		parent, err := cephRBDVolumeGetParent(clusterName, poolName,
-			volumeName, volumeType)
+			volumeName, volumeType, userName)
 		if err == nil {
 			logger.Debugf(`Detected "%s" as parent of RBD storage `+
 				`volume "%s"`, parent, logEntry)
@@ -955,7 +988,7 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 
 			// unmap
 			err = cephRBDVolumeUnmap(clusterName, poolName,
-				volumeName, volumeType)
+				volumeName, volumeType, userName)
 			if err != nil {
 				logger.Errorf(`Failed to unmap RBD storage `+
 					`volume "%s": %s`, logEntry, err)
@@ -965,7 +998,7 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 
 			// delete
 			err = cephRBDVolumeDelete(clusterName, poolName,
-				volumeName, volumeType)
+				volumeName, volumeType, userName)
 			if err != nil {
 				logger.Errorf(`Failed to delete RBD storage `+
 					`volume "%s": %s`, logEntry, err)
@@ -980,7 +1013,8 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 				strings.HasPrefix(parentSnapshotName, "zombie_") {
 				ret := cephContainerSnapshotDelete(clusterName,
 					poolName, parentVolumeName,
-					parentVolumeType, parentSnapshotName)
+					parentVolumeType, parentSnapshotName,
+					userName)
 				if ret < 0 {
 					logger.Errorf(`Failed to delete `+
 						`snapshot "%s" of RBD storage `+
@@ -1005,7 +1039,7 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 
 			// unmap
 			err = cephRBDVolumeUnmap(clusterName, poolName,
-				volumeName, volumeType)
+				volumeName, volumeType, userName)
 			if err != nil {
 				logger.Errorf(`Failed to unmap RBD storage `+
 					`volume "%s": %s`, logEntry, err)
@@ -1015,7 +1049,7 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 
 			// delete
 			err = cephRBDVolumeDelete(clusterName, poolName,
-				volumeName, volumeType)
+				volumeName, volumeType, userName)
 			if err != nil {
 				logger.Errorf(`Failed to delete RBD storage `+
 					`volume "%s": %s`, logEntry, err)
@@ -1048,17 +1082,14 @@ func cephContainerDelete(clusterName string, poolName string, volumeName string,
 //   entities that were kept around because of dependency relations but are not
 //   deletable.
 func cephContainerSnapshotDelete(clusterName string, poolName string,
-	volumeName string, volumeType string, snapshotName string) int {
+	volumeName string, volumeType string, snapshotName string,
+	userName string) int {
 	logImageEntry := fmt.Sprintf("%s/%s_%s", poolName, volumeType, volumeName)
 	logSnapshotEntry := fmt.Sprintf("%s/%s_%s@%s", poolName, volumeType,
 		volumeName, snapshotName)
 
-	clones, err := cephRBDSnapshotListClones(
-		clusterName,
-		poolName,
-		volumeName,
-		volumeType,
-		snapshotName)
+	clones, err := cephRBDSnapshotListClones(clusterName, poolName,
+		volumeName, volumeType, snapshotName, userName)
 	if err != nil {
 		if err != NoSuchObjectError {
 			logger.Errorf(`Failed to list clones of RBD `+
@@ -1072,7 +1103,7 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 
 		// unprotect
 		err = cephRBDSnapshotUnprotect(clusterName, poolName, volumeName,
-			volumeType, snapshotName)
+			volumeType, snapshotName, userName)
 		if err != nil {
 			logger.Errorf(`Failed to unprotect RBD snapshot "%s" `+
 				`of RBD storage volume "%s": %s`,
@@ -1084,7 +1115,7 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 
 		// unmap
 		err = cephRBDVolumeSnapshotUnmap(clusterName, poolName,
-			volumeName, volumeType, snapshotName)
+			volumeName, volumeType, snapshotName, userName)
 		if err != nil {
 			logger.Errorf(`Failed to unmap RBD snapshot "%s" of `+
 				`RBD storage volume "%s": %s`, logSnapshotEntry,
@@ -1096,7 +1127,7 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 
 		// delete
 		err = cephRBDSnapshotDelete(clusterName, poolName, volumeName,
-			volumeType, snapshotName)
+			volumeType, snapshotName, userName)
 		if err != nil {
 			logger.Errorf(`Failed to delete RBD snapshot "%s" of `+
 				`RBD storage volume "%s": %s`, logSnapshotEntry,
@@ -1110,7 +1141,7 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 		// we know that LXD is still using it.
 		if strings.HasPrefix(volumeType, "zombie_") {
 			ret := cephContainerDelete(clusterName, poolName,
-				volumeName, volumeType)
+				volumeName, volumeType, userName)
 			if ret < 0 {
 				logger.Errorf(`Failed to delete RBD storage volume "%s"`,
 					logImageEntry)
@@ -1147,7 +1178,7 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 			}
 
 			ret := cephContainerDelete(clusterName, clonePool,
-				cloneName, cloneType)
+				cloneName, cloneType, userName)
 			if ret < 0 {
 				logger.Errorf(`Failed to delete clone "%s" `+
 					`of RBD snapshot "%s" of RBD `+
@@ -1167,7 +1198,7 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 
 			// unprotect
 			err = cephRBDSnapshotUnprotect(clusterName, poolName,
-				volumeName, volumeType, snapshotName)
+				volumeName, volumeType, snapshotName, userName)
 			if err != nil {
 				logger.Errorf(`Failed to unprotect RBD `+
 					`snapshot "%s" of RBD storage volume `+
@@ -1181,7 +1212,7 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 
 			// unmap
 			err = cephRBDVolumeSnapshotUnmap(clusterName, poolName,
-				volumeName, volumeType, snapshotName)
+				volumeName, volumeType, snapshotName, userName)
 			if err != nil {
 				logger.Errorf(`Failed to unmap RBD snapshot `+
 					`"%s" of RBD storage volume "%s": %s`,
@@ -1194,7 +1225,7 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 
 			// delete
 			err = cephRBDSnapshotDelete(clusterName, poolName,
-				volumeName, volumeType, snapshotName)
+				volumeName, volumeType, snapshotName, userName)
 			if err != nil {
 				logger.Errorf(`Failed to delete RBD snapshot `+
 					`"%s" of RBD storage volume "%s": %s`,
@@ -1209,7 +1240,8 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 			// is not we know that LXD is still using it.
 			if strings.HasPrefix(volumeType, "zombie_") {
 				ret := cephContainerDelete(clusterName,
-					poolName, volumeName, volumeType)
+					poolName, volumeName, volumeType,
+					userName)
 				if ret < 0 {
 					logger.Errorf(`Failed to delete RBD `+
 						`storage volume "%s"`,
@@ -1229,7 +1261,7 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 			}
 
 			err := cephRBDVolumeSnapshotUnmap(clusterName, poolName,
-				volumeName, volumeType, snapshotName)
+				volumeName, volumeType, snapshotName, userName)
 			if err != nil {
 				logger.Errorf(`Failed to unmap RBD `+
 					`snapshot "%s" of RBD storage volume "%s": %s`,
@@ -1244,7 +1276,8 @@ func cephContainerSnapshotDelete(clusterName string, poolName string,
 			logSnapshotNewEntry := fmt.Sprintf("%s/%s_%s@%s",
 				poolName, volumeName, volumeType, newSnapshotName)
 			err = cephRBDVolumeSnapshotRename(clusterName, poolName,
-				volumeName, volumeType, snapshotName, newSnapshotName)
+				volumeName, volumeType, snapshotName,
+				newSnapshotName, userName)
 			if err != nil {
 				logger.Errorf(`Failed to rename RBD `+
 					`snapshot "%s" of RBD storage volume "%s" `+
diff --git a/lxd/storage_pools_config.go b/lxd/storage_pools_config.go
index 5bb4455c0..b3a3d2042 100644
--- a/lxd/storage_pools_config.go
+++ b/lxd/storage_pools_config.go
@@ -29,6 +29,7 @@ var storagePoolConfigKeys = map[string]func(value string) error{
 		return err
 	},
 	"ceph.rbd.clone_copy": shared.IsBool,
+	"ceph.user.name":      shared.IsAny,
 
 	// valid drivers: lvm
 	"lvm.thinpool_name": shared.IsAny,


More information about the lxc-devel mailing list