[lxc-devel] [lxd/master] Adds migration volume source/target types and moves migration progress functions

tomponline on Github lxc-bot at linuxcontainers.org
Tue Oct 22 11:23:53 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 505 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20191022/1fa5fbc6/attachment-0001.bin>
-------------- next part --------------
From 66f61a19bc7877a77a7c67bdc17febb6c5b73c68 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Tue, 22 Oct 2019 11:50:33 +0100
Subject: [PATCH 1/3] lxd/migration/migration/volumes: Adds migration volume
 arg types

And helper functions.

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/migration/migration_volumes.go | 181 +++++++++++++++++++++++++++++
 1 file changed, 181 insertions(+)
 create mode 100644 lxd/migration/migration_volumes.go

diff --git a/lxd/migration/migration_volumes.go b/lxd/migration/migration_volumes.go
new file mode 100644
index 0000000000..50a01f587d
--- /dev/null
+++ b/lxd/migration/migration_volumes.go
@@ -0,0 +1,181 @@
+package migration
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/lxc/lxd/lxd/operations"
+	"github.com/lxc/lxd/shared"
+	"github.com/lxc/lxd/shared/ioprogress"
+	"github.com/lxc/lxd/shared/units"
+)
+
+// Type represents the migration transport type. It indicates the method by which the migration can
+// take place and what optional features are available.
+type Type struct {
+	FSType   MigrationFSType // Transport method; ToHeader and MatchTypes branch on this value.
+	Features []string        // Optional feature names, e.g. "xattrs", "delete", "compress", "bidirectional".
+}
+
+// ToHeader converts a Type to a MigrationHeader. The header's Fs field points at t.FSType and,
+// for rsync and ZFS, each feature flag set here is true only when it is listed in t.Features.
+func (t *Type) ToHeader() MigrationHeader {
+	missingFeature, hasFeature := false, true
+	header := MigrationHeader{Fs: &t.FSType}
+
+	if t.FSType == MigrationFSType_RSYNC {
+		features := RsyncFeatures{
+			Xattrs:        &missingFeature,
+			Delete:        &missingFeature,
+			Compress:      &missingFeature,
+			Bidirectional: &missingFeature,
+		}
+
+		for _, feature := range t.Features {
+			if feature == "xattrs" {
+				features.Xattrs = &hasFeature
+			} else if feature == "delete" {
+				features.Delete = &hasFeature
+			} else if feature == "compress" {
+				features.Compress = &hasFeature
+			} else if feature == "bidirectional" {
+				features.Bidirectional = &hasFeature
+			}
+		}
+
+		header.RsyncFeatures = &features
+	} else if t.FSType == MigrationFSType_ZFS {
+		features := ZfsFeatures{Compress: &missingFeature}
+
+		for _, feature := range t.Features {
+			// Compress is the flag being set, so match "compress" (not "xattrs"), as rsync does.
+			if feature == "compress" {
+				features.Compress = &hasFeature
+			}
+		}
+
+		header.ZfsFeatures = &features
+	}
+
+	return header
+}
+
+// VolumeSourceArgs represents the arguments needed to set up a volume migration source.
+type VolumeSourceArgs struct {
+	Name          string   // NOTE(review): presumably the volume's name; confirm with callers.
+	Snapshots     []string // NOTE(review): presumably the snapshot names to send; confirm with callers.
+	MigrationType Type     // Migration transport type and its optional features (see Type).
+	LXDExecPath   string   // NOTE(review): presumably the LXD executable path; confirm with callers.
+}
+
+// VolumeTargetArgs represents the arguments needed to set up a volume migration sink.
+type VolumeTargetArgs struct {
+	Name          string            // NOTE(review): presumably the volume's name; confirm with callers.
+	Description   string            // NOTE(review): presumably the volume's description; confirm with callers.
+	Config        map[string]string // NOTE(review): presumably the volume's config keys; confirm with callers.
+	Snapshots     []string          // NOTE(review): presumably the snapshot names to receive; confirm.
+	MigrationType Type              // Migration transport type and its optional features (see Type).
+}
+
+// MatchTypes picks the first entry of ourTypes (the types supported by a local storage pool)
+// whose FSType equals that of offeredType (the type sent from a remote source). The returned
+// Type has that FSType and only the optional features listed by both sides, in the order of
+// our own list. An error is returned when none of ourTypes has the offered FSType.
+func MatchTypes(offeredType Type, ourTypes []Type) (Type, error) {
+	for _, candidate := range ourTypes {
+		// Only the transport type decides whether a candidate is usable.
+		if candidate.FSType == offeredType.FSType {
+			matched := Type{
+				FSType:   candidate.FSType,
+				Features: []string{},
+			}
+
+			// Keep just the features that the remote side offered as well.
+			for _, name := range candidate.Features {
+				if !shared.StringInSlice(name, offeredType.Features) {
+					continue // Not offered by the remote side.
+				}
+				matched.Features = append(matched.Features, name)
+			}
+
+			return matched, nil
+		}
+	}
+
+	// No matching transport type found.
+	return Type{}, fmt.Errorf("No matching migration type found")
+}
+
+// HeaderToType builds the Type described by a MigrationHeader. The FS type is copied from the
+// header and, for rsync and ZFS, Features is filled from the header's matching features slice.
+func HeaderToType(header MigrationHeader) Type {
+	result := Type{FSType: *header.Fs}
+	switch result.FSType {
+	case MigrationFSType_RSYNC:
+		result.Features = header.GetRsyncFeaturesSlice()
+	case MigrationFSType_ZFS:
+		result.Features = header.GetZfsFeaturesSlice()
+	}
+	return result
+}
+
+// progressWrapperRender renders the progress text and stores it under key in op's metadata.
+func progressWrapperRender(op *operations.Operation, key string, description string, progressInt int64, speedInt int64) {
+	meta := op.Metadata()
+	if meta == nil {
+		meta = make(map[string]interface{})
+	}
+	size, speed := units.GetByteSizeString(progressInt, 2), units.GetByteSizeString(speedInt, 2)
+	rendered := fmt.Sprintf("%s (%s/s)", size, speed)
+	if description != "" {
+		rendered = fmt.Sprintf("%s: %s (%s/s)", description, size, speed)
+	}
+	if meta[key] != rendered {
+		meta[key] = rendered
+		op.UpdateMetadata(meta)
+	}
+}
+
+// ProgressReader returns a wrapper function for an io.ReadCloser. The wrapped reader feeds an
+// ioprogress tracker whose handler renders the progress and speed figures into op's metadata
+// under key, prefixed with description when one is given. If op is nil the wrapper returns
+// the reader it was handed unchanged.
+func ProgressReader(op *operations.Operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
+	return func(reader io.ReadCloser) io.ReadCloser {
+		// Without an operation there is nothing to report progress to.
+		if op == nil {
+			return reader
+		}
+
+		// Forward every tracker callback to the shared renderer.
+		tracker := &ioprogress.ProgressTracker{
+			Handler: func(progressInt int64, speedInt int64) {
+				progressWrapperRender(op, key, description, progressInt, speedInt)
+			},
+		}
+
+		return &ioprogress.ProgressReader{ReadCloser: reader, Tracker: tracker}
+	}
+}
+
+// ProgressWriter returns a wrapper function for an io.WriteCloser. The wrapped writer feeds an
+// ioprogress tracker whose handler renders the progress and speed figures into op's metadata
+// under key, prefixed with description when one is given. If op is nil the wrapper returns
+// the writer it was handed unchanged.
+func ProgressWriter(op *operations.Operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
+	return func(writer io.WriteCloser) io.WriteCloser {
+		// Without an operation there is nothing to report progress to.
+		if op == nil {
+			return writer
+		}
+
+		// Forward every tracker callback to the shared renderer.
+		tracker := &ioprogress.ProgressTracker{
+			Handler: func(progressInt int64, speedInt int64) {
+				progressWrapperRender(op, key, description, progressInt, speedInt)
+			},
+		}
+
+		return &ioprogress.ProgressWriter{WriteCloser: writer, Tracker: tracker}
+	}
+}

From 69d41dc411060b3d47648381fd4434443ca0100a Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Tue, 22 Oct 2019 11:53:27 +0100
Subject: [PATCH 2/3] lxd/storage: Removes progress wrapper functions

These have been moved into migration package.

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/storage.go | 62 --------------------------------------------------
 1 file changed, 62 deletions(-)

diff --git a/lxd/storage.go b/lxd/storage.go
index 4f5a1d6289..7ee430bb13 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -24,7 +24,6 @@ import (
 	"github.com/lxc/lxd/shared/idmap"
 	"github.com/lxc/lxd/shared/ioprogress"
 	"github.com/lxc/lxd/shared/logger"
-	"github.com/lxc/lxd/shared/units"
 	"github.com/lxc/lxd/shared/version"
 )
 
@@ -741,67 +740,6 @@ func resetContainerDiskIdmap(container container, srcIdmap *idmap.IdmapSet) erro
 	return nil
 }
 
-func progressWrapperRender(op *operations.Operation, key string, description string, progressInt int64, speedInt int64) {
-	meta := op.Metadata()
-	if meta == nil {
-		meta = make(map[string]interface{})
-	}
-
-	progress := fmt.Sprintf("%s (%s/s)", units.GetByteSizeString(progressInt, 2), units.GetByteSizeString(speedInt, 2))
-	if description != "" {
-		progress = fmt.Sprintf("%s: %s (%s/s)", description, units.GetByteSizeString(progressInt, 2), units.GetByteSizeString(speedInt, 2))
-	}
-
-	if meta[key] != progress {
-		meta[key] = progress
-		op.UpdateMetadata(meta)
-	}
-}
-
-// StorageProgressReader reports the read progress.
-func StorageProgressReader(op *operations.Operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
-	return func(reader io.ReadCloser) io.ReadCloser {
-		if op == nil {
-			return reader
-		}
-
-		progress := func(progressInt int64, speedInt int64) {
-			progressWrapperRender(op, key, description, progressInt, speedInt)
-		}
-
-		readPipe := &ioprogress.ProgressReader{
-			ReadCloser: reader,
-			Tracker: &ioprogress.ProgressTracker{
-				Handler: progress,
-			},
-		}
-
-		return readPipe
-	}
-}
-
-// StorageProgressWriter reports the write progress.
-func StorageProgressWriter(op *operations.Operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
-	return func(writer io.WriteCloser) io.WriteCloser {
-		if op == nil {
-			return writer
-		}
-
-		progress := func(progressInt int64, speedInt int64) {
-			progressWrapperRender(op, key, description, progressInt, speedInt)
-		}
-
-		writePipe := &ioprogress.ProgressWriter{
-			WriteCloser: writer,
-			Tracker: &ioprogress.ProgressTracker{
-				Handler: progress,
-			},
-		}
-
-		return writePipe
-	}
-}
-
 func SetupStorageDriver(s *state.State, forceCheck bool) error {
 	pools, err := s.Cluster.StoragePoolsNotPending()
 	if err != nil {

From 67d96c8d85b9c7aa686bf539829ac5a63fc68f63 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Tue, 22 Oct 2019 12:22:21 +0100
Subject: [PATCH 3/3] lxd: Update use of migration progress functions

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/storage_btrfs.go           |  4 ++--
 lxd/storage_ceph.go            |  4 ++--
 lxd/storage_migration.go       | 22 +++++++++++-----------
 lxd/storage_migration_btrfs.go |  7 ++++---
 lxd/storage_migration_ceph.go  |  7 ++++---
 lxd/storage_migration_zfs.go   |  7 ++++---
 lxd/storage_zfs.go             |  6 +++---
 7 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index 8690f2a8ae..f875194b0e 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -2612,7 +2612,7 @@ func (s *storageBtrfs) MigrationSink(conn *websocket.Conn, op *operations.Operat
 				return err
 			}
 
-			wrapper := StorageProgressWriter(op, "fs_progress", *snap.Name)
+			wrapper := migration.ProgressWriter(op, "fs_progress", *snap.Name)
 			err = btrfsRecv(*(snap.Name), tmpSnapshotMntPoint, snapshotMntPoint, true, wrapper)
 			if err != nil {
 				return err
@@ -2633,7 +2633,7 @@ func (s *storageBtrfs) MigrationSink(conn *websocket.Conn, op *operations.Operat
 		return err
 	}
 
-	wrapper := StorageProgressWriter(op, "fs_progress", instanceName)
+	wrapper := migration.ProgressWriter(op, "fs_progress", instanceName)
 	containerMntPoint := driver.GetContainerMountPoint(args.Instance.Project(), s.pool.Name, instanceName)
 	err = btrfsRecv("", tmpContainerMntPoint, containerMntPoint, false, wrapper)
 	if err != nil {
diff --git a/lxd/storage_ceph.go b/lxd/storage_ceph.go
index b6c4d5e29d..24e8f7fd6a 100644
--- a/lxd/storage_ceph.go
+++ b/lxd/storage_ceph.go
@@ -2930,7 +2930,7 @@ func (s *storageCeph) MigrationSink(conn *websocket.Conn, op *operations.Operati
 		}
 		logger.Debugf(`Created empty RBD storage volume for container "%s" on storage pool "%s`, instanceName, s.OSDPoolName)
 
-		wrapper := StorageProgressWriter(op, "fs_progress", curSnapName)
+		wrapper := migration.ProgressWriter(op, "fs_progress", curSnapName)
 		err = s.rbdRecv(conn, recvName, wrapper)
 		if err != nil {
 			logger.Errorf(`Failed to receive RBD storage volume "%s": %s`, curSnapName, err)
@@ -2965,7 +2965,7 @@ func (s *storageCeph) MigrationSink(conn *websocket.Conn, op *operations.Operati
 	}()
 
 	// receive the container itself
-	wrapper := StorageProgressWriter(op, "fs_progress", instanceName)
+	wrapper := migration.ProgressWriter(op, "fs_progress", instanceName)
 	err := s.rbdRecv(conn, recvName, wrapper)
 	if err != nil {
 		logger.Errorf(`Failed to receive RBD storage volume "%s": %s`, recvName, err)
diff --git a/lxd/storage_migration.go b/lxd/storage_migration.go
index 7bb005266d..01e26f406d 100644
--- a/lxd/storage_migration.go
+++ b/lxd/storage_migration.go
@@ -68,7 +68,7 @@ func (s rsyncStorageSourceDriver) SendStorageVolume(conn *websocket.Conn, op *op
 		}
 
 		for _, snap := range snapshots {
-			wrapper := StorageProgressReader(op, "fs_progress", snap)
+			wrapper := migration.ProgressReader(op, "fs_progress", snap)
 			path := driver.GetStoragePoolVolumeSnapshotMountPoint(pool.Name, snap)
 			path = shared.AddSlash(path)
 			logger.Debugf("Starting to send storage volume snapshot %s on storage pool %s from %s", snap, pool.Name, path)
@@ -80,7 +80,7 @@ func (s rsyncStorageSourceDriver) SendStorageVolume(conn *websocket.Conn, op *op
 		}
 	}
 
-	wrapper := StorageProgressReader(op, "fs_progress", volume.Name)
+	wrapper := migration.ProgressReader(op, "fs_progress", volume.Name)
 	path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
 	path = shared.AddSlash(path)
 	logger.Debugf("Starting to send storage volume %s on storage pool %s from %s", volume.Name, pool.Name, path)
@@ -106,7 +106,7 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *ope
 			}
 
 			path := send.Path()
-			wrapper := StorageProgressReader(op, "fs_progress", send.Name())
+			wrapper := migration.ProgressReader(op, "fs_progress", send.Name())
 			state := s.container.DaemonState()
 			err = rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(path), conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
 			if err != nil {
@@ -115,7 +115,7 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *ope
 		}
 	}
 
-	wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+	wrapper := migration.ProgressReader(op, "fs_progress", s.container.Name())
 	state := s.container.DaemonState()
 
 	// Attempt to freeze the container to avoid changing files during transfer
@@ -259,7 +259,7 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op *operations.Operation, a
 				return err
 			}
 
-			wrapper := StorageProgressWriter(op, "fs_progress", target.Name)
+			wrapper := migration.ProgressWriter(op, "fs_progress", target.Name)
 			path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
 			path = shared.AddSlash(path)
 			logger.Debugf("Starting to receive storage volume snapshot %s on storage pool %s into %s", target.Name, pool.Name, path)
@@ -276,7 +276,7 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op *operations.Operation, a
 		}
 	}
 
-	wrapper := StorageProgressWriter(op, "fs_progress", volume.Name)
+	wrapper := migration.ProgressWriter(op, "fs_progress", volume.Name)
 	path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
 	path = shared.AddSlash(path)
 	logger.Debugf("Starting to receive storage volume %s on storage pool %s into %s", volume.Name, pool.Name, path)
@@ -356,7 +356,7 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 					}
 				}
 
-				wrapper := StorageProgressWriter(op, "fs_progress", s.Name())
+				wrapper := migration.ProgressWriter(op, "fs_progress", s.Name())
 				if err := rsync.Recv(shared.AddSlash(s.Path()), conn, wrapper, args.RsyncFeatures); err != nil {
 					return err
 				}
@@ -371,7 +371,7 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 			}
 		}
 
-		wrapper := StorageProgressWriter(op, "fs_progress", args.Instance.Name())
+		wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
 		err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
 		if err != nil {
 			return err
@@ -409,7 +409,7 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 					}
 				}
 
-				wrapper := StorageProgressWriter(op, "fs_progress", snap.GetName())
+				wrapper := migration.ProgressWriter(op, "fs_progress", snap.GetName())
 				err := rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
 				if err != nil {
 					return err
@@ -434,7 +434,7 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 			}
 		}
 
-		wrapper := StorageProgressWriter(op, "fs_progress", args.Instance.Name())
+		wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
 		err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
 		if err != nil {
 			return err
@@ -443,7 +443,7 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 
 	if args.Live {
 		/* now receive the final sync */
-		wrapper := StorageProgressWriter(op, "fs_progress", args.Instance.Name())
+		wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
 		err := rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
 		if err != nil {
 			return err
diff --git a/lxd/storage_migration_btrfs.go b/lxd/storage_migration_btrfs.go
index 15a3c936c3..55313e0e38 100644
--- a/lxd/storage_migration_btrfs.go
+++ b/lxd/storage_migration_btrfs.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/gorilla/websocket"
 
+	"github.com/lxc/lxd/lxd/migration"
 	"github.com/lxc/lxd/lxd/operations"
 	driver "github.com/lxc/lxd/lxd/storage"
 	"github.com/lxc/lxd/shared"
@@ -98,7 +99,7 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *
 		}
 		defer btrfsSubVolumesDelete(migrationSendSnapshot)
 
-		wrapper := StorageProgressReader(op, "fs_progress", containerName)
+		wrapper := migration.ProgressReader(op, "fs_progress", containerName)
 		return s.send(conn, migrationSendSnapshot, "", wrapper)
 	}
 
@@ -110,7 +111,7 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *
 			}
 
 			snapMntPoint := driver.GetSnapshotMountPoint(snap.Project(), containerPool, snap.Name())
-			wrapper := StorageProgressReader(op, "fs_progress", snap.Name())
+			wrapper := migration.ProgressReader(op, "fs_progress", snap.Name())
 			if err := s.send(conn, snapMntPoint, prev, wrapper); err != nil {
 				return err
 			}
@@ -141,7 +142,7 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *
 		btrfsParent = s.btrfsSnapshotNames[len(s.btrfsSnapshotNames)-1]
 	}
 
-	wrapper := StorageProgressReader(op, "fs_progress", containerName)
+	wrapper := migration.ProgressReader(op, "fs_progress", containerName)
 	return s.send(conn, migrationSendSnapshot, btrfsParent, wrapper)
 }
 
diff --git a/lxd/storage_migration_ceph.go b/lxd/storage_migration_ceph.go
index aa13e326d4..1817bfec07 100644
--- a/lxd/storage_migration_ceph.go
+++ b/lxd/storage_migration_ceph.go
@@ -9,6 +9,7 @@ import (
 	"github.com/gorilla/websocket"
 	"github.com/pborman/uuid"
 
+	"github.com/lxc/lxd/lxd/migration"
 	"github.com/lxc/lxd/lxd/operations"
 	"github.com/lxc/lxd/lxd/project"
 	"github.com/lxc/lxd/shared"
@@ -85,7 +86,7 @@ func (s *rbdMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn,
 			s.ceph.OSDPoolName,
 			containerOnlyName,
 			snapOnlyName)
-		wrapper := StorageProgressReader(op, "fs_progress", containerName)
+		wrapper := migration.ProgressReader(op, "fs_progress", containerName)
 
 		err := s.rbdSend(conn, sendName, "", wrapper)
 		if err != nil {
@@ -113,7 +114,7 @@ func (s *rbdMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn,
 				project.Prefix(s.container.Project(), containerName),
 				snap)
 
-			wrapper := StorageProgressReader(op, "fs_progress", snap)
+			wrapper := migration.ProgressReader(op, "fs_progress", snap)
 
 			err := s.rbdSend(
 				conn,
@@ -139,7 +140,7 @@ func (s *rbdMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn,
 
 	cur := fmt.Sprintf("%s/container_%s@%s", s.ceph.OSDPoolName,
 		project.Prefix(s.container.Project(), containerName), s.runningSnapName)
-	wrapper := StorageProgressReader(op, "fs_progress", containerName)
+	wrapper := migration.ProgressReader(op, "fs_progress", containerName)
 	err = s.rbdSend(conn, cur, lastSnap, wrapper)
 	if err != nil {
 		logger.Errorf(`Failed to send exported diff of RBD storage volume "%s" from snapshot "%s": %s`, s.runningSnapName, lastSnap, err)
diff --git a/lxd/storage_migration_zfs.go b/lxd/storage_migration_zfs.go
index f3bc582efe..72747a08a4 100644
--- a/lxd/storage_migration_zfs.go
+++ b/lxd/storage_migration_zfs.go
@@ -9,6 +9,7 @@ import (
 	"github.com/gorilla/websocket"
 	"github.com/pborman/uuid"
 
+	"github.com/lxc/lxd/lxd/migration"
 	"github.com/lxc/lxd/lxd/operations"
 	"github.com/lxc/lxd/lxd/project"
 	"github.com/lxc/lxd/shared"
@@ -83,7 +84,7 @@ func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *op
 	if s.instance.IsSnapshot() {
 		_, snapOnlyName, _ := shared.ContainerGetParentAndSnapshotName(s.instance.Name())
 		snapshotName := fmt.Sprintf("snapshot-%s", snapOnlyName)
-		wrapper := StorageProgressReader(op, "fs_progress", s.instance.Name())
+		wrapper := migration.ProgressReader(op, "fs_progress", s.instance.Name())
 		return s.send(conn, snapshotName, "", wrapper)
 	}
 
@@ -97,7 +98,7 @@ func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *op
 
 			lastSnap = snap
 
-			wrapper := StorageProgressReader(op, "fs_progress", snap)
+			wrapper := migration.ProgressReader(op, "fs_progress", snap)
 			if err := s.send(conn, snap, prev, wrapper); err != nil {
 				return err
 			}
@@ -109,7 +110,7 @@ func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *op
 		return err
 	}
 
-	wrapper := StorageProgressReader(op, "fs_progress", s.instance.Name())
+	wrapper := migration.ProgressReader(op, "fs_progress", s.instance.Name())
 	if err := s.send(conn, s.runningSnapName, lastSnap, wrapper); err != nil {
 		return err
 	}
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index a124173704..da4606ce8f 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -2653,7 +2653,7 @@ func (s *storageZfs) MigrationSink(conn *websocket.Conn, op *operations.Operatio
 			return err
 		}
 
-		wrapper := StorageProgressWriter(op, "fs_progress", snap.GetName())
+		wrapper := migration.ProgressWriter(op, "fs_progress", snap.GetName())
 		name := fmt.Sprintf("containers/%s@snapshot-%s", project.Prefix(args.Instance.Project(), args.Instance.Name()), snap.GetName())
 		if err := zfsRecv(name, wrapper); err != nil {
 			return err
@@ -2687,14 +2687,14 @@ func (s *storageZfs) MigrationSink(conn *websocket.Conn, op *operations.Operatio
 	}()
 
 	/* finally, do the real container */
-	wrapper := StorageProgressWriter(op, "fs_progress", args.Instance.Name())
+	wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
 	if err := zfsRecv(zfsName, wrapper); err != nil {
 		return err
 	}
 
 	if args.Live {
 		/* and again for the post-running snapshot if this was a live migration */
-		wrapper := StorageProgressWriter(op, "fs_progress", args.Instance.Name())
+		wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
 		if err := zfsRecv(zfsName, wrapper); err != nil {
 			return err
 		}


More information about the lxc-devel mailing list