[lxc-devel] [lxd/master] migration: attempt to be slightly smart about moving filesystems
tych0 on Github
lxc-bot at linuxcontainers.org
Tue Mar 8 18:39:10 UTC 2016
From 390975ac7157ed9c229f63c7a53ba625ee86cc18 Mon Sep 17 00:00:00 2001
From: Tycho Andersen <tycho.andersen@canonical.com>
Date: Thu, 3 Mar 2016 07:48:19 -0700
Subject: [PATCH] migration: attempt to be slightly smart about moving
filesystems
This breaks live migration up into two parts: first, while the container is
still running, we send a snapshot of its filesystem (along with all of the
container's snapshots); then we checkpoint the container and do a final
incremental send. The goal is to minimize the container's downtime.
Signed-off-by: Tycho Andersen <tycho.andersen@canonical.com>
---
lxd/migrate.go | 163 ++++++++++++++++++++---------------------
lxd/storage.go | 86 ++++++++++++++--------
lxd/storage_btrfs.go | 192 ++++++++++++++++++++++++++-----------------------
lxd/storage_dir.go | 6 +-
lxd/storage_lvm.go | 6 +-
lxd/storage_zfs.go | 199 ++++++++++++++++++++++++++++++---------------------
6 files changed, 368 insertions(+), 284 deletions(-)
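
The ordering this sets up is easiest to see as a small standalone sketch
(illustration only, not LXD code: sourceDriver, fakeDriver, checkpoint and
migrate below are invented stand-ins for MigrationStorageSourceDriver, the
storage backends, the CRIU dump and migrationSourceWs.Do). The expensive
transfer happens before the container is frozen; only the checkpoint and the
small final delta contribute to downtime.

package main

import "fmt"

// sourceDriver mirrors the shape of the MigrationStorageSourceDriver
// interface introduced in lxd/storage.go below.
type sourceDriver interface {
	SendWhileRunning() error    // bulk of the fs, sent while the container runs
	SendAfterCheckpoint() error // small incremental delta, sent after the dump
	Cleanup()                   // remove temporary snapshots, success or failure
}

// fakeDriver stands in for the btrfs/zfs/rsync drivers.
type fakeDriver struct{ name string }

func (d fakeDriver) SendWhileRunning() error    { fmt.Println("bulk send of", d.name, "and its snapshots"); return nil }
func (d fakeDriver) SendAfterCheckpoint() error { fmt.Println("incremental send of", d.name); return nil }
func (d fakeDriver) Cleanup()                   { fmt.Println("deleting temporary snapshots for", d.name) }

// checkpoint is a stand-in for the CRIU dump step.
func checkpoint(name string) error { fmt.Println("criu dump of", name); return nil }

// migrate shows the ordering the patch establishes on the source side.
func migrate(d sourceDriver, name string, live bool) error {
	defer d.Cleanup()

	// Expensive part: done while the container is still running.
	if err := d.SendWhileRunning(); err != nil {
		return err
	}

	if !live {
		return nil
	}

	// Only the checkpoint and the (small) delta happen while the
	// container is frozen, which is what keeps downtime low.
	if err := checkpoint(name); err != nil {
		return err
	}
	return d.SendAfterCheckpoint()
}

func main() {
	if err := migrate(fakeDriver{name: "c1"}, "c1", true); err != nil {
		fmt.Println("migration failed:", err)
	}
}

The sink side (see the migrationSink changes below) receives the filesystem in
parallel with the CRIU images, so it does not need to know where the running
snapshot ends and the post-checkpoint delta begins.
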
diff --git a/lxd/migrate.go b/lxd/migrate.go
index e6ec1a1..0150e66 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -299,25 +299,15 @@ func (s *migrationSourceWs) Do(op *operation) error {
}
}
- sources, fsErr := s.container.Storage().MigrationSource(s.container)
+ driver, fsErr := s.container.Storage().MigrationSource(s.container)
/* the protocol says we have to send a header no matter what, so let's
* do that, but then immediately send an error.
*/
snapshots := []string{}
if fsErr == nil {
- /* A bit of a special case here: doing lxc launch
- * host2:c1/snap1 host1:container we're sending a snapshot, but
- * it ends up as the container on the other end. So, we want to
- * send it as the main container (i.e. ignore its IsSnapshot()).
- */
- if len(sources) > 1 {
- for _, snap := range sources {
- if !snap.IsSnapshot() {
- continue
- }
- name := shared.ExtractSnapshotName(snap.Name())
- snapshots = append(snapshots, name)
- }
+ fullSnaps := driver.Snapshots()
+ for _, snap := range fullSnaps {
+ snapshots = append(snapshots, shared.ExtractSnapshotName(snap.Name()))
}
}
@@ -348,7 +338,14 @@ func (s *migrationSourceWs) Do(op *operation) error {
myType = MigrationFSType_RSYNC
header.Fs = &myType
- sources, _ = rsyncMigrationSource(s.container)
+ driver, _ = rsyncMigrationSource(s.container)
+ }
+
+ defer driver.Cleanup()
+
+ if err := driver.SendWhileRunning(s.fsConn); err != nil {
+ s.sendControl(err)
+ return err
}
if s.live {
@@ -402,11 +399,8 @@ func (s *migrationSourceWs) Do(op *operation) error {
s.sendControl(err)
return err
}
- }
- for _, source := range sources {
- shared.Debugf("sending fs object %s", source.Name())
- if err := source.Send(s.fsConn); err != nil {
+ if err := driver.SendAfterCheckpoint(s.fsConn); err != nil {
s.sendControl(err)
return err
}
@@ -536,12 +530,68 @@ func (c *migrationSink) do() error {
imagesDir := ""
srcIdmap := new(shared.IdmapSet)
+ snapshots := []container{}
+ for _, snap := range header.Snapshots {
+ // TODO: we need to propagate snapshot configurations
+ // as well. Right now the container configuration is
+ // done through the initial migration post. Should we
+ // post the snapshots and their configs as well, or do
+ // it some other way?
+ name := c.container.Name() + shared.SnapshotDelimiter + snap
+ args := containerArgs{
+ Ctype: cTypeSnapshot,
+ Config: c.container.LocalConfig(),
+ Profiles: c.container.Profiles(),
+ Ephemeral: c.container.IsEphemeral(),
+ Architecture: c.container.Architecture(),
+ Devices: c.container.LocalDevices(),
+ Name: name,
+ }
+
+ ct, err := containerCreateEmptySnapshot(c.container.Daemon(), args)
+ if err != nil {
+ restore <- err
+ return
+ }
+ snapshots = append(snapshots, ct)
+ }
+
+ for _, idmap := range header.Idmap {
+ e := shared.IdmapEntry{
+ Isuid: *idmap.Isuid,
+ Isgid: *idmap.Isgid,
+ Nsid: int(*idmap.Nsid),
+ Hostid: int(*idmap.Hostid),
+ Maprange: int(*idmap.Maprange)}
+ srcIdmap.Idmap = shared.Extend(srcIdmap.Idmap, e)
+ }
+
+ /* We do the fs receive in parallel so we don't have to reason
+ * about when to receive what. The sending side is smart enough
+ * to send the filesystem bits that it can before it seizes the
+ * container to start checkpointing, so the total transfer time
+ * will be minimized even if we're dumb here.
+ */
+ fsTransfer := make(chan error)
+ go func() {
+ if err := mySink(c.live, c.container, snapshots, c.fsConn); err != nil {
+ fsTransfer <- err
+ return
+ }
+
+ if err := ShiftIfNecessary(c.container, srcIdmap); err != nil {
+ fsTransfer <- err
+ return
+ }
+
+ fsTransfer <- nil
+ }()
+
if c.live {
var err error
imagesDir, err = ioutil.TempDir("", "lxd_restore_")
if err != nil {
os.RemoveAll(imagesDir)
- c.sendControl(err)
return
}
@@ -560,8 +610,6 @@ func (c *migrationSink) do() error {
if err := RsyncRecv(shared.AddSlash(imagesDir), c.criuConn); err != nil {
restore <- err
- os.RemoveAll(imagesDir)
- c.sendControl(err)
return
}
@@ -574,70 +622,17 @@ func (c *migrationSink) do() error {
if !c.container.IsPrivileged() {
if err := c.container.IdmapSet().ShiftRootfs(imagesDir); err != nil {
restore <- err
- os.RemoveAll(imagesDir)
- c.sendControl(err)
return
}
}
}
- snapshots := []container{}
- for _, snap := range header.Snapshots {
- // TODO: we need to propagate snapshot configurations
- // as well. Right now the container configuration is
- // done through the initial migration post. Should we
- // post the snapshots and their configs as well, or do
- // it some other way?
- name := c.container.Name() + shared.SnapshotDelimiter + snap
- args := containerArgs{
- Ctype: cTypeSnapshot,
- Config: c.container.LocalConfig(),
- Profiles: c.container.Profiles(),
- Ephemeral: c.container.IsEphemeral(),
- Architecture: c.container.Architecture(),
- Devices: c.container.LocalDevices(),
- Name: name,
- }
-
- ct, err := containerCreateEmptySnapshot(c.container.Daemon(), args)
- if err != nil {
- restore <- err
- c.sendControl(err)
- return
- }
- snapshots = append(snapshots, ct)
- }
-
- for _, idmap := range header.Idmap {
- e := shared.IdmapEntry{
- Isuid: *idmap.Isuid,
- Isgid: *idmap.Isgid,
- Nsid: int(*idmap.Nsid),
- Hostid: int(*idmap.Hostid),
- Maprange: int(*idmap.Maprange)}
- srcIdmap.Idmap = shared.Extend(srcIdmap.Idmap, e)
- }
-
- if err := mySink(c.container, snapshots, c.fsConn); err != nil {
- restore <- err
- c.sendControl(err)
- return
- }
-
- if err := ShiftIfNecessary(c.container, srcIdmap); err != nil {
+ err := <-fsTransfer
+ if err != nil {
restore <- err
- c.sendControl(err)
return
}
- for _, snap := range snapshots {
- if err := ShiftIfNecessary(snap, srcIdmap); err != nil {
- restore <- err
- c.sendControl(err)
- return
- }
- }
-
if c.live {
err := c.container.StartFromMigration(imagesDir)
if err != nil {
@@ -648,12 +643,20 @@ func (c *migrationSink) do() error {
log = err.Error()
}
err = fmt.Errorf("restore failed:\n%s", log)
+ restore <- err
+ return
}
- restore <- err
- } else {
- restore <- nil
}
+
+ for _, snap := range snapshots {
+ if err := ShiftIfNecessary(snap, srcIdmap); err != nil {
+ restore <- err
+ return
+ }
+ }
+
+ restore <- nil
}(c)
source := c.controlChannel()
diff --git a/lxd/storage.go b/lxd/storage.go
index 2af685a..5b3863a 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -109,10 +109,26 @@ func storageTypeToString(sType storageType) string {
return "dir"
}
-type MigrationStorageSource interface {
- Name() string
- IsSnapshot() bool
- Send(conn *websocket.Conn) error
+type MigrationStorageSourceDriver interface {
+ /* snapshots for this container, if any */
+ Snapshots() []container
+
+ /* send any bits of the container/snapshots that are possible while the
+ * container is still running.
+ */
+ SendWhileRunning(conn *websocket.Conn) error
+
+ /* send the final bits (e.g. a final delta snapshot for zfs, btrfs, or
+ * do a final rsync) of the fs after the container has been
+ * checkpointed. This will only be called when a container is actually
+ * being live migrated.
+ */
+ SendAfterCheckpoint(conn *websocket.Conn) error
+
+ /* Called after either success or failure of a migration, can be used
+ * to clean up any temporary snapshots, etc.
+ */
+ Cleanup()
}
type storage interface {
@@ -170,8 +186,8 @@ type storage interface {
// We leave sending containers which are snapshots of other containers
// already present on the target instance as an exercise for the
// enterprising developer.
- MigrationSource(container container) ([]MigrationStorageSource, error)
- MigrationSink(container container, objects []container, conn *websocket.Conn) error
+ MigrationSource(container container) (MigrationStorageSourceDriver, error)
+ MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error
}
func newStorage(d *Daemon, sType storageType) (storage, error) {
@@ -521,23 +537,24 @@ func (lw *storageLogWrapper) MigrationType() MigrationFSType {
return lw.w.MigrationType()
}
-func (lw *storageLogWrapper) MigrationSource(container container) ([]MigrationStorageSource, error) {
+func (lw *storageLogWrapper) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
lw.log.Debug("MigrationSource", log.Ctx{"container": container.Name()})
return lw.w.MigrationSource(container)
}
-func (lw *storageLogWrapper) MigrationSink(container container, objects []container, conn *websocket.Conn) error {
+func (lw *storageLogWrapper) MigrationSink(live bool, container container, objects []container, conn *websocket.Conn) error {
objNames := []string{}
for _, obj := range objects {
objNames = append(objNames, obj.Name())
}
lw.log.Debug("MigrationSink", log.Ctx{
+ "live": live,
"container": container.Name(),
"objects": objNames,
})
- return lw.w.MigrationSink(container, objects, conn)
+ return lw.w.MigrationSink(live, container, objects, conn)
}
func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error {
@@ -567,41 +584,47 @@ func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error {
return nil
}
-type rsyncStorageSource struct {
+type rsyncStorageSourceDriver struct {
container container
+ snapshots []container
}
-func (s *rsyncStorageSource) Name() string {
- return s.container.Name()
+func (s rsyncStorageSourceDriver) Snapshots() []container {
+ return s.snapshots
}
-func (s *rsyncStorageSource) IsSnapshot() bool {
- return s.container.IsSnapshot()
+func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+ toSend := append([]container{s.container}, s.snapshots...)
+
+ for _, send := range toSend {
+ path := send.Path()
+ if err := RsyncSend(shared.AddSlash(path), conn); err != nil {
+ return err
+ }
+ }
+
+ return nil
}
-func (s *rsyncStorageSource) Send(conn *websocket.Conn) error {
- path := s.container.Path()
- return RsyncSend(shared.AddSlash(path), conn)
+func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
+ /* resync anything that changed between our first send and the checkpoint */
+ return RsyncSend(shared.AddSlash(s.container.Path()), conn)
}
-func rsyncMigrationSource(container container) ([]MigrationStorageSource, error) {
- sources := []MigrationStorageSource{}
+func (s rsyncStorageSourceDriver) Cleanup() {
+ /* no-op */
+}
- /* transfer the container, and then all the snapshots */
- sources = append(sources, &rsyncStorageSource{container})
- snaps, err := container.Snapshots()
+func rsyncMigrationSource(container container) (MigrationStorageSourceDriver, error) {
+ snapshots, err := container.Snapshots()
if err != nil {
return nil, err
}
- for _, snap := range snaps {
- sources = append(sources, &rsyncStorageSource{snap})
- }
-
- return sources, nil
+ return rsyncStorageSourceDriver{container, snapshots}, nil
}
-func rsyncMigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
+func rsyncMigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
/* the first object is the actual container */
if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
return err
@@ -620,5 +643,12 @@ func rsyncMigrationSink(container container, snapshots []container, conn *websoc
}
}
+ if live {
+ /* now receive the final sync */
+ if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+ return err
+ }
+ }
+
return nil
}
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index e4ca28f..e648e90 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -807,56 +807,38 @@ func (s *storageBtrfs) getSubVolumes(path string) ([]string, error) {
return result, nil
}
-type btrfsMigrationSource struct {
- lxdName string
- deleteAfterSending bool
- btrfsPath string
- btrfsParent string
-
- btrfs *storageBtrfs
-}
-
-func (s btrfsMigrationSource) Name() string {
- return s.lxdName
+type btrfsMigrationSourceDriver struct {
+ container container
+ snapshots []container
+ btrfsSnapshotNames []string
+ btrfs *storageBtrfs
+ runningSnapName string
+ stoppedSnapName string
}
-func (s btrfsMigrationSource) IsSnapshot() bool {
- return !s.deleteAfterSending
+func (s *btrfsMigrationSourceDriver) Snapshots() []container {
+ return s.snapshots
}
-func (s btrfsMigrationSource) Send(conn *websocket.Conn) error {
- args := []string{"send", s.btrfsPath}
- if s.btrfsParent != "" {
- args = append(args, "-p", s.btrfsParent)
+func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string, btrfsParent string) error {
+ args := []string{"send", btrfsPath}
+ if btrfsParent != "" {
+ args = append(args, "-p", btrfsParent)
}
cmd := exec.Command("btrfs", args...)
- deleteAfterSending := func(path string) {
- s.btrfs.subvolsDelete(path)
- os.Remove(filepath.Dir(path))
- }
-
stdout, err := cmd.StdoutPipe()
if err != nil {
- if s.deleteAfterSending {
- deleteAfterSending(s.btrfsPath)
- }
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
- if s.deleteAfterSending {
- deleteAfterSending(s.btrfsPath)
- }
return err
}
if err := cmd.Start(); err != nil {
- if s.deleteAfterSending {
- deleteAfterSending(s.btrfsPath)
- }
return err
}
@@ -871,97 +853,125 @@ func (s btrfsMigrationSource) Send(conn *websocket.Conn) error {
if err != nil {
shared.Log.Error("problem with btrfs send", "output", string(output))
}
- if s.deleteAfterSending {
- deleteAfterSending(s.btrfsPath)
- }
return err
}
-func (s *storageBtrfs) MigrationType() MigrationFSType {
- if runningInUserns {
- return MigrationFSType_RSYNC
- } else {
- return MigrationFSType_BTRFS
- }
-}
-
-func (s *storageBtrfs) MigrationSource(c container) ([]MigrationStorageSource, error) {
- if runningInUserns {
- return rsyncMigrationSource(c)
- }
-
- sources := []MigrationStorageSource{}
-
- /* If the container is a snapshot, let's just send that; we don't need
- * to send anything else, because that's all the user asked for.
- */
- if c.IsSnapshot() {
- tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", c.Name(), uuid.NewRandom().String()), true)
+func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+ if s.container.IsSnapshot() {
+ tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
err := os.MkdirAll(tmpPath, 0700)
if err != nil {
- return nil, err
+ return err
}
btrfsPath := fmt.Sprintf("%s/.root", tmpPath)
- if err := s.subvolSnapshot(c.Path(), btrfsPath, true); err != nil {
- return nil, err
+ if err := s.btrfs.subvolSnapshot(s.container.Path(), btrfsPath, true); err != nil {
+ return err
}
- sources = append(sources, btrfsMigrationSource{c.Name(), true, btrfsPath, "", s})
- return sources, nil
- }
+ defer s.btrfs.subvolDelete(btrfsPath)
- /* List all the snapshots in order of reverse creation. The idea here
- * is that we send the oldest to newest snapshot, hopefully saving on
- * xfer costs. Then, after all that, we send the container itself.
- */
- snapshots, err := c.Snapshots()
- if err != nil {
- return nil, err
+ return s.send(conn, btrfsPath, "")
}
- for i, snap := range snapshots {
- var prev container
+ for i, snap := range s.snapshots {
+ prev := ""
if i > 0 {
- prev = snapshots[i-1]
+ prev = s.snapshots[i-1].Path()
}
- btrfsPath := snap.Path()
- parentName := ""
- if prev != nil {
- parentName = prev.Path()
+ if err := s.send(conn, snap.Path(), prev); err != nil {
+ return err
}
-
- sources = append(sources, btrfsMigrationSource{snap.Name(), false, btrfsPath, parentName, s})
}
/* We can't send running fses, so let's snapshot the fs and send
* the snapshot.
*/
- tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", c.Name(), uuid.NewRandom().String()), true)
- err = os.MkdirAll(tmpPath, 0700)
+ tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
+ err := os.MkdirAll(tmpPath, 0700)
if err != nil {
- return nil, err
+ return err
}
- btrfsPath := fmt.Sprintf("%s/.root", tmpPath)
- if err := s.subvolSnapshot(c.Path(), btrfsPath, true); err != nil {
- return nil, err
+ s.runningSnapName = fmt.Sprintf("%s/.root", tmpPath)
+ if err := s.btrfs.subvolSnapshot(s.container.Path(), s.runningSnapName, true); err != nil {
+ return err
}
btrfsParent := ""
- if len(sources) > 0 {
- btrfsParent = sources[len(sources)-1].(btrfsMigrationSource).btrfsPath
+ if len(s.btrfsSnapshotNames) > 0 {
+ btrfsParent = s.btrfsSnapshotNames[len(s.btrfsSnapshotNames)-1]
+ }
+
+ return s.send(conn, s.runningSnapName, btrfsParent)
+}
+
+func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
+ tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
+ err := os.MkdirAll(tmpPath, 0700)
+ if err != nil {
+ return err
+ }
+
+ s.stoppedSnapName = fmt.Sprintf("%s/.root", tmpPath)
+ if err := s.btrfs.subvolSnapshot(s.container.Path(), s.stoppedSnapName, true); err != nil {
+ return err
}
- sources = append(sources, btrfsMigrationSource{c.Name(), true, btrfsPath, btrfsParent, s})
+ return s.send(conn, s.stoppedSnapName, s.runningSnapName)
+}
+
+func (s *btrfsMigrationSourceDriver) Cleanup() {
+ if s.stoppedSnapName != "" {
+ s.btrfs.subvolDelete(s.stoppedSnapName)
+ }
+
+ if s.runningSnapName != "" {
+ s.btrfs.subvolDelete(s.runningSnapName)
+ }
+}
- return sources, nil
+func (s *storageBtrfs) MigrationType() MigrationFSType {
+ if runningInUserns {
+ return MigrationFSType_RSYNC
+ } else {
+ return MigrationFSType_BTRFS
+ }
}
-func (s *storageBtrfs) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
+func (s *storageBtrfs) MigrationSource(c container) (MigrationStorageSourceDriver, error) {
if runningInUserns {
- return rsyncMigrationSink(container, snapshots, conn)
+ return rsyncMigrationSource(c)
+ }
+
+ /* List all the snapshots in order of reverse creation. The idea here
+ * is that we send the oldest to newest snapshot, hopefully saving on
+ * xfer costs. Then, after all that, we send the container itself.
+ */
+ snapshots, err := c.Snapshots()
+ if err != nil {
+ return nil, err
+ }
+
+ driver := &btrfsMigrationSourceDriver{
+ container: c,
+ snapshots: snapshots,
+ btrfsSnapshotNames: []string{},
+ btrfs: s,
+ }
+
+ for _, snap := range snapshots {
+ btrfsPath := snap.Path()
+ driver.btrfsSnapshotNames = append(driver.btrfsSnapshotNames, btrfsPath)
+ }
+
+ return driver, nil
+}
+
+func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
+ if runningInUserns {
+ return rsyncMigrationSink(live, container, snapshots, conn)
}
cName := container.Name()
@@ -1041,6 +1051,12 @@ func (s *storageBtrfs) MigrationSink(container container, snapshots []container,
return err
}
+ if live {
+ if err := btrfsRecv(containerPath(cName, true), container.Path(), false); err != nil {
+ return err
+ }
+ }
+
// Cleanup
if ok, _ := shared.PathIsEmpty(snapshotsPath); ok {
err := os.Remove(snapshotsPath)
diff --git a/lxd/storage_dir.go b/lxd/storage_dir.go
index 98d5320..5126c34 100644
--- a/lxd/storage_dir.go
+++ b/lxd/storage_dir.go
@@ -266,10 +266,10 @@ func (s *storageDir) MigrationType() MigrationFSType {
return MigrationFSType_RSYNC
}
-func (s *storageDir) MigrationSource(container container) ([]MigrationStorageSource, error) {
+func (s *storageDir) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
return rsyncMigrationSource(container)
}
-func (s *storageDir) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
- return rsyncMigrationSink(container, snapshots, conn)
+func (s *storageDir) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
+ return rsyncMigrationSink(live, container, snapshots, conn)
}
diff --git a/lxd/storage_lvm.go b/lxd/storage_lvm.go
index f0fd62e..cb4f066 100644
--- a/lxd/storage_lvm.go
+++ b/lxd/storage_lvm.go
@@ -1056,10 +1056,10 @@ func (s *storageLvm) MigrationType() MigrationFSType {
return MigrationFSType_RSYNC
}
-func (s *storageLvm) MigrationSource(container container) ([]MigrationStorageSource, error) {
+func (s *storageLvm) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
return rsyncMigrationSource(container)
}
-func (s *storageLvm) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
- return rsyncMigrationSink(container, snapshots, conn)
+func (s *storageLvm) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
+ return rsyncMigrationSink(live, container, snapshots, conn)
}
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index ec6cdb1..d06f5cc 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -1157,66 +1157,38 @@ func storageZFSSetPoolNameConfig(d *Daemon, poolname string) error {
return nil
}
-type zfsMigrationSource struct {
- lxdName string
- deleteAfterSending bool
- zfsName string
- zfsParent string
-
- zfs *storageZfs
-}
-
-func (s zfsMigrationSource) Name() string {
- return s.lxdName
+type zfsMigrationSourceDriver struct {
+ container container
+ snapshots []container
+ zfsSnapshotNames []string
+ zfs *storageZfs
+ runningSnapName string
+ stoppedSnapName string
}
-func (s zfsMigrationSource) IsSnapshot() bool {
- return !s.deleteAfterSending
+func (s *zfsMigrationSourceDriver) Snapshots() []container {
+ return s.snapshots
}
-func (s zfsMigrationSource) Send(conn *websocket.Conn) error {
- args := []string{"send", fmt.Sprintf("%s/%s", s.zfs.zfsPool, s.zfsName)}
- if s.zfsParent != "" {
- args = append(args, "-i", fmt.Sprintf("%s/%s", s.zfs.zfsPool, s.zfsParent))
+func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zfsParent string) error {
+ args := []string{"send", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, s.container.Name(), zfsName)}
+ if zfsParent != "" {
+ args = append(args, "-i", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, s.container.Name(), zfsParent))
}
cmd := exec.Command("zfs", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
- /* If this is not a lxd snapshot, that means it is the root container.
- * The way we zfs send a root container is by taking a temporary zfs
- * snapshot and sending that, then deleting that snapshot. Here's where
- * we delete it.
- *
- * Note that we can't use a defer here, because zfsDestroy
- * takes some time, and defer doesn't block the current
- * goroutine. Due to our retry mechanism for network failures
- * (and because zfsDestroy takes a while), we might retry
- * moving (and thus creating a temporary snapshot) before the
- * last one is deleted, resulting in either a snapshot name
- * collision if it was fast enough, or an extra snapshot with
- * an odd name on the destination side. Instead, we don't use
- * defer so we always block until the snapshot is dead.
- */
- if s.deleteAfterSending {
- s.zfs.zfsDestroy(s.zfsName)
- }
return err
}
stderr, err := cmd.StderrPipe()
if err != nil {
- if s.deleteAfterSending {
- s.zfs.zfsDestroy(s.zfsName)
- }
return err
}
if err := cmd.Start(); err != nil {
- if s.deleteAfterSending {
- s.zfs.zfsDestroy(s.zfsName)
- }
return err
}
@@ -1231,39 +1203,97 @@ func (s zfsMigrationSource) Send(conn *websocket.Conn) error {
if err != nil {
shared.Log.Error("problem with zfs send", "output", string(output))
}
- if s.deleteAfterSending {
- s.zfs.zfsDestroy(s.zfsName)
- }
+
return err
}
+func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+ if s.container.IsSnapshot() {
+ fields := strings.SplitN(s.container.Name(), shared.SnapshotDelimiter, 2)
+ snapshotName := fmt.Sprintf("containers/%s@snapshot-%s", fields[0], fields[1])
+ return s.send(conn, snapshotName, "")
+ }
+
+ lastSnap := ""
+
+ for i, snap := range s.zfsSnapshotNames {
+
+ prev := ""
+ if i > 0 {
+ prev = s.zfsSnapshotNames[i-1]
+ }
+
+ lastSnap = snap
+
+ if err := s.send(conn, snap, prev); err != nil {
+ return err
+ }
+ }
+
+ s.runningSnapName = fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
+ if err := s.zfs.zfsSnapshotCreate(fmt.Sprintf("containers/%s", s.container.Name()), s.runningSnapName); err != nil {
+ return err
+ }
+
+ if err := s.send(conn, s.runningSnapName, lastSnap); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (s *zfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
+ s.stoppedSnapName = fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
+ if err := s.zfs.zfsSnapshotCreate(fmt.Sprintf("containers/%s", s.container.Name()), s.stoppedSnapName); err != nil {
+ return err
+ }
+
+ if err := s.send(conn, s.stoppedSnapName, s.runningSnapName); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (s *zfsMigrationSourceDriver) Cleanup() {
+ if s.stoppedSnapName != "" {
+ s.zfs.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", s.container.Name()), s.stoppedSnapName)
+ }
+
+ if s.runningSnapName != "" {
+ s.zfs.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", s.container.Name()), s.runningSnapName)
+ }
+}
+
func (s *storageZfs) MigrationType() MigrationFSType {
return MigrationFSType_ZFS
}
-func (s *storageZfs) MigrationSource(container container) ([]MigrationStorageSource, error) {
- sources := []MigrationStorageSource{}
-
+func (s *storageZfs) MigrationSource(ct container) (MigrationStorageSourceDriver, error) {
/* If the container is a snapshot, let's just send that; we don't need
* to send anything else, because that's all the user asked for.
*/
- if container.IsSnapshot() {
- fields := strings.SplitN(container.Name(), shared.SnapshotDelimiter, 2)
- snapshotName := fmt.Sprintf("containers/%s@snapshot-%s", fields[0], fields[1])
- sources = append(sources, zfsMigrationSource{container.Name(), false, snapshotName, "", s})
- return sources, nil
+ if ct.IsSnapshot() {
+ return &zfsMigrationSourceDriver{container: ct}, nil
+ }
+
+ driver := zfsMigrationSourceDriver{
+ container: ct,
+ snapshots: []container{},
+ zfsSnapshotNames: []string{},
+ zfs: s,
}
/* List all the snapshots in order of reverse creation. The idea here
* is that we send the oldest to newest snapshot, hopefully saving on
* xfer costs. Then, after all that, we send the container itself.
*/
- snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", container.Name()))
+ snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", ct.Name()))
if err != nil {
return nil, err
}
- for i, snap := range snapshots {
+ for _, snap := range snapshots {
/* In the case of e.g. multiple copies running at the same
* time, we will have potentially multiple migration-send
* snapshots. (Or in the case of the test suite, sometimes one
@@ -1273,41 +1303,22 @@ func (s *storageZfs) MigrationSource(container container) ([]MigrationStorageSou
continue
}
- prev := ""
- if i > 0 {
- prev = snapshots[i-1]
- }
+ lxdName := fmt.Sprintf("%s%s%s", ct.Name(), shared.SnapshotDelimiter, snap[len("snapshot-"):])
+ zfsName := fmt.Sprintf("containers/%s@%s", ct.Name(), snap)
- lxdName := fmt.Sprintf("%s%s%s", container.Name(), shared.SnapshotDelimiter, snap[len("snapshot-"):])
- zfsName := fmt.Sprintf("containers/%s@%s", container.Name(), snap)
- parentName := ""
- if prev != "" {
- parentName = fmt.Sprintf("containers/%s@%s", container.Name(), prev)
+ snapshot, err := containerLoadByName(s.d, lxdName)
+ if err != nil {
+ return nil, err
}
- sources = append(sources, zfsMigrationSource{lxdName, false, zfsName, parentName, s})
+ driver.snapshots = append(driver.snapshots, snapshot)
+ driver.zfsSnapshotNames = append(driver.zfsSnapshotNames, zfsName)
}
- /* We can't send running fses, so let's snapshot the fs and send
- * the snapshot.
- */
- snapshotName := fmt.Sprintf("migration-send-%s", uuid.NewRandom().String())
- if err := s.zfsSnapshotCreate(fmt.Sprintf("containers/%s", container.Name()), snapshotName); err != nil {
- return nil, err
- }
-
- zfsName := fmt.Sprintf("containers/%s@%s", container.Name(), snapshotName)
- zfsParent := ""
- if len(sources) > 0 {
- zfsParent = sources[len(sources)-1].(zfsMigrationSource).zfsName
- }
-
- sources = append(sources, zfsMigrationSource{container.Name(), true, zfsName, zfsParent, s})
-
- return sources, nil
+ return &driver, nil
}
-func (s *storageZfs) MigrationSink(container container, snapshots []container, conn *websocket.Conn) error {
+func (s *storageZfs) MigrationSink(live bool, container container, snapshots []container, conn *websocket.Conn) error {
zfsRecv := func(zfsName string) error {
zfsFsName := fmt.Sprintf("%s/%s", s.zfsPool, zfsName)
args := []string{"receive", "-F", "-u", zfsFsName}
@@ -1384,11 +1395,35 @@ func (s *storageZfs) MigrationSink(container container, snapshots []container, c
}
}
+ defer func() {
+ /* clean up our migration-send snapshots that we got from recv. */
+ snapshots, err := s.zfsListSnapshots(fmt.Sprintf("containers/%s", container.Name()))
+ if err != nil {
+ shared.Log.Error("failed listing snapshots post migration", "err", err)
+ return
+ }
+
+ for _, snap := range snapshots {
+ if !strings.HasPrefix(snap, "migration-send") {
+ continue
+ }
+
+ s.zfsSnapshotDestroy(fmt.Sprintf("containers/%s", container.Name()), snap)
+ }
+ }()
+
/* finally, do the real container */
if err := zfsRecv(zfsName); err != nil {
return err
}
+ if live {
+ /* and again for the post-running snapshot if this was a live migration */
+ if err := zfsRecv(zfsName); err != nil {
+ return err
+ }
+ }
+
/* Sometimes, zfs recv mounts this anyway, even if we pass -u
* (https://forums.freebsd.org/threads/zfs-receive-u-shouldnt-mount-received-filesystem-right.36844/)
* but sometimes it doesn't. Let's try to mount, but not complain about