[lxc-devel] [lxd/master] migration: attempt to be slightly smart about moving filesystems

tych0 on Github lxc-bot@linuxcontainers.org
Tue Mar 8 18:39:10 UTC 2016


From 390975ac7157ed9c229f63c7a53ba625ee86cc18 Mon Sep 17 00:00:00 2001
From: Tycho Andersen <tycho.andersen@canonical.com>
Date: Thu, 3 Mar 2016 07:48:19 -0700
Subject: [PATCH] migration: attempt to be slightly smart about moving
 filesystems

This breaks live migration up into two parts: first we send a snapshot
of the filesystem (along with all of the container's snapshots) while
the container is still running, then we checkpoint the container and do
a final incremental send. The goal is to minimize container downtime.

Signed-off-by: Tycho Andersen <tycho.andersen@canonical.com>
---
 lxd/migrate.go       | 163 ++++++++++++++++++++---------------------
 lxd/storage.go       |  86 ++++++++++++++--------
 lxd/storage_btrfs.go | 192 ++++++++++++++++++++++++++-----------------------
 lxd/storage_dir.go   |   6 +-
 lxd/storage_lvm.go   |   6 +-
 lxd/storage_zfs.go   | 199 ++++++++++++++++++++++++++++++---------------------
 6 files changed, 368 insertions(+), 284 deletions(-)
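
At a high level: storage backends now return a single
MigrationStorageSourceDriver instead of a flat list of sources, and
migrate.go drives it in two phases. A simplified sketch of the
source-side sequence (paraphrased from the migrate.go changes below;
error handling and protocol headers elided):

    driver, err := container.Storage().MigrationSource(container)
    if err != nil {
            return err
    }
    defer driver.Cleanup()

    // Phase 1: bulk transfer while the container keeps running.
    if err := driver.SendWhileRunning(fsConn); err != nil {
            return err
    }

    if live {
            // The CRIU checkpoint (dump) happens here; the container
            // is now frozen, so no further filesystem changes occur.

            // Phase 2: a small incremental send covering only what
            // changed between phase 1 and the checkpoint.
            if err := driver.SendAfterCheckpoint(fsConn); err != nil {
                    return err
            }
    }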

diff --git a/lxd/migrate.go b/lxd/migrate.go
index e6ec1a1..0150e66 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -299,25 +299,15 @@ func (s *migrationSourceWs) Do(op *operation) error {
 		}
 	}
 
-	sources, fsErr := s.container.Storage().MigrationSource(s.container)
+	driver, fsErr := s.container.Storage().MigrationSource(s.container)
 	/* the protocol says we have to send a header no matter what, so let's
 	 * do that, but then immediately send an error.
 	 */
 	snapshots := []string{}
 	if fsErr == nil {
-		/* A bit of a special case here: doing lxc launch
-		 * host2:c1/snap1 host1:container we're sending a snapshot, but
-		 * it ends up as the container on the other end. So, we want to
-		 * send it as the main container (i.e. ignore its IsSnapshot()).
-		 */
-		if len(sources) > 1 {
-			for _, snap := range sources {
-				if !snap.IsSnapshot() {
-					continue
-				}
-				name := shared.ExtractSnapshotName(snap.Name())
-				snapshots = append(snapshots, name)
-			}
+		fullSnaps := driver.Snapshots()
+		for _, snap := range fullSnaps {
+			snapshots = append(snapshots, shared.ExtractSnapshotName(snap.Name()))
 		}
 	}
 
@@ -348,7 +338,14 @@ func (s *migrationSourceWs) Do(op *operation) error {
 		myType = MigrationFSType_RSYNC
 		header.Fs = &myType
 
-		sources, _ = rsyncMigrationSource(s.container)
+		driver, _ = rsyncMigrationSource(s.container)
+	}
+
+	defer driver.Cleanup()
+
+	if err := driver.SendWhileRunning(s.fsConn); err != nil {
+		s.sendControl(err)
+		return err
 	}
 
 	if s.live {
@@ -402,11 +399,8 @@ func (s *migrationSourceWs) Do(op *operation) error {
 			s.sendControl(err)
 			return err
 		}
-	}
 
-	for _, source := range sources {
-		shared.Debugf("sending fs object %s", source.Name())
-		if err := source.Send(s.fsConn); err != nil {
+		if err := driver.SendAfterCheckpoint(s.fsConn); err != nil {
 			s.sendControl(err)
 			return err
 		}
@@ -536,12 +530,68 @@ func (c *migrationSink) do() error {
 		imagesDir := ""
 		srcIdmap := new(shared.IdmapSet)
 
+		snapshots := []container{}
+		for _, snap := range header.Snapshots {
+			// TODO: we need to propagate snapshot configurations
+			// as well. Right now the container configuration is
+			// done through the initial migration post. Should we
+			// post the snapshots and their configs as well, or do
+			// it some other way?
+			name := c.container.Name() + shared.SnapshotDelimiter + snap
+			args := containerArgs{
+				Ctype:        cTypeSnapshot,
+				Config:       c.container.LocalConfig(),
+				Profiles:     c.container.Profiles(),
+				Ephemeral:    c.container.IsEphemeral(),
+				Architecture: c.container.Architecture(),
+				Devices:      c.container.LocalDevices(),
+				Name:         name,
+			}
+
+			ct, err := containerCreateEmptySnapshot(c.container.Daemon(), args)
+			if err != nil {
+				restore <- err
+				return
+			}
+			snapshots = append(snapshots, ct)
+		}
+
+		for _, idmap := range header.Idmap {
+			e := shared.IdmapEntry{
+				Isuid:    *idmap.Isuid,
+				Isgid:    *idmap.Isgid,
+				Nsid:     int(*idmap.Nsid),
+				Hostid:   int(*idmap.Hostid),
+				Maprange: int(*idmap.Maprange)}
+			srcIdmap.Idmap = shared.Extend(srcIdmap.Idmap, e)
+		}
+
+		/* We do the fs receive in parallel so we don't have to reason
+		 * about when to receive what. The sending side is smart enough
+		 * to send the filesystem bits that it can before it seizes the
+		 * container to start checkpointing, so the total transfer time
+		 * will be minimized even if we're dumb here.
+		 */
+		fsTransfer := make(chan error)
+		go func() {
+			if err := mySink(c.live, c.container, snapshots, c.fsConn); err != nil {
+				fsTransfer <- err
+				return
+			}
+
+			if err := ShiftIfNecessary(c.container, srcIdmap); err != nil {
+				fsTransfer <- err
+				return
+			}
+
+			fsTransfer <- nil
+		}()
+
 		if c.live {
 			var err error
 			imagesDir, err = ioutil.TempDir("", "lxd_restore_")
 			if err != nil {
 				os.RemoveAll(imagesDir)
-				c.sendControl(err)
 				return
 			}
 
@@ -560,8 +610,6 @@ func (c *migrationSink) do() error {
 
 			if err := RsyncRecv(shared.AddSlash(imagesDir), c.criuConn); err != nil {
 				restore <- err
-				os.RemoveAll(imagesDir)
-				c.sendControl(err)
 				return
 			}
 
@@ -574,70 +622,17 @@ func (c *migrationSink) do() error {
 			if !c.container.IsPrivileged() {
 				if err := c.container.IdmapSet().ShiftRootfs(imagesDir); err != nil {
 					restore <- err
-					os.RemoveAll(imagesDir)
-					c.sendControl(err)
 					return
 				}
 			}
 		}
 
-		snapshots := []container{}
-		for _, snap := range header.Snapshots {
-			// TODO: we need to propagate snapshot configurations
-			// as well. Right now the container configuration is
-			// done through the initial migration post. Should we
-			// post the snapshots and their configs as well, or do
-			// it some other way?
-			name := c.container.Name() + shared.SnapshotDelimiter + snap
-			args := containerArgs{
-				Ctype:        cTypeSnapshot,
-				Config:       c.container.LocalConfig(),
-				Profiles:     c.container.Profiles(),
-				Ephemeral:    c.container.IsEphemeral(),
-				Architecture: c.container.Architecture(),
-				Devices:      c.container.LocalDevices(),
-				Name:         name,
-			}
-
-			ct, err := containerCreateEmptySnapshot(c.container.Daemon(), args)
-			if err != nil {
-				restore <- err
-				c.sendControl(err)
-				return
-			}
-			snapshots = append(snapshots, ct)
-		}
-
-		for _, idmap := range header.Idmap {
-			e := shared.IdmapEntry{
-				Isuid:    *idmap.Isuid,
-				Isgid:    *idmap.Isgid,
-				Nsid:     int(*idmap.Nsid),
-				Hostid:   int(*idmap.Hostid),
-				Maprange: int(*idmap.Maprange)}
-			srcIdmap.Idmap = shared.Extend(srcIdmap.Idmap, e)
-		}
-
-		if err := mySink(c.container, snapshots, c.fsConn); err != nil {
-			restore <- err
-			c.sendControl(err)
-			return
-		}
-
-		if err := ShiftIfNecessary(c.container, srcIdmap); err != nil {
+		err := <-fsTransfer
+		if err != nil {
 			restore <- err
-			c.sendControl(err)
 			return
 		}
 
-		for _, snap := range snapshots {
-			if err := ShiftIfNecessary(snap, srcIdmap); err != nil {
-				restore <- err
-				c.sendControl(err)
-				return
-			}
-		}
-
 		if c.live {
 			err := c.container.StartFromMigration(imagesDir)
 			if err != nil {
@@ -648,12 +643,20 @@ func (c *migrationSink) do() error {
 					log = err.Error()
 				}
 				err = fmt.Errorf("restore failed:\n%s", log)
+				restore <- err
+				return
 			}
 
-			restore <- err
-		} else {
-			restore <- nil
 		}
+
+		for _, snap := range snapshots {
+			if err := ShiftIfNecessary(snap, srcIdmap); err != nil {
+				restore <- err
+				return
+			}
+		}
+
+		restore <- nil
 	}(c)
 
 	source := c.controlChannel()
diff --git a/lxd/storage.go b/lxd/storage.go
index 2af685a..5b3863a 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -109,10 +109,26 @@ func storageTypeToString(sType storageType) string {
 	return "dir"
 }
 
-type MigrationStorageSource interface {
-	Name() string
-	IsSnapshot() bool
-	Send(conn *websocket.Conn) error
+type MigrationStorageSourceDriver interface {
+	/* snapshots for this container, if any */
+	Snapshots() []container
+
+	/* send any bits of the container/snapshots that are possible while the
+	 * container is still running.
+	 */
+	SendWhileRunning(conn *websocket.Conn) error
+
+	/* send the final bits (e.g. a final delta snapshot for zfs, btrfs, or
+	 * do a final rsync) of the fs after the container has been
+	 * checkpointed. This will only be called when a container is actually
+	 * being live migrated.
+	 */
+	SendAfterCheckpoint(conn *websocket.Conn) error
+
+	/* Called after either success or failure of a migration, can be used
+	 * to clean up any temporary snapshots, etc.
+	 */
+	Cleanup()
 }
 
 type storage interface {
@@ -170,8 +186,8 @@ type storage interface {
 	// We leave sending containers which are snapshots of other containers
 	// already present on the target instance as an exercise for the
 	// enterprising developer.
-	MigrationSource(container container) ([]MigrationStorageSource, error)
-	MigrationSink(container container, objects []container, conn *websocket.Conn) error
+	MigrationSource(container container) (MigrationStorageSourceDriver, error)
+	MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error
 }
 
 func newStorage(d *Daemon, sType storageType) (storage, error) {
@@ -521,23 +537,24 @@ func (lw *storageLogWrapper) MigrationType() MigrationFSType {
 	return lw.w.MigrationType()
 }
 
-func (lw *storageLogWrapper) MigrationSource(container container) ([]MigrationStorageSource, error) {
+func (lw *storageLogWrapper) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
 	lw.log.Debug("MigrationSource", log.Ctx{"container": container.Name()})
 	return lw.w.MigrationSource(container)
 }
 
-func (lw *storageLogWrapper) MigrationSink(container container, objects []container, conn *websocket.Conn) error {
+func (lw *storageLogWrapper) MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error {
 	objNames := []string{}
 	for _, obj := range objects {
 		objNames = append(objNames, obj.Name())
 	}
 
 	lw.log.Debug("MigrationSink", log.Ctx{
+		"live":      live,
 		"container": container.Name(),
 		"objects":   objNames,
 	})
 
-	return lw.w.MigrationSink(container, objects, conn)
+	return lw.w.MigrationSink(live, container, objects, conn)
 }
 
 func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error {
@@ -567,41 +584,47 @@ func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error {
 	return nil
 }
 
-type rsyncStorageSource struct {
+type rsyncStorageSourceDriver struct {
 	container container
+	snapshots []container
 }
 
-func (s *rsyncStorageSource) Name() string {
-	return s.container.Name()
+func (s rsyncStorageSourceDriver) Snapshots() []container {
+	return s.snapshots
 }
 
-func (s *rsyncStorageSource) IsSnapshot() bool {
-	return s.container.IsSnapshot()
+func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+	toSend := append([]container{s.container}, s.snapshots...)
+
+	for _, send := range toSend {
+		path := send.Path()
+		if err := RsyncSend(shared.AddSlash(path), conn); err != nil {
+			return err
+		}
+	}
+
+	return nil
 }
 
-func (s *rsyncStorageSource) Send(conn *websocket.Conn) error {
-	path := s.container.Path()
-	return RsyncSend(shared.AddSlash(path), conn)
+func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
+	/* resync anything that changed between our first send and the checkpoint */
+	return RsyncSend(shared.AddSlash(s.container.Path()), conn)
 }
 
-func rsyncMigrationSource(container container) ([]MigrationStorageSource, error) {
-	sources := []MigrationStorageSource{}
+func (s rsyncStorageSourceDriver) Cleanup() {
+	/* no-op */
+}
 
-	/* transfer the container, and then all the snapshots */
-	sources = append(sources, &rsyncStorageSource{container})
-	snaps, err := container.Snapshots()
+func rsyncMigrationSource(container container) (MigrationStorageSourceDriver, error) {
+	snapshots, err := container.Snapshots()
 	if err != nil {
 		return nil, err
 	}
 
-	for _, snap := range snaps {
-		sources = append(sources, &rsyncStorageSource{snap})
-	}
-
-	return sources, nil
+	return rsyncStorageSourceDriver{container, snapshots}, nil
 }
 
-func rsyncMigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
+func rsyncMigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
 	/* the first object is the actual container */
 	if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
 		return err
@@ -620,5 +643,12 @@ func rsyncMigrationSink(container container, snapshots []container, conn *websoc
 		}
 	}
 
+	if live {
+		/* now receive the final sync */
+		if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
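
For backend authors, the new contract is easiest to see as a minimal
implementation. A hypothetical no-op driver satisfying
MigrationStorageSourceDriver (illustrative only, not part of this patch):

    type noopSourceDriver struct {
            c     container
            snaps []container
    }

    // Snapshots reports which snapshots the sink should pre-create.
    func (d *noopSourceDriver) Snapshots() []container {
            return d.snaps
    }

    // SendWhileRunning does the bulk transfer; a real backend streams
    // the container and each snapshot here (rsync, btrfs send, zfs send).
    func (d *noopSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
            return nil
    }

    // SendAfterCheckpoint is only called for live migrations, after the
    // CRIU dump; it sends the delta accumulated since SendWhileRunning.
    func (d *noopSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
            return nil
    }

    // Cleanup removes any temporary snapshots created for the transfer;
    // it runs on success and failure alike.
    func (d *noopSourceDriver) Cleanup() {}

One invariant worth keeping in mind: the sink must issue exactly one
receive for every send, in the same order (container, then each
snapshot, then one extra receive when live). That ordering is what the
new live flag threaded through MigrationSink is for.
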
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index e4ca28f..e648e90 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -807,56 +807,38 @@ func (s *storageBtrfs) getSubVolumes(path string) ([]string, error) {
 	return result, nil
 }
 
-type btrfsMigrationSource struct {
-	lxdName            string
-	deleteAfterSending bool
-	btrfsPath          string
-	btrfsParent        string
-
-	btrfs *storageBtrfs
-}
-
-func (s btrfsMigrationSource) Name() string {
-	return s.lxdName
+type btrfsMigrationSourceDriver struct {
+	container          container
+	snapshots          []container
+	btrfsSnapshotNames []string
+	btrfs              *storageBtrfs
+	runningSnapName    string
+	stoppedSnapName    string
 }
 
-func (s btrfsMigrationSource) IsSnapshot() bool {
-	return !s.deleteAfterSending
+func (s *btrfsMigrationSourceDriver) Snapshots() []container {
+	return s.snapshots
 }
 
-func (s btrfsMigrationSource) Send(conn *websocket.Conn) error {
-	args := []string{"send", s.btrfsPath}
-	if s.btrfsParent != "" {
-		args = append(args, "-p", s.btrfsParent)
+func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string, btrfsParent string) error {
+	args := []string{"send", btrfsPath}
+	if btrfsParent != "" {
+		args = append(args, "-p", btrfsParent)
 	}
 
 	cmd := exec.Command("btrfs", args...)
 
-	deleteAfterSending := func(path string) {
-		s.btrfs.subvolsDelete(path)
-		os.Remove(filepath.Dir(path))
-	}
-
 	stdout, err := cmd.StdoutPipe()
 	if err != nil {
-		if s.deleteAfterSending {
-			deleteAfterSending(s.btrfsPath)
-		}
 		return err
 	}
 
 	stderr, err := cmd.StderrPipe()
 	if err != nil {
-		if s.deleteAfterSending {
-			deleteAfterSending(s.btrfsPath)
-		}
 		return err
 	}
 
 	if err := cmd.Start(); err != nil {
-		if s.deleteAfterSending {
-			deleteAfterSending(s.btrfsPath)
-		}
 		return err
 	}
 
@@ -871,97 +853,125 @@ func (s btrfsMigrationSource) Send(conn *websocket.Conn) error {
 	if err != nil {
 		shared.Log.Error("problem with btrfs send", "output", string(output))
 	}
-	if s.deleteAfterSending {
-		deleteAfterSending(s.btrfsPath)
-	}
 	return err
 }
 
-func (s *storageBtrfs) MigrationType() MigrationFSType {
-	if runningInUserns {
-		return MigrationFSType_RSYNC
-	} else {
-		return MigrationFSType_BTRFS
-	}
-}
-
-func (s *storageBtrfs) MigrationSource(c container) ([]MigrationStorageSource, error) {
-	if runningInUserns {
-		return rsyncMigrationSource(c)
-	}
-
-	sources := []MigrationStorageSource{}
-
-	/* If the container is a snapshot, let's just send that; we don't need
-	 * to send anything else, because that's all the user asked for.
-	 */
-	if c.IsSnapshot() {
-		tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", c.Name(), uuid.NewRandom().String()), true)
+func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+	if s.container.IsSnapshot() {
+		tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
 		err := os.MkdirAll(tmpPath, 0700)
 		if err != nil {
-			return nil, err
+			return err
 		}
 
 		btrfsPath := fmt.Sprintf("%s/.root", tmpPath)
-		if err := s.subvolSnapshot(c.Path(), btrfsPath, true); err != nil {
-			return nil, err
+		if err := s.btrfs.subvolSnapshot(s.container.Path(), btrfsPath, true); err != nil {
+			return err
 		}
 
-		sources = append(sources, btrfsMigrationSource{c.Name(), true, btrfsPath, "", s})
-		return sources, nil
-	}
+		defer s.btrfs.subvolDelete(btrfsPath)
 
-	/* List all the snapshots in order of reverse creation. The idea here
-	 * is that we send the oldest to newest snapshot, hopefully saving on
-	 * xfer costs. Then, after all that, we send the container itself.
-	 */
-	snapshots, err := c.Snapshots()
-	if err != nil {
-		return nil, err
+		return s.send(conn, btrfsPath, "")
 	}
 
-	for i, snap := range snapshots {
-		var prev container
+	for i, snap := range s.snapshots {
+		prev := ""
 		if i > 0 {
-			prev = snapshots[i-1]
+			prev = s.snapshots[i-1].Path()
 		}
 
-		btrfsPath := snap.Path()
-		parentName := ""
-		if prev != nil {
-			parentName = prev.Path()
+		if err := s.send(conn, snap.Path(), prev); err != nil {
+			return err
 		}
-
-		sources = append(sources, btrfsMigrationSource{snap.Name(), false, btrfsPath, parentName, s})
 	}
 
 	/* We can't send running fses, so let's snapshot the fs and send
 	 * the snapshot.
 	 */
-	tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", c.Name(), uuid.NewRandom().String()), true)
-	err = os.MkdirAll(tmpPath, 0700)
+	tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
+	err := os.MkdirAll(tmpPath, 0700)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
-	btrfsPath := fmt.Sprintf("%s/.root", tmpPath)
-	if err := s.subvolSnapshot(c.Path(), btrfsPath, true); err != nil {
-		return nil, err
+	s.runningSnapName = fmt.Sprintf("%s/.root", tmpPath)
+	if err := s.btrfs.subvolSnapshot(s.container.Path(), s.runningSnapName, true); err != nil {
+		return err
 	}
 
 	btrfsParent := ""
-	if len(sources) > 0 {
-		btrfsParent = sources[len(sources)-1].(btrfsMigrationSource).btrfsPath
+	if len(s.btrfsSnapshotNames) > 0 {
+		btrfsParent = s.btrfsSnapshotNames[len(s.btrfsSnapshotNames)-1]
+	}
+
+	return s.send(conn, s.runningSnapName, btrfsParent)
+}
+
+func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
+	tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
+	err := os.MkdirAll(tmpPath, 0700)
+	if err != nil {
+		return err
+	}
+
+	s.stoppedSnapName = fmt.Sprintf("%s/.root", tmpPath)
+	if err := s.btrfs.subvolSnapshot(s.container.Path(), s.stoppedSnapName, true); err != nil {
+		return err
 	}
 
-	sources = append(sources, btrfsMigrationSource{c.Name(), true, btrfsPath, btrfsParent, s})
+	return s.send(conn, s.stoppedSnapName, s.runningSnapName)
+}
+
+func (s *btrfsMigrationSourceDriver) Cleanup() {
+	if s.stoppedSnapName != "" {
+		s.btrfs.subvolDelete(s.stoppedSnapName)
+	}
+
+	if s.runningSnapName != "" {
+		s.btrfs.subvolDelete(s.runningSnapName)
+	}
+}
 
-	return sources, nil
+func (s *storageBtrfs) MigrationType() MigrationFSType {
+	if runningInUserns {
+		return MigrationFSType_RSYNC
+	} else {
+		return MigrationFSType_BTRFS
+	}
 }
 
-func (s *storageBtrfs) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
+func (s *storageBtrfs) MigrationSource(c container) (MigrationStorageSourceDriver, error) {
 	if runningInUserns {
-		return rsyncMigrationSink(container, snapshots, conn)
+		return rsyncMigrationSource(c)
+	}
+
+	/* List all the snapshots in order of reverse creation. The idea here
+	 * is that we send the oldest to newest snapshot, hopefully saving on
+	 * xfer costs. Then, after all that, we send the container itself.
+	 */
+	snapshots, err := c.Snapshots()
+	if err != nil {
+		return nil, err
+	}
+
+	driver := &btrfsMigrationSourceDriver{
+		container:          c,
+		snapshots:          snapshots,
+		btrfsSnapshotNames: []string{},
+		btrfs:              s,
+	}
+
+	for _, snap := range snapshots {
+		btrfsPath := snap.Path()
+		driver.btrfsSnapshotNames = append(driver.btrfsSnapshotNames, btrfsPath)
+	}
+
+	return driver, nil
+}
+
+func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
+	if runningInUserns {
+		return rsyncMigrationSink(live, container, snapshots, conn)
 	}
 
 	cName := container.Name()
@@ -1041,6 +1051,12 @@ func (s *storageBtrfs) MigrationSink(container container, snapshots []container,
 		return err
 	}
 
+	if live {
+		if err := btrfsRecv(containerPath(cName, true), container.Path(), false); err != nil {
+			return err
+		}
+	}
+
 	// Cleanup
 	if ok, _ := shared.PathIsEmpty(snapshotsPath); ok {
 		err := os.Remove(snapshotsPath)
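
Concretely, the command sequence the btrfs source driver ends up running
for a live migration of a container "c1" with one snapshot "snap0" looks
roughly like this (hypothetical paths and uuids; stdout/stderr plumbing
elided, each stream goes out over the fs websocket):

    // Snapshot chain: each send is incremental against its parent (-p).
    exec.Command("btrfs", "send", "/var/lib/lxd/snapshots/c1/snap0")
    exec.Command("btrfs", "send",
            "/var/lib/lxd/snapshots/c1/.migration-send-<uuid1>/.root",
            "-p", "/var/lib/lxd/snapshots/c1/snap0")
    // ... CRIU checkpoint ...
    exec.Command("btrfs", "send",
            "/var/lib/lxd/snapshots/c1/.migration-send-<uuid2>/.root",
            "-p", "/var/lib/lxd/snapshots/c1/.migration-send-<uuid1>/.root")
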
diff --git a/lxd/storage_dir.go b/lxd/storage_dir.go
index 98d5320..5126c34 100644
--- a/lxd/storage_dir.go
+++ b/lxd/storage_dir.go
@@ -266,10 +266,10 @@ func (s *storageDir) MigrationType() MigrationFSType {
 	return MigrationFSType_RSYNC
 }
 
-func (s *storageDir) MigrationSource(container container) ([]MigrationStorageSource, error) {
+func (s *storageDir) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
 	return rsyncMigrationSource(container)
 }
 
-func (s *storageDir) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
-	return rsyncMigrationSink(container, snapshots, conn)
+func (s *storageDir) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
+	return rsyncMigrationSink(live, container, snapshots, conn)
 }
diff --git a/lxd/storage_lvm.go b/lxd/storage_lvm.go
index f0fd62e..cb4f066 100644
--- a/lxd/storage_lvm.go
+++ b/lxd/storage_lvm.go
@@ -1056,10 +1056,10 @@ func (s *storageLvm) MigrationType() MigrationFSType {
 	return MigrationFSType_RSYNC
 }
 
-func (s *storageLvm) MigrationSource(container container) ([]MigrationStorageSource, error) {
+func (s *storageLvm) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
 	return rsyncMigrationSource(container)
 }
 
-func (s *storageLvm) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
-	return rsyncMigrationSink(container, snapshots, conn)
+func (s *storageLvm) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
+	return rsyncMigrationSink(live, container, snapshots, conn)
 }
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index ec6cdb1..d06f5cc 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -1157,66 +1157,38 @@ func storageZFSSetPoolNameConfig(d *Daemon, poolname string) error {
 	return nil
 }
 
-type zfsMigrationSource struct {
-	lxdName            string
-	deleteAfterSending bool
-	zfsName            string
-	zfsParent          string
-
-	zfs *storageZfs
-}
-
-func (s zfsMigrationSource) Name() string {
-	return s.lxdName
+type zfsMigrationSourceDriver struct {
+	container        container
+	snapshots        []container
+	zfsSnapshotNames []string
+	zfs              *storageZfs
+	runningSnapName  string
+	stoppedSnapName  string
 }
 
-func (s zfsMigrationSource) IsSnapshot() bool {
-	return !s.deleteAfterSending
+func (s *zfsMigrationSourceDriver) Snapshots() []container {
+	return s.snapshots
 }
 
-func (s zfsMigrationSource) Send(conn *websocket.Conn) error {
-	args := []string{"send", fmt.Sprintf("%s/%s", s.zfs.zfsPool, s.zfsName)}
-	if s.zfsParent != "" {
-		args = append(args, "-i", fmt.Sprintf("%s/%s", s.zfs.zfsPool, s.zfsParent))
+func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zfsParent string) error {
+	args := []string{"send", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, s.container.Name(), zfsName)}
+	if zfsParent != "" {
+		args = append(args, "-i", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, s.container.Name(), zfsParent))
 	}
 
 	cmd := exec.Command("zfs", args...)
 
 	stdout, err := cmd.StdoutPipe()
 	if err != nil {
-		/* If this is not a lxd snapshot, that means it is the root container.
-		 * The way we zfs send a root container is by taking a temporary zfs
-		 * snapshot and sending that, then deleting that snapshot. Here's where
-		 * we delete it.
-		 *
-		 * Note that we can't use a defer here, because zfsDestroy
-		 * takes some time, and defer doesn't block the current
-		 * goroutine. Due to our retry mechanism for network failures
-		 * (and because zfsDestroy takes a while), we might retry
-		 * moving (and thus creating a temporary snapshot) before the
-		 * last one is deleted, resulting in either a snapshot name
-		 * collision if it was fast enough, or an extra snapshot with
-		 * an odd name on the destination side. Instead, we don't use
-		 * defer so we always block until the snapshot is dead.
-		 */
-		if s.deleteAfterSending {
-			s.zfs.zfsDestroy(s.zfsName)
-		}
 		return err
 	}
 
 	stderr, err := cmd.StderrPipe()
 	if err != nil {
-		if s.deleteAfterSending {
-			s.zfs.zfsDestroy(s.zfsName)
-		}
 		return err
 	}
 
 	if err := cmd.Start(); err != nil {
-		if s.deleteAfterSending {
-			s.zfs.zfsDestroy(s.zfsName)
-		}
 		return err
 	}
 
@@ -1231,39 +1203,97 @@ func (s zfsMigrationSource) Send(conn *websocket.Conn) error {
 	if err != nil {
 		shared.Log.Error("problem with zfs send", "output", string(output))
 	}
-	if s.deleteAfterSending {
-		s.zfs.zfsDestroy(s.zfsName)
-	}
+
 	return err
 }
 
+func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+	if s.container.IsSnapshot() {
+		fields := strings.SplitN(s.container.Name(), shared.SnapshotDelimiter, 2)
+		snapshotName := fmt.Sprintf("containers/%s@snapshot-%s", fields[0], fields[1])
+		return s.send(conn, snapshotName, "")
+	}
+
+	lastSnap := ""
+
+	for i, snap := range s.zfsSnapshotNames {
+
+		prev := ""
+		if i > 0 {
+			prev = s.zfsSnapshotNames[i-1]
+		}
+
+		lastSnap = snap
+
+		if err := s.send(conn, snap, prev); err != nil {
+			return err
+		}
+	}
+
+	s.runningSnapName = fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
+	if err := s.zfs.zfsSnapshotCreate(fmt.Sprintf("containers/%s", s.container.Name()), s.runningSnapName); err != nil {
+		return err
+	}
+
+	if err := s.send(conn, s.runningSnapName, lastSnap); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *zfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
+	s.stoppedSnapName = fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
+	if err := s.zfs.zfsSnapshotCreate(fmt.Sprintf("containers/%s", s.container.Name()), s.stoppedSnapName); err != nil {
+		return err
+	}
+
+	if err := s.send(conn, s.stoppedSnapName, s.runningSnapName); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *zfsMigrationSourceDriver) Cleanup() {
+	if s.stoppedSnapName != "" {
+		s.zfs.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", s.container.Name()), s.stoppedSnapName)
+	}
+
+	if s.runningSnapName != "" {
+		s.zfs.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", s.container.Name()), s.runningSnapName)
+	}
+}
+
 func (s *storageZfs) MigrationType() MigrationFSType {
 	return MigrationFSType_ZFS
 }
 
-func (s *storageZfs) MigrationSource(container container) ([]MigrationStorageSource, error) {
-	sources := []MigrationStorageSource{}
-
+func (s *storageZfs) MigrationSource(ct container) (MigrationStorageSourceDriver, error) {
 	/* If the container is a snapshot, let's just send that; we don't need
 	 * to send anything else, because that's all the user asked for.
 	 */
-	if container.IsSnapshot() {
-		fields := strings.SplitN(container.Name(), shared.SnapshotDelimiter, 2)
-		snapshotName := fmt.Sprintf("containers/%s@snapshot-%s", fields[0], fields[1])
-		sources = append(sources, zfsMigrationSource{container.Name(), false, snapshotName, "", s})
-		return sources, nil
+	if ct.IsSnapshot() {
+		return &zfsMigrationSourceDriver{container: ct}, nil
+	}
+
+	driver := zfsMigrationSourceDriver{
+		container:        ct,
+		snapshots:        []container{},
+		zfsSnapshotNames: []string{},
+		zfs:              s,
 	}
 
 	/* List all the snapshots in order of reverse creation. The idea here
 	 * is that we send the oldest to newest snapshot, hopefully saving on
 	 * xfer costs. Then, after all that, we send the container itself.
 	 */
-	snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", container.Name()))
+	snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", ct.Name()))
 	if err != nil {
 		return nil, err
 	}
 
-	for i, snap := range snapshots {
+	for _, snap := range snapshots {
 		/* In the case of e.g. multiple copies running at the same
 		 * time, we will have potentially multiple migration-send
 		 * snapshots. (Or in the case of the test suite, sometimes one
@@ -1273,41 +1303,22 @@ func (s *storageZfs) MigrationSource(container container) ([]MigrationStorageSou
 			continue
 		}
 
-		prev := ""
-		if i > 0 {
-			prev = snapshots[i-1]
-		}
+		lxdName := fmt.Sprintf("%s%s%s", ct.Name(), shared.SnapshotDelimiter, snap[len("snapshot-"):])
+		zfsName := fmt.Sprintf("containers/%s@%s", ct.Name(), snap)
 
-		lxdName := fmt.Sprintf("%s%s%s", container.Name(), shared.SnapshotDelimiter, snap[len("snapshot-"):])
-		zfsName := fmt.Sprintf("containers/%s@%s", container.Name(), snap)
-		parentName := ""
-		if prev != "" {
-			parentName = fmt.Sprintf("containers/%s@%s", container.Name(), prev)
+		snapshot, err := containerLoadByName(s.d, lxdName)
+		if err != nil {
+			return nil, err
 		}
 
-		sources = append(sources, zfsMigrationSource{lxdName, false, zfsName, parentName, s})
+		driver.snapshots = append(driver.snapshots, snapshot)
+		driver.zfsSnapshotNames = append(driver.zfsSnapshotNames, zfsName)
 	}
 
-	/* We can't send running fses, so let's snapshot the fs and send
-	 * the snapshot.
-	 */
-	snapshotName := fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
-	if err := s.zfsSnapshotCreate(fmt.Sprintf("containers/%s", container.Name()), snapshotName); err != nil {
-		return nil, err
-	}
-
-	zfsName := fmt.Sprintf("containers/%s@%s", container.Name(), snapshotName)
-	zfsParent := ""
-	if len(sources) > 0 {
-		zfsParent = sources[len(sources)-1].(zfsMigrationSource).zfsName
-	}
-
-	sources = append(sources, zfsMigrationSource{container.Name(), true, zfsName, zfsParent, s})
-
-	return sources, nil
+	return &driver, nil
 }
 
-func (s *storageZfs) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
+func (s *storageZfs) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
 	zfsRecv := func(zfsName string) error {
 		zfsFsName := fmt.Sprintf("%s/%s", s.zfsPool, zfsName)
 		args := []string{"receive", "-F", "-u", zfsFsName}
@@ -1384,11 +1395,35 @@ func (s *storageZfs) MigrationSink(container container, snapshots []container, c
 		}
 	}
 
+	defer func() {
+		/* clean up our migration-send snapshots that we got from recv. */
+		snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", container.Name()))
+		if err != nil {
+			shared.Log.Error("failed listing snapshots post migration", "err", err)
+			return
+		}
+
+		for _, snap := range snapshots {
+			if !strings.HasPrefix(snap, "migration-send") {
+				continue
+			}
+
+			s.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", container.Name()), snap)
+		}
+	}()
+
 	/* finally, do the real container */
 	if err := zfsRecv(zfsName); err != nil {
 		return err
 	}
 
+	if live {
+		/* and again for the post-running snapshot if this was a live migration */
+		if err := zfsRecv(zfsName); err != nil {
+			return err
+		}
+	}
+
 	/* Sometimes, zfs recv mounts this anyway, even if we pass -u
 	 * (https://forums.freebsd.org/threads/zfs-receive-u-shouldnt-mount-received-filesystem-right.36844/)
 	 * but sometimes it doesn't. Let's try to mount, but not complain about
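
To make the zfs flow concrete as well: for a live migration of "c1" with
one snapshot "snap0" on pool "lxd", the intended source-side sequence
reduces to roughly the following (hypothetical names; -i makes each send
incremental against the previous snapshot):

    exec.Command("zfs", "send", "lxd/containers/c1@snapshot-snap0")
    exec.Command("zfs", "send", "lxd/containers/c1@migration-send-<uuid1>",
            "-i", "lxd/containers/c1@snapshot-snap0")
    // ... CRIU checkpoint ...
    exec.Command("zfs", "send", "lxd/containers/c1@migration-send-<uuid2>",
            "-i", "lxd/containers/c1@migration-send-<uuid1>")

On the receiving side each stream is consumed by "zfs receive -F -u",
and the temporary migration-send-* snapshots are destroyed afterwards by
the deferred cleanup added above.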

