[lxc-devel] [lxd/master] [RFC/WIP] improve zfs locking
brauner on Github
lxc-bot at linuxcontainers.org
Fri Mar 3 16:45:01 UTC 2017
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20170303/037c7c08/attachment.bin>
-------------- next part --------------
From c15c48df425b49760f71a0472fe8185db64f7271 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Fri, 3 Mar 2017 12:32:22 +0100
Subject: [PATCH 1/2] zfs: start to fade out methods
Replace them with more generic functions that can be called in other places as
well (e.g. patches.go, api_internal.go etc.).
Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
lxd/api_internal.go | 5 +-
lxd/storage.go | 9 ++++
lxd/storage_zfs.go | 140 +++++++++++++++++++++++++++++++++++++---------------
3 files changed, 113 insertions(+), 41 deletions(-)
diff --git a/lxd/api_internal.go b/lxd/api_internal.go
index bc9ca00..fff45b8 100644
--- a/lxd/api_internal.go
+++ b/lxd/api_internal.go
@@ -160,11 +160,12 @@ func internalImport(d *Daemon, r *http.Request) Response {
switch pool.Driver {
case "zfs":
- err = zfsMount(poolName, containerSubString[1:])
+ dataset := fmt.Sprintf("%s/%s", poolName, containerSubString[1:])
+ err = zfsPoolVolumeMount(dataset)
if err != nil {
return InternalError(err)
}
- defer zfsUmount(poolName, containerSubString[1:])
+ defer zfsPoolVolumeUmount(dataset)
case "lvm":
containerLvmName := containerNameToLVName(name)
containerLvmPath := getLvmDevPath(poolName, storagePoolVolumeApiEndpointContainers, containerLvmName)
diff --git a/lxd/storage.go b/lxd/storage.go
index 188037c..920ac18 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -32,6 +32,8 @@ var lxdStorageMapLock sync.Mutex
// The following functions are used to construct simple operation codes that are
// unique.
+//
+// Pool lock IDs.
func getPoolMountLockID(poolName string) string {
return fmt.Sprintf("mount/pool/%s", poolName)
}
@@ -40,10 +42,16 @@ func getPoolUmountLockID(poolName string) string {
return fmt.Sprintf("umount/pool/%s", poolName)
}
+// Image lock IDs.
func getImageCreateLockID(poolName string, fingerprint string) string {
return fmt.Sprintf("create/image/%s/%s", poolName, fingerprint)
}
+func getImageDeleteLockID(poolName string, fingerprint string) string {
+ return fmt.Sprintf("delete/image/%s/%s", poolName, fingerprint)
+}
+
+// Container lock IDs.
func getContainerMountLockID(poolName string, containerName string) string {
return fmt.Sprintf("mount/container/%s/%s", poolName, containerName)
}
@@ -52,6 +60,7 @@ func getContainerUmountLockID(poolName string, containerName string) string {
return fmt.Sprintf("umount/container/%s/%s", poolName, containerName)
}
+// Custom lock IDs.
func getCustomMountLockID(poolName string, volumeName string) string {
return fmt.Sprintf("mount/custom/%s/%s", poolName, volumeName)
}
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index a3f4187..8daaa66 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -490,8 +490,8 @@ func (s *storageZfs) ContainerUmount(name string, path string) (bool, error) {
// Things we do have to care about
func (s *storageZfs) ContainerStorageReady(name string) bool {
poolName := s.getOnDiskPoolName()
- fs := fmt.Sprintf("%s/containers/%s", poolName, name)
- return s.zfsFilesystemEntityExists(fs, false)
+ containerDataset := fmt.Sprintf("%s/containers/%s", poolName, name)
+ return zfsFilesystemEntityExists(containerDataset)
}
func (s *storageZfs) ContainerCreate(container container) error {
@@ -545,7 +545,9 @@ func (s *storageZfs) ContainerCreateFromImage(container container, fingerprint s
fs := fmt.Sprintf("containers/%s", containerName)
containerPoolVolumeMntPoint := getContainerMountPoint(s.pool.Name, containerName)
- fsImage := fmt.Sprintf("images/%s", fingerprint)
+ imageNoPoolName := fmt.Sprintf("images/%s", fingerprint)
+ poolName := s.getOnDiskPoolName()
+ imageDataset := fmt.Sprintf("%s/images/%s", poolName, fingerprint)
imageStoragePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
lxdStorageMapLock.Lock()
@@ -559,7 +561,7 @@ func (s *storageZfs) ContainerCreateFromImage(container container, fingerprint s
lxdStorageMapLock.Unlock()
var imgerr error
- if !s.zfsFilesystemEntityExists(fsImage, true) {
+ if !zfsFilesystemEntityExists(imageDataset) {
imgerr = s.ImageCreate(fingerprint)
}
@@ -575,7 +577,7 @@ func (s *storageZfs) ContainerCreateFromImage(container container, fingerprint s
}
}
- err := s.zfsPoolVolumeClone(fsImage, "readonly", fs, containerPoolVolumeMntPoint)
+ err := s.zfsPoolVolumeClone(imageNoPoolName, "readonly", fs, containerPoolVolumeMntPoint)
if err != nil {
return err
}
@@ -1249,18 +1251,13 @@ func (s *storageZfs) ContainerSnapshotCreateEmpty(snapshotContainer container) e
return nil
}
-// - create temporary directory ${LXD_DIR}/images/lxd_images_
-// - create new zfs volume images/<fingerprint>
-// - mount the zfs volume on ${LXD_DIR}/images/lxd_images_
-// - unpack the downloaded image in ${LXD_DIR}/images/lxd_images_
-// - mark new zfs volume images/<fingerprint> readonly
-// - remove mountpoint property from zfs volume images/<fingerprint>
-// - create read-write snapshot from zfs volume images/<fingerprint>
func (s *storageZfs) ImageCreate(fingerprint string) error {
shared.LogDebugf("Creating ZFS storage volume for image \"%s\" on storage pool \"%s\".", fingerprint, s.pool.Name)
- imageMntPoint := getImageMountPoint(s.pool.Name, fingerprint)
- fs := fmt.Sprintf("images/%s", fingerprint)
+ poolName := s.getOnDiskPoolName()
+ imageBaseDataset := fmt.Sprintf("images/%s", fingerprint)
+ imageDataset := fmt.Sprintf("%s/%s", poolName, imageBaseDataset)
+
revert := true
subrevert := true
@@ -1275,8 +1272,9 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
s.deleteImageDbPoolVolume(fingerprint)
}()
- if s.zfsFilesystemEntityExists(fmt.Sprintf("deleted/%s", fs), true) {
- err := s.zfsPoolVolumeRename(fmt.Sprintf("deleted/%s", fs), fs)
+ imageDeletedDataset := fmt.Sprintf("%s/deleted/%s", poolName, imageBaseDataset)
+ if zfsFilesystemEntityExists(imageDeletedDataset) {
+ err := zfsPoolVolumeRename(imageDeletedDataset, imageDataset)
if err != nil {
return err
}
@@ -1290,16 +1288,16 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
// In case this is an image from an older lxd instance, wipe the
// mountpoint.
- err = s.zfsPoolVolumeSet(fs, "mountpoint", "none")
+ err = zfsPoolVolumeSet(imageDataset, "mountpoint", "none")
if err != nil {
return err
}
revert = false
-
return nil
}
+ imageMntPoint := getImageMountPoint(s.pool.Name, fingerprint)
if !shared.PathExists(imageMntPoint) {
err := os.MkdirAll(imageMntPoint, 0700)
if err != nil {
@@ -1324,7 +1322,7 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
imagePath := shared.VarPath("images", fingerprint)
// Create a new storage volume on the storage pool for the image.
- err = s.zfsPoolVolumeCreate(fs)
+ err = zfsPoolVolumeCreate(imageDataset)
if err != nil {
return err
}
@@ -1337,14 +1335,14 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
}()
// Set a temporary mountpoint for the image.
- err = s.zfsPoolVolumeSet(fs, "mountpoint", tmpImageDir)
+ err = zfsPoolVolumeSet(imageDataset, "mountpoint", tmpImageDir)
if err != nil {
return err
}
// Make sure that the image actually got mounted.
if !shared.IsMountPoint(tmpImageDir) {
- s.zfsPoolVolumeMount(fs)
+ zfsPoolVolumeMount(imageDataset)
}
// Unpack the image into the temporary mountpoint.
@@ -1354,25 +1352,25 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
}
// Mark the new storage volume for the image as readonly.
- err = s.zfsPoolVolumeSet(fs, "readonly", "on")
+ err = zfsPoolVolumeSet(imageDataset, "readonly", "on")
if err != nil {
return err
}
// Remove the temporary mountpoint from the image storage volume.
- err = s.zfsPoolVolumeSet(fs, "mountpoint", "none")
+ err = zfsPoolVolumeSet(imageDataset, "mountpoint", "none")
if err != nil {
return err
}
// Make sure that the image actually got unmounted.
if shared.IsMountPoint(tmpImageDir) {
- s.zfsPoolVolumeUmount(fs)
+ zfsPoolVolumeUmount(imageDataset)
}
// Create a snapshot of that image on the storage pool which we clone for
// container creation.
- err = s.zfsPoolVolumeSnapshotCreate(fs, "readonly")
+ err = zfsPoolVolumeSnapshotCreate(imageDataset, "readonly")
if err != nil {
return err
}
@@ -1659,6 +1657,16 @@ func (s *storageZfs) zfsPoolVolumeClone(source string, name string, dest string,
return nil
}
+func zfsPoolVolumeCreate(dataset string) error {
+ msg, err := exec.Command("zfs", "create", "-p", dataset).CombinedOutput()
+ if err != nil {
+ shared.LogErrorf("Failed to create ZFS dataset: %s.", string(msg))
+ return err
+ }
+
+ return nil
+}
+
func (s *storageZfs) zfsPoolVolumeCreate(path string) error {
poolName := s.getOnDiskPoolName()
output, err := exec.Command(
@@ -1833,6 +1841,36 @@ func (s *storageZfs) zfsFilesystemEntityPropertyGet(path string, key string, pre
return strings.TrimRight(string(output), "\n"), nil
}
+func zfsPoolVolumeRename(sourceDataset string, destDataset string) error {
+ var err error
+ var output []byte
+
+ for i := 0; i < 20; i++ {
+ output, err = exec.Command(
+ "zfs",
+ "rename",
+ "-p",
+ sourceDataset,
+ destDataset).CombinedOutput()
+
+ // Success
+ if err == nil {
+ return nil
+ }
+
+ // zfs rename can fail because of descendants, yet still manage the rename
+ if !zfsFilesystemEntityExists(sourceDataset) && zfsFilesystemEntityExists(destDataset) {
+ return nil
+ }
+
+ time.Sleep(500 * time.Millisecond)
+ }
+
+ // Timeout
+ shared.LogErrorf("zfs rename failed: %s.", string(output))
+ return err
+}
+
func (s *storageZfs) zfsPoolVolumeRename(source string, dest string) error {
var err error
var output []byte
@@ -1864,6 +1902,20 @@ func (s *storageZfs) zfsPoolVolumeRename(source string, dest string) error {
return fmt.Errorf("Failed to rename ZFS filesystem: %s", output)
}
+func zfsPoolVolumeSet(dataset string, key string, value string) error {
+ output, err := exec.Command(
+ "zfs",
+ "set",
+ fmt.Sprintf("%s=%s", key, value),
+ dataset).CombinedOutput()
+ if err != nil {
+ shared.LogErrorf("Setting ZFS property failed: %s.", string(output))
+ return err
+ }
+
+ return nil
+}
+
func (s *storageZfs) zfsPoolVolumeSet(path string, key string, value string) error {
poolName := s.getOnDiskPoolName()
output, err := exec.Command(
@@ -1879,6 +1931,20 @@ func (s *storageZfs) zfsPoolVolumeSet(path string, key string, value string) err
return nil
}
+func zfsPoolVolumeSnapshotCreate(dataset string, name string) error {
+ msg, err := exec.Command(
+ "zfs",
+ "snapshot",
+ "-r",
+ fmt.Sprintf("%s@%s", dataset, name)).CombinedOutput()
+ if err != nil {
+ shared.LogErrorf("zfs snapshot failed: %s.", string(msg))
+ return err
+ }
+
+ return nil
+}
+
func (s *storageZfs) zfsPoolVolumeSnapshotCreate(path string, name string) error {
poolName := s.getOnDiskPoolName()
output, err := exec.Command(
@@ -1964,36 +2030,32 @@ func (s *storageZfs) zfsPoolVolumeSnapshotRename(path string, oldName string, ne
return nil
}
-func zfsMount(poolName string, path string) error {
- output, err := tryExec(
- "zfs",
- "mount",
- fmt.Sprintf("%s/%s", poolName, path))
+func zfsPoolVolumeMount(dataset string) error {
+ output, err := tryExec("zfs", "mount", dataset)
if err != nil {
- return fmt.Errorf("Failed to mount ZFS filesystem: %s", output)
+ return fmt.Errorf("Failed to mount ZFS filesystem: %s.", string(output))
}
return nil
}
func (s *storageZfs) zfsPoolVolumeMount(path string) error {
- return zfsMount(s.getOnDiskPoolName(), path)
+ dataset := fmt.Sprintf("%s/%s", s.getOnDiskPoolName(), path)
+ return zfsPoolVolumeMount(dataset)
}
-func zfsUmount(poolName string, path string) error {
- output, err := tryExec(
- "zfs",
- "unmount",
- fmt.Sprintf("%s/%s", poolName, path))
+func zfsPoolVolumeUmount(dataset string) error {
+ msg, err := tryExec("zfs", "unmount", dataset)
if err != nil {
- return fmt.Errorf("Failed to unmount ZFS filesystem: %s", output)
+ return fmt.Errorf("Failed to unmount ZFS filesystem: %s.", string(msg))
}
return nil
}
func (s *storageZfs) zfsPoolVolumeUmount(path string) error {
- return zfsUmount(s.getOnDiskPoolName(), path)
+ dataset := fmt.Sprintf("%s/%s", s.getOnDiskPoolName(), path)
+ return zfsPoolVolumeUmount(dataset)
}
func (s *storageZfs) zfsPoolListSubvolumes(path string) ([]string, error) {
From c32f66ab1956dc70cf2e64f699d1fd9a3e594320 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Fri, 3 Mar 2017 17:43:17 +0100
Subject: [PATCH 2/2] zfs: improve locking around Image{Create,Delete}
Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
lxd/storage.go | 14 +++++
lxd/storage_zfs.go | 153 +++++++++++++++++++++++++++++++++++++++++++----------
2 files changed, 139 insertions(+), 28 deletions(-)
diff --git a/lxd/storage.go b/lxd/storage.go
index 920ac18..9ae5426 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -27,6 +27,16 @@ import (
// Note that any access to this map must be done while holding a lock.
var lxdStorageOngoingOperationMap = map[string]chan bool{}
+type containerCreationMap struct {
+ refcount map[string]int
+ wait map[string]chan bool
+}
+
+var lxdContainerCreationMap = containerCreationMap{
+ refcount: make(map[string]int),
+ wait: make(map[string]chan bool),
+}
+
// lxdStorageMapLock is used to access lxdStorageOngoingOperationMap.
var lxdStorageMapLock sync.Mutex
@@ -60,6 +70,10 @@ func getContainerUmountLockID(poolName string, containerName string) string {
return fmt.Sprintf("umount/container/%s/%s", poolName, containerName)
}
+func getContainerCreateFromImageLockID(poolName string, fingerprint string) string {
+ return fmt.Sprintf("create/container/%s/%s", poolName, fingerprint)
+}
+
// Custom lock IDs.
func getCustomMountLockID(poolName string, volumeName string) string {
return fmt.Sprintf("mount/custom/%s/%s", poolName, volumeName)
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index 8daaa66..fdade56 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -544,40 +544,41 @@ func (s *storageZfs) ContainerCreateFromImage(container container, fingerprint s
containerName := container.Name()
fs := fmt.Sprintf("containers/%s", containerName)
containerPoolVolumeMntPoint := getContainerMountPoint(s.pool.Name, containerName)
-
imageNoPoolName := fmt.Sprintf("images/%s", fingerprint)
- poolName := s.getOnDiskPoolName()
- imageDataset := fmt.Sprintf("%s/images/%s", poolName, fingerprint)
- imageStoragePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
+ // Add the container creation to the map or increase the refcount.
+ containerCreateFromImagePoolLockID := getContainerCreateFromImageLockID(s.pool.Name, fingerprint)
lxdStorageMapLock.Lock()
- if waitChannel, ok := lxdStorageOngoingOperationMap[imageStoragePoolLockID]; ok {
- lxdStorageMapLock.Unlock()
- if _, ok := <-waitChannel; ok {
- shared.LogWarnf("Received value over semaphore. This should not have happened.")
- }
- } else {
- lxdStorageOngoingOperationMap[imageStoragePoolLockID] = make(chan bool)
- lxdStorageMapLock.Unlock()
-
- var imgerr error
- if !zfsFilesystemEntityExists(imageDataset) {
- imgerr = s.ImageCreate(fingerprint)
- }
+ _, ok := lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]
+ if !ok {
+ lxdContainerCreationMap.wait[containerCreateFromImagePoolLockID] = make(chan bool)
+ }
+ lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]++
+ lxdStorageMapLock.Unlock()
+ defer func() {
lxdStorageMapLock.Lock()
- if waitChannel, ok := lxdStorageOngoingOperationMap[imageStoragePoolLockID]; ok {
- close(waitChannel)
- delete(lxdStorageOngoingOperationMap, imageStoragePoolLockID)
+ _, ok := lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]
+ if ok {
+ lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID]--
+ if lxdContainerCreationMap.refcount[containerCreateFromImagePoolLockID] == 0 {
+ // Close wait channel.
+ close(lxdContainerCreationMap.wait[containerCreateFromImagePoolLockID])
+ // Delete refcount.
+ delete(lxdContainerCreationMap.refcount, containerCreateFromImagePoolLockID)
+ // Delete wait channel.
+ delete(lxdContainerCreationMap.wait, containerCreateFromImagePoolLockID)
+ }
}
lxdStorageMapLock.Unlock()
+ }()
- if imgerr != nil {
- return imgerr
- }
+ err := s.ImageCreate(fingerprint)
+ if err != nil {
+ return err
}
- err := s.zfsPoolVolumeClone(imageNoPoolName, "readonly", fs, containerPoolVolumeMntPoint)
+ err = s.zfsPoolVolumeClone(imageNoPoolName, "readonly", fs, containerPoolVolumeMntPoint)
if err != nil {
return err
}
@@ -1258,10 +1259,50 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
imageBaseDataset := fmt.Sprintf("images/%s", fingerprint)
imageDataset := fmt.Sprintf("%s/%s", poolName, imageBaseDataset)
- revert := true
- subrevert := true
+ // Check for any ongoing ImageCreate() operation.
+ imageCreatePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
+ lxdStorageMapLock.Lock()
+ waitChannel, ok := lxdStorageOngoingOperationMap[imageCreatePoolLockID]
+ if ok {
+ lxdStorageMapLock.Unlock()
+ if _, ok := <-waitChannel; ok {
+ shared.LogWarnf("Received value over semaphore. This should not have happened.")
+ }
+ return nil
+ }
+ lxdStorageOngoingOperationMap[imageCreatePoolLockID] = make(chan bool)
+ lxdStorageMapLock.Unlock()
+
+ // Wait for any operation that might conflict with us.
+ imageDeletePoolLockID := getImageDeleteLockID(s.pool.Name, fingerprint)
+ lxdStorageMapLock.Lock()
+ waitChannel, ok = lxdStorageOngoingOperationMap[imageDeletePoolLockID]
+ lxdStorageMapLock.Unlock()
+ if ok {
+ // Wait for the ImageDelete() operation to finish.
+ if _, ok := <-waitChannel; ok {
+ shared.LogWarnf("Received value over semaphore. This should not have happened.")
+ }
+ }
+
+ defer func() {
+ lxdStorageMapLock.Lock()
+ waitChannel, ok := lxdStorageOngoingOperationMap[imageCreatePoolLockID]
+ if ok {
+ close(waitChannel)
+ delete(lxdStorageOngoingOperationMap, imageCreatePoolLockID)
+ }
+ lxdStorageMapLock.Unlock()
+ }()
+
+ // Check if image already exists.
+ _, err := dbStoragePoolVolumeGetTypeID(s.d.db, fingerprint, storagePoolVolumeTypeImage, s.poolID)
+ if err == nil {
+ return nil
+ }
- err := s.createImageDbPoolVolume(fingerprint)
+ subrevert := true
+ err = s.createImageDbPoolVolume(fingerprint)
if err != nil {
return err
}
@@ -1272,6 +1313,7 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
s.deleteImageDbPoolVolume(fingerprint)
}()
+ revert := true
imageDeletedDataset := fmt.Sprintf("%s/deleted/%s", poolName, imageBaseDataset)
if zfsFilesystemEntityExists(imageDeletedDataset) {
err := zfsPoolVolumeRename(imageDeletedDataset, imageDataset)
@@ -1384,6 +1426,61 @@ func (s *storageZfs) ImageCreate(fingerprint string) error {
func (s *storageZfs) ImageDelete(fingerprint string) error {
shared.LogDebugf("Deleting ZFS storage volume for image \"%s\" on storage pool \"%s\".", fingerprint, s.pool.Name)
+ // Take a lock to check whether there is already an ImageDelete() in
+ // progress.
+ imageDeletePoolLockID := getImageDeleteLockID(s.pool.Name, fingerprint)
+ lxdStorageMapLock.Lock()
+ waitChannel, ok := lxdStorageOngoingOperationMap[imageDeletePoolLockID]
+ if ok {
+ lxdStorageMapLock.Unlock()
+ if _, ok := <-waitChannel; ok {
+ shared.LogWarnf("Received value over semaphore. This should not have happened.")
+ }
+ return nil
+ }
+ lxdStorageOngoingOperationMap[imageDeletePoolLockID] = make(chan bool)
+ lxdStorageMapLock.Unlock()
+
+ // Wait for any already ongoing operation that might conflict with us.
+
+ // Check whether the image is currently being created.
+ imageCreatePoolLockID := getImageCreateLockID(s.pool.Name, fingerprint)
+ lxdStorageMapLock.Lock()
+ waitChannel, ok = lxdStorageOngoingOperationMap[imageCreatePoolLockID]
+ lxdStorageMapLock.Unlock()
+ if ok {
+ if _, ok := <-waitChannel; ok {
+ shared.LogWarnf("Received value over semaphore. This should not have happened.")
+ }
+ }
+
+ // Check whether a container is created from that image.
+ containerCreateFromImagePoolLockID := getContainerCreateFromImageLockID(s.pool.Name, fingerprint)
+ lxdStorageMapLock.Lock()
+ waitChannel, ok = lxdContainerCreationMap.wait[containerCreateFromImagePoolLockID]
+ lxdStorageMapLock.Unlock()
+ if ok {
+ if _, ok := <-waitChannel; ok {
+ shared.LogWarnf("Received value over semaphore. This should not have happened.")
+ }
+ }
+
+ defer func() {
+ lxdStorageMapLock.Lock()
+ waitChannel, ok := lxdStorageOngoingOperationMap[imageDeletePoolLockID]
+ if ok {
+ close(waitChannel)
+ delete(lxdStorageOngoingOperationMap, imageDeletePoolLockID)
+ }
+ lxdStorageMapLock.Unlock()
+ }()
+
+ // Check if the image is already deleted.
+ _, err := dbStoragePoolVolumeGetTypeID(s.d.db, fingerprint, storagePoolVolumeTypeImage, s.poolID)
+ if err == NoSuchObjectError {
+ return nil
+ }
+
fs := fmt.Sprintf("images/%s", fingerprint)
if s.zfsFilesystemEntityExists(fs, true) {
@@ -1410,7 +1507,7 @@ func (s *storageZfs) ImageDelete(fingerprint string) error {
}
}
- err := s.deleteImageDbPoolVolume(fingerprint)
+ err = s.deleteImageDbPoolVolume(fingerprint)
if err != nil {
return err
}
More information about the lxc-devel
mailing list