[lxc-devel] [lxd/master] [RFC/WIP] improve zfs locking

brauner on Github lxc-bot at linuxcontainers.org
Fri Mar 3 16:45:01 UTC 2017


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20170303/037c7c08/attachment.bin>
-------------- next part --------------
From c15c48df425b49760f71a0472fe8185db64f7271 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Fri, 3 Mar 2017 12:32:22 +0100
Subject: [PATCH 1/2] zfs: start to fade out methods

Replace them with more generic functions that can be called in other places as
well (e.g. patches.go, api_internal.go etc.).

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 lxd/api_internal.go |   5 +-
 lxd/storage.go      |   9 ++++
 lxd/storage_zfs.go  | 140 +++++++++++++++++++++++++++++++++++++---------------
 3 files changed, 113 insertions(+), 41 deletions(-)

diff --git a/lxd/api_internal.go b/lxd/api_internal.go
index bc9ca00..fff45b8 100644
--- a/lxd/api_internal.go
+++ b/lxd/api_internal.go
@@ -160,11 +160,12 @@ func internalImport(d *Daemon, r *http.Request) Response {
 
 		switch pool.Driver {
 		case "zfs":
-			err = zfsMount(poolName, containerSubString[1:])
+			dataset := fmt.Sprintf("%s/%s", poolName, containerSubString[1:])
+			err = zfsPoolVolumeMount(dataset)
 			if err != nil {
 				return InternalError(err)
 			}
-			defer zfsUmount(poolName, containerSubString[1:])
+			defer zfsPoolVolumeUmount(dataset)
 		case "lvm":
 			containerLvmName := containerNameToLVName(name)
 			containerLvmPath := getLvmDevPath(poolName, storagePoolVolumeApiEndpointContainers, containerLvmName)
diff --git a/lxd/storage.go b/lxd/storage.go
index 188037c..920ac18 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -32,6 +32,8 @@ var lxdStorageMapLock sync.Mutex
 
 // The following functions are used to construct simple operation codes that are
 // unique.
+//
+// Pool lock IDs.
 func getPoolMountLockID(poolName string) string {
 	return fmt.Sprintf("mount/pool/%s", poolName)
 }
@@ -40,10 +42,16 @@ func getPoolUmountLockID(poolName string) string {
 	return fmt.Sprintf("umount/pool/%s", poolName)
 }
 
+// Image lock IDs.
 func getImageCreateLockID(poolName string, fingerprint string) string {
 	return fmt.Sprintf("create/image/%s/%s", poolName, fingerprint)
 }
 
+func getImageDeleteLockID(poolName string, fingerprint string) string {
+	return fmt.Sprintf("delete/image/%s/%s", poolName, fingerprint)
+}
+
+// Container lock IDs.
 func getContainerMountLockID(poolName string, containerName string) string {
 	return fmt.Sprintf("mount/container/%s/%s", poolName, containerName)
 }
@@ -52,6 +60,7 @@ func getContainerUmountLockID(poolName string, containerName string) string {
 	return fmt.Sprintf("umount/container/%s/%s", poolName, containerName)
 }
 
+// Custom lock IDs.
 func getCustomMountLockID(poolName string, volumeName string) string {
 	return fmt.Sprintf("mount/custom/%s/%s", poolName, volumeName)
 }
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index a3f4187..8daaa66 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -490,8 +490,8 @@ func (s *storageZfs) ContainerUmount(name string, path string) (bool, error) {
 // Things we do have to care about
 func (s *storageZfs) ContainerStorageReady(name string) bool {
 	poolName := s.getOnDiskPoolName()
-	fs := fmt.Sprintf("%s/containers/%s", poolName, name)
-	return s.zfsFilesystemEntityExists(fs, false)
+	containerDataset := fmt.Sprintf("%s/containers/%s", poolName, name)
+	return zfsFilesystemEntityExists(containerDataset)
 }
 
 func (s *storageZfs) ContainerCreate(container container) error {
@@ -545,7 +545,9 @@ func (s *storageZfs) ContainerCreateFromImage(container container, fingerprint s
 	fs := fmt.Sprintf("containers/%s", containerName)
 	containerPoolVolumeMntPoint := getContainerMountPoint(s.pool.Name, containerName)
 
-	fsImage := fmt.Sprintf("images/%s", fingerprint)
+	imageNoPoolName := fmt.Sprintf("images/%s", fingerprint)
+	poolName := s.getOnDiskPoolName()
+	imageDataset := fmt.Sprintf("%s/images/%s", poolName, fingerprint)
 
 	imageStoragePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
 	lxdStorageMapLock.Lock()
@@ -559,7 +561,7 @@ func (s *storageZfs) ContainerCreateFromImage(container container, fingerprint s
 		lxdStorageMapLock.Unlock()
 
 		var imgerr error
-		if !s.zfsFilesystemEntityExists(fsImage, true) {
+		if !zfsFilesystemEntityExists(imageDataset) {
 			imgerr = s.ImageCreate(fingerprint)
 		}
 
@@ -575,7 +577,7 @@ func (s *storageZfs) ContainerCreateFromImage(container container, fingerprint s
 		}
 	}
 
-	err := s.zfsPoolVolumeClone(fsImage, "readonly", fs, containerPoolVolumeMntPoint)
+	err := s.zfsPoolVolumeClone(imageNoPoolName, "readonly", fs, containerPoolVolumeMntPoint)
 	if err != nil {
 		return err
 	}
@@ -1249,18 +1251,13 @@ func (s *storageZfs) ContainerSnapshotCreateEmpty(snapshotContainer container) e
 	return nil
 }
 
-// - create temporary directory ${LXD_DIR}/images/lxd_images_
-// - create new zfs volume images/<fingerprint>
-// - mount the zfs volume on ${LXD_DIR}/images/lxd_images_
-// - unpack the downloaded image in ${LXD_DIR}/images/lxd_images_
-// - mark new zfs volume images/<fingerprint> readonly
-// - remove mountpoint property from zfs volume images/<fingerprint>
-// - create read-write snapshot from zfs volume images/<fingerprint>
 func (s *storageZfs) ImageCreate(fingerprint string) error {
 	shared.LogDebugf("Creating ZFS storage volume for image \"%s\" on storage pool \"%s\".", fingerprint, s.pool.Name)
 
-	imageMntPoint := getImageMountPoint(s.pool.Name, fingerprint)
-	fs := fmt.Sprintf("images/%s", fingerprint)
+	poolName := s.getOnDiskPoolName()
+	imageBaseDataset := fmt.Sprintf("images/%s", fingerprint)
+	imageDataset := fmt.Sprintf("%s/%s", poolName, imageBaseDataset)
+
 	revert := true
 	subrevert := true
 
@@ -1275,8 +1272,9 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
 		s.deleteImageDbPoolVolume(fingerprint)
 	}()
 
-	if s.zfsFilesystemEntityExists(fmt.Sprintf("deleted/%s", fs), true) {
-		err := s.zfsPoolVolumeRename(fmt.Sprintf("deleted/%s", fs), fs)
+	imageDeletedDataset := fmt.Sprintf("%s/deleted/%s", poolName, imageBaseDataset)
+	if zfsFilesystemEntityExists(imageDeletedDataset) {
+		err := zfsPoolVolumeRename(imageDeletedDataset, imageDataset)
 		if err != nil {
 			return err
 		}
@@ -1290,16 +1288,16 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
 
 		// In case this is an image from an older lxd instance, wipe the
 		// mountpoint.
-		err = s.zfsPoolVolumeSet(fs, "mountpoint", "none")
+		err = zfsPoolVolumeSet(imageDataset, "mountpoint", "none")
 		if err != nil {
 			return err
 		}
 
 		revert = false
-
 		return nil
 	}
 
+	imageMntPoint := getImageMountPoint(s.pool.Name, fingerprint)
 	if !shared.PathExists(imageMntPoint) {
 		err := os.MkdirAll(imageMntPoint, 0700)
 		if err != nil {
@@ -1324,7 +1322,7 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
 	imagePath := shared.VarPath("images", fingerprint)
 
 	// Create a new storage volume on the storage pool for the image.
-	err = s.zfsPoolVolumeCreate(fs)
+	err = zfsPoolVolumeCreate(imageDataset)
 	if err != nil {
 		return err
 	}
@@ -1337,14 +1335,14 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
 	}()
 
 	// Set a temporary mountpoint for the image.
-	err = s.zfsPoolVolumeSet(fs, "mountpoint", tmpImageDir)
+	err = zfsPoolVolumeSet(imageDataset, "mountpoint", tmpImageDir)
 	if err != nil {
 		return err
 	}
 
 	// Make sure that the image actually got mounted.
 	if !shared.IsMountPoint(tmpImageDir) {
-		s.zfsPoolVolumeMount(fs)
+		zfsPoolVolumeMount(imageDataset)
 	}
 
 	// Unpack the image into the temporary mountpoint.
@@ -1354,25 +1352,25 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
 	}
 
 	// Mark the new storage volume for the image as readonly.
-	err = s.zfsPoolVolumeSet(fs, "readonly", "on")
+	err = zfsPoolVolumeSet(imageDataset, "readonly", "on")
 	if err != nil {
 		return err
 	}
 
 	// Remove the temporary mountpoint from the image storage volume.
-	err = s.zfsPoolVolumeSet(fs, "mountpoint", "none")
+	err = zfsPoolVolumeSet(imageDataset, "mountpoint", "none")
 	if err != nil {
 		return err
 	}
 
 	// Make sure that the image actually got unmounted.
 	if shared.IsMountPoint(tmpImageDir) {
-		s.zfsPoolVolumeUmount(fs)
+		zfsPoolVolumeUmount(imageDataset)
 	}
 
 	// Create a snapshot of that image on the storage pool which we clone for
 	// container creation.
-	err = s.zfsPoolVolumeSnapshotCreate(fs, "readonly")
+	err = zfsPoolVolumeSnapshotCreate(imageDataset, "readonly")
 	if err != nil {
 		return err
 	}
@@ -1659,6 +1657,16 @@ func (s *storageZfs) zfsPoolVolumeClone(source string, name string, dest string,
 	return nil
 }
 
+func zfsPoolVolumeCreate(dataset string) error {
+	msg, err := exec.Command("zfs", "create", "-p", dataset).CombinedOutput()
+	if err != nil {
+		shared.LogErrorf("Failed to create ZFS dataset: %s.", string(msg))
+		return err
+	}
+
+	return nil
+}
+
 func (s *storageZfs) zfsPoolVolumeCreate(path string) error {
 	poolName := s.getOnDiskPoolName()
 	output, err := exec.Command(
@@ -1833,6 +1841,36 @@ func (s *storageZfs) zfsFilesystemEntityPropertyGet(path string, key string, pre
 	return strings.TrimRight(string(output), "\n"), nil
 }
 
+func zfsPoolVolumeRename(sourceDataset string, destDataset string) error {
+	var err error
+	var output []byte
+
+	for i := 0; i < 20; i++ {
+		output, err = exec.Command(
+			"zfs",
+			"rename",
+			"-p",
+			sourceDataset,
+			destDataset).CombinedOutput()
+
+		// Success
+		if err == nil {
+			return nil
+		}
+
+		// zfs rename can fail because of descendants, yet still manage the rename
+		if !zfsFilesystemEntityExists(sourceDataset) && zfsFilesystemEntityExists(destDataset) {
+			return nil
+		}
+
+		time.Sleep(500 * time.Millisecond)
+	}
+
+	// Timeout
+	shared.LogErrorf("zfs rename failed: %s.", string(output))
+	return err
+}
+
 func (s *storageZfs) zfsPoolVolumeRename(source string, dest string) error {
 	var err error
 	var output []byte
@@ -1864,6 +1902,20 @@ func (s *storageZfs) zfsPoolVolumeRename(source string, dest string) error {
 	return fmt.Errorf("Failed to rename ZFS filesystem: %s", output)
 }
 
+func zfsPoolVolumeSet(dataset string, key string, value string) error {
+	output, err := exec.Command(
+		"zfs",
+		"set",
+		fmt.Sprintf("%s=%s", key, value),
+		dataset).CombinedOutput()
+	if err != nil {
+		shared.LogErrorf("Setting ZFS property failed: %s.", string(output))
+		return err
+	}
+
+	return nil
+}
+
 func (s *storageZfs) zfsPoolVolumeSet(path string, key string, value string) error {
 	poolName := s.getOnDiskPoolName()
 	output, err := exec.Command(
@@ -1879,6 +1931,20 @@ func (s *storageZfs) zfsPoolVolumeSet(path string, key string, value string) err
 	return nil
 }
 
+func zfsPoolVolumeSnapshotCreate(dataset string, name string) error {
+	msg, err := exec.Command(
+		"zfs",
+		"snapshot",
+		"-r",
+		fmt.Sprintf("%s@%s", dataset, name)).CombinedOutput()
+	if err != nil {
+		shared.LogErrorf("zfs snapshot failed: %s.", string(msg))
+		return err
+	}
+
+	return nil
+}
+
 func (s *storageZfs) zfsPoolVolumeSnapshotCreate(path string, name string) error {
 	poolName := s.getOnDiskPoolName()
 	output, err := exec.Command(
@@ -1964,36 +2030,32 @@ func (s *storageZfs) zfsPoolVolumeSnapshotRename(path string, oldName string, ne
 	return nil
 }
 
-func zfsMount(poolName string, path string) error {
-	output, err := tryExec(
-		"zfs",
-		"mount",
-		fmt.Sprintf("%s/%s", poolName, path))
+func zfsPoolVolumeMount(dataset string) error {
+	output, err := tryExec("zfs", "mount", dataset)
 	if err != nil {
-		return fmt.Errorf("Failed to mount ZFS filesystem: %s", output)
+		return fmt.Errorf("Failed to mount ZFS filesystem: %s.", string(output))
 	}
 
 	return nil
 }
 
 func (s *storageZfs) zfsPoolVolumeMount(path string) error {
-	return zfsMount(s.getOnDiskPoolName(), path)
+	dataset := fmt.Sprintf("%s/%s", s.getOnDiskPoolName(), path)
+	return zfsPoolVolumeMount(dataset)
 }
 
-func zfsUmount(poolName string, path string) error {
-	output, err := tryExec(
-		"zfs",
-		"unmount",
-		fmt.Sprintf("%s/%s", poolName, path))
+func zfsPoolVolumeUmount(dataset string) error {
+	msg, err := tryExec("zfs", "unmount", dataset)
 	if err != nil {
-		return fmt.Errorf("Failed to unmount ZFS filesystem: %s", output)
+		return fmt.Errorf("Failed to unmount ZFS filesystem: %s.", string(msg))
 	}
 
 	return nil
 }
 
 func (s *storageZfs) zfsPoolVolumeUmount(path string) error {
-	return zfsUmount(s.getOnDiskPoolName(), path)
+	dataset := fmt.Sprintf("%s/%s", s.getOnDiskPoolName(), path)
+	return zfsPoolVolumeUmount(dataset)
 }
 
 func (s *storageZfs) zfsPoolListSubvolumes(path string) ([]string, error) {

From c32f66ab1956dc70cf2e64f699d1fd9a3e594320 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Fri, 3 Mar 2017 17:43:17 +0100
Subject: [PATCH 2/2] zfs: improve locking around Image{Create,Delete}

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 lxd/storage.go     |  14 +++++
 lxd/storage_zfs.go | 153 +++++++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 139 insertions(+), 28 deletions(-)

diff --git a/lxd/storage.go b/lxd/storage.go
index 920ac18..9ae5426 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -27,6 +27,16 @@ import (
 // Note that any access to this map must be done while holding a lock.
 var lxdStorageOngoingOperationMap = map[string]chan bool{}
 
+type containerCreationMap struct {
+	refcount map[string]int
+	wait     map[string]chan bool
+}
+
+var lxdContainerCreationMap = containerCreationMap{
+	refcount: make(map[string]int),
+	wait:     make(map[string]chan bool),
+}
+
 // lxdStorageMapLock is used to access lxdStorageOngoingOperationMap.
 var lxdStorageMapLock sync.Mutex
 
@@ -60,6 +70,10 @@ func getContainerUmountLockID(poolName string, containerName string) string {
 	return fmt.Sprintf("umount/container/%s/%s", poolName, containerName)
 }
 
+func getContainerCreateFromImageLockID(poolName string, fingerprint string) string {
+	return fmt.Sprintf("create/container/%s/%s", poolName, fingerprint)
+}
+
 // Custom lock IDs.
 func getCustomMountLockID(poolName string, volumeName string) string {
 	return fmt.Sprintf("mount/custom/%s/%s", poolName, volumeName)
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index 8daaa66..fdade56 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -544,40 +544,41 @@ func (s *storageZfs) ContainerCreateFromImage(container container, fingerprint s
 	containerName := container.Name()
 	fs := fmt.Sprintf("containers/%s", containerName)
 	containerPoolVolumeMntPoint := getContainerMountPoint(s.pool.Name, containerName)
-
 	imageNoPoolName := fmt.Sprintf("images/%s", fingerprint)
-	poolName := s.getOnDiskPoolName()
-	imageDataset := fmt.Sprintf("%s/images/%s", poolName, fingerprint)
 
-	imageStoragePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
+	// Add the container creation to the map or increase the refcount.
+	containerCreateFromImagePoolLockID := getContainerCreateFromImageLockID(s.pool.Name, fingerprint)
 	lxdStorageMapLock.Lock()
-	if waitChannel, ok := lxdStorageOngoingOperationMap[imageStoragePoolLockID]; ok {
-		lxdStorageMapLock.Unlock()
-		if _, ok := <-waitChannel; ok {
-			shared.LogWarnf("Received value over semaphore. This should not have happened.")
-		}
-	} else {
-		lxdStorageOngoingOperationMap[imageStoragePoolLockID] = make(chan bool)
-		lxdStorageMapLock.Unlock()
-
-		var imgerr error
-		if !zfsFilesystemEntityExists(imageDataset) {
-			imgerr = s.ImageCreate(fingerprint)
-		}
+	_, ok := lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]
+	if !ok {
+		lxdContainerCreationMap.wait[containerCreateFromImagePoolLockID] = make(chan bool)
+	}
+	lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]++
+	lxdStorageMapLock.Unlock()
 
+	defer func() {
 		lxdStorageMapLock.Lock()
-		if waitChannel, ok := lxdStorageOngoingOperationMap[imageStoragePoolLockID]; ok {
-			close(waitChannel)
-			delete(lxdStorageOngoingOperationMap, imageStoragePoolLockID)
+		_, ok := lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]
+		if ok {
+			lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]--
+			if lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID] == 0 {
+				// Close wait channel.
+				close(lxdContainerCreationMap.wait[containerCreateFromImagePoolLockID])
+				// Delete refcount.
+				delete(lxdContainerCreationMap.refcount, containerCreateFromImagePoolLockID)
+				// Delete wait channel.
+				delete(lxdContainerCreationMap.wait, containerCreateFromImagePoolLockID)
+			}
 		}
 		lxdStorageMapLock.Unlock()
+	}()
 
-		if imgerr != nil {
-			return imgerr
-		}
+	err := s.ImageCreate(fingerprint)
+	if err != nil {
+		return err
 	}
 
-	err := s.zfsPoolVolumeClone(imageNoPoolName, "readonly", fs, containerPoolVolumeMntPoint)
+	err = s.zfsPoolVolumeClone(imageNoPoolName, "readonly", fs, containerPoolVolumeMntPoint)
 	if err != nil {
 		return err
 	}
@@ -1258,10 +1259,50 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
 	imageBaseDataset := fmt.Sprintf("images/%s", fingerprint)
 	imageDataset := fmt.Sprintf("%s/%s", poolName, imageBaseDataset)
 
-	revert := true
-	subrevert := true
+	// Check for any ongoing ImageCreate() operation.
+	imageCreatePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
+	lxdStorageMapLock.Lock()
+	waitChannel, ok := lxdStorageOngoingOperationMap[imageCreatePoolLockID]
+	if ok {
+		lxdStorageMapLock.Unlock()
+		if _, ok := <-waitChannel; ok {
+			shared.LogWarnf("Received value over semaphore. This should not have happened.")
+		}
+		return nil
+	}
+	lxdStorageOngoingOperationMap[imageCreatePoolLockID] = make(chan bool)
+	lxdStorageMapLock.Unlock()
+
+	// Wait for any operation that might conflict with us.
+	imageDeletePoolLockID := getImageDeleteLockID(s.pool.Name, fingerprint)
+	lxdStorageMapLock.Lock()
+	waitChannel, ok = lxdStorageOngoingOperationMap[imageDeletePoolLockID]
+	lxdStorageMapLock.Unlock()
+	if ok {
+		// Wait for the ImageDelete() operation to finish.
+		if _, ok := <-waitChannel; ok {
+			shared.LogWarnf("Received value over semaphore. This should not have happened.")
+		}
+	}
+
+	defer func() {
+		lxdStorageMapLock.Lock()
+		waitChannel, ok := lxdStorageOngoingOperationMap[imageCreatePoolLockID]
+		if ok {
+			close(waitChannel)
+			delete(lxdStorageOngoingOperationMap, imageCreatePoolLockID)
+		}
+		lxdStorageMapLock.Unlock()
+	}()
+
+	// Check if image already exists.
+	_, err := dbStoragePoolVolumeGetTypeID(s.d.db, fingerprint, storagePoolVolumeTypeImage, s.poolID)
+	if err == nil {
+		return nil
+	}
 
-	err := s.createImageDbPoolVolume(fingerprint)
+	subrevert := true
+	err = s.createImageDbPoolVolume(fingerprint)
 	if err != nil {
 		return err
 	}
@@ -1272,6 +1313,7 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
 		s.deleteImageDbPoolVolume(fingerprint)
 	}()
 
+	revert := true
 	imageDeletedDataset := fmt.Sprintf("%s/deleted/%s", poolName, imageBaseDataset)
 	if zfsFilesystemEntityExists(imageDeletedDataset) {
 		err := zfsPoolVolumeRename(imageDeletedDataset, imageDataset)
@@ -1384,6 +1426,61 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
 func (s *storageZfs) ImageDelete(fingerprint string) error {
 	shared.LogDebugf("Deleting ZFS storage volume for image \"%s\" on storage pool \"%s\".", fingerprint, s.pool.Name)
 
+	// Take a lock to check whether there is already an ImageDelete()
+	// progress.
+	imageDeletePoolLockID := getImageDeleteLockID(s.pool.Name, fingerprint)
+	lxdStorageMapLock.Lock()
+	waitChannel, ok := lxdStorageOngoingOperationMap[imageDeletePoolLockID]
+	if ok {
+		lxdStorageMapLock.Unlock()
+		if _, ok := <-waitChannel; ok {
+			shared.LogWarnf("Received value over semaphore. This should not have happened.")
+		}
+		return nil
+	}
+	lxdStorageOngoingOperationMap[imageDeletePoolLockID] = make(chan bool)
+	lxdStorageMapLock.Unlock()
+
+	// Wait for any already ongoing operation that might conflict with us.
+
+	// Check whether an image is created atm.
+	imageCreatePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
+	lxdStorageMapLock.Lock()
+	waitChannel, ok = lxdStorageOngoingOperationMap[imageCreatePoolLockID]
+	lxdStorageMapLock.Unlock()
+	if ok {
+		if _, ok := <-waitChannel; ok {
+			shared.LogWarnf("Received value over semaphore. This should not have happened.")
+		}
+	}
+
+	// Check whether a container is created from that image.
+	containerCreateFromImagePoolLockID := getContainerCreateFromImageLockID(s.pool.Name, fingerprint)
+	lxdStorageMapLock.Lock()
+	waitChannel, ok = lxdContainerCreationMap.wait[containerCreateFromImagePoolLockID]
+	lxdStorageMapLock.Unlock()
+	if ok {
+		if _, ok := <-waitChannel; ok {
+			shared.LogWarnf("Received value over semaphore. This should not have happened.")
+		}
+	}
+
+	defer func() {
+		lxdStorageMapLock.Lock()
+		waitChannel, ok := lxdStorageOngoingOperationMap[imageDeletePoolLockID]
+		if ok {
+			close(waitChannel)
+			delete(lxdStorageOngoingOperationMap, imageDeletePoolLockID)
+		}
+		lxdStorageMapLock.Unlock()
+	}()
+
+	// Check if image already is already deleted.
+	_, err := dbStoragePoolVolumeGetTypeID(s.d.db, fingerprint, storagePoolVolumeTypeImage, s.poolID)
+	if err == NoSuchObjectError {
+		return nil
+	}
+
 	fs := fmt.Sprintf("images/%s", fingerprint)
 
 	if s.zfsFilesystemEntityExists(fs, true) {
@@ -1410,7 +1507,7 @@ func (s *storageZfs) ImageDelete(fingerprint string) error {
 		}
 	}
 
-	err := s.deleteImageDbPoolVolume(fingerprint)
+	err = s.deleteImageDbPoolVolume(fingerprint)
 	if err != nil {
 		return err
 	}


More information about the lxc-devel mailing list