[lxc-devel] [lxd/master] Storage migration sink cleanup

tomponline on Github lxc-bot at linuxcontainers.org
Fri Nov 29 11:14:13 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 394 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20191129/dc1b0239/attachment.bin>
-------------- next part --------------
From d5f578718e650c636f7e3282f19c7eeb4cdccba1 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 29 Nov 2019 11:08:20 +0000
Subject: [PATCH 1/3] lxd/containers/post: Pass state to migration Do function

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/containers_post.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index 7bde5d7c67..43bf9ee95c 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -438,7 +438,7 @@ func createFromMigration(d *Daemon, project string, req *api.InstancesPost) resp
 		}()
 
 		// And finally run the migration.
-		err = sink.Do(op)
+		err = sink.Do(d.State(), op)
 		if err != nil {
 			return fmt.Errorf("Error transferring container data: %s", err)
 		}

From 6c1cf7822766b9e4712082481b455e8b915a8b10 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 29 Nov 2019 11:08:57 +0000
Subject: [PATCH 2/3] lxd/migrate/container: Restructure of migrationSink.Do()

- This is to accommodate the forthcoming link to the new storage layer.
- Renames the offer and response header variables to align with naming used in custom volume migration.
- Gathers together the legacy offer negotiation code so that it can be easily segregated when the new storage layer is linked.
- Passes rsyncFeatures from generated response header rather than offer header, to align with custom volume migration.

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/migrate_container.go | 199 +++++++++++++++++++--------------------
 1 file changed, 95 insertions(+), 104 deletions(-)

diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
index 280a94311f..6ada2233a0 100644
--- a/lxd/migrate_container.go
+++ b/lxd/migrate_container.go
@@ -21,6 +21,7 @@ import (
 	"github.com/lxc/lxd/lxd/migration"
 	"github.com/lxc/lxd/lxd/operations"
 	"github.com/lxc/lxd/lxd/rsync"
+	"github.com/lxc/lxd/lxd/state"
 	"github.com/lxc/lxd/lxd/util"
 	"github.com/lxc/lxd/shared"
 	"github.com/lxc/lxd/shared/api"
@@ -783,13 +784,7 @@ func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
 	return &sink, nil
 }
 
-func (c *migrationSink) Do(migrateOp *operations.Operation) error {
-	if c.src.instance.Type() != instancetype.Container {
-		return fmt.Errorf("Instance type must be container")
-	}
-
-	ct := c.src.instance.(*containerLXC)
-
+func (c *migrationSink) Do(state *state.State, migrateOp *operations.Operation) error {
 	var err error
 
 	if c.push {
@@ -840,22 +835,19 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 		controller = c.dest.sendControl
 	}
 
-	header := migration.MigrationHeader{}
-	if err := receiver(&header); err != nil {
+	offerHeader := migration.MigrationHeader{}
+	if err := receiver(&offerHeader); err != nil {
 		controller(err)
 		return err
 	}
 
-	// Handle rsync options
-	rsyncFeatures := header.GetRsyncFeaturesSlice()
-
 	live := c.src.live
 	if c.push {
 		live = c.dest.live
 	}
 
 	criuType := migration.CRIUType_CRIU_RSYNC.Enum()
-	if header.Criu != nil && *header.Criu == migration.CRIUType_NONE {
+	if offerHeader.Criu != nil && *offerHeader.Criu == migration.CRIUType_NONE {
 		criuType = migration.CRIUType_NONE.Enum()
 	} else {
 		if !live {
@@ -863,64 +855,69 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 		}
 	}
 
-	mySink := ct.Storage().MigrationSink
-	if c.refresh {
-		mySink = rsyncMigrationSink
-	}
+	// The function that will be executed to receive the sender's migration data.
+	var myTarget func(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error
 
-	myType := ct.Storage().MigrationType()
-	resp := migration.MigrationHeader{
-		Fs:            &myType,
-		Criu:          criuType,
-		Snapshots:     header.Snapshots,
-		SnapshotNames: header.SnapshotNames,
-		Refresh:       &c.refresh,
-	}
+	// The migration header to be sent back to source with our target options.
+	var respHeader migration.MigrationHeader
 
-	// Return those rsync features we know about (with the value sent by the remote)
-	if header.RsyncFeatures != nil {
-		resp.RsyncFeatures = &migration.RsyncFeatures{}
-		if resp.RsyncFeatures.Xattrs != nil {
-			resp.RsyncFeatures.Xattrs = header.RsyncFeatures.Xattrs
-		}
+	if c.src.instance.Type() == instancetype.Container {
+		ct := c.src.instance.(*containerLXC)
+		myTarget = ct.Storage().MigrationSink
+		myType := ct.Storage().MigrationType()
 
-		if resp.RsyncFeatures.Delete != nil {
-			resp.RsyncFeatures.Delete = header.RsyncFeatures.Delete
+		respHeader = migration.MigrationHeader{
+			Fs:            &myType,
+			Criu:          criuType,
+			Snapshots:     offerHeader.Snapshots,
+			SnapshotNames: offerHeader.SnapshotNames,
+			Refresh:       &c.refresh,
 		}
 
-		if resp.RsyncFeatures.Compress != nil {
-			resp.RsyncFeatures.Compress = header.RsyncFeatures.Compress
+		// Return those rsync features we know about (with the value sent by the remote).
+		if offerHeader.RsyncFeatures != nil {
+			respHeader.RsyncFeatures = &migration.RsyncFeatures{
+				Xattrs:        offerHeader.RsyncFeatures.Xattrs,
+				Delete:        offerHeader.RsyncFeatures.Delete,
+				Compress:      offerHeader.RsyncFeatures.Compress,
+				Bidirectional: offerHeader.RsyncFeatures.Bidirectional,
+			}
 		}
 
-		if resp.RsyncFeatures.Bidirectional != nil {
-			resp.RsyncFeatures.Bidirectional = header.RsyncFeatures.Bidirectional
+		// Return those ZFS features we know about (with the value sent by the remote).
+		if len(zfsVersion) >= 3 && zfsVersion[0:3] != "0.6" {
+			if offerHeader.ZfsFeatures != nil && offerHeader.ZfsFeatures.Compress != nil {
+				respHeader.ZfsFeatures = &migration.ZfsFeatures{
+					Compress: offerHeader.ZfsFeatures.Compress,
+				}
+			}
 		}
-	}
 
-	// Return those ZFS features we know about (with the value sent by the remote)
-	if len(zfsVersion) >= 3 && zfsVersion[0:3] != "0.6" {
-		if header.ZfsFeatures != nil && header.ZfsFeatures.Compress != nil {
-			resp.ZfsFeatures = &migration.ZfsFeatures{
-				Compress: header.ZfsFeatures.Compress,
-			}
+		// If refresh mode or the storage type the source has doesn't match what we have,
+		// then we have to use rsync.
+		if c.refresh || *offerHeader.Fs != *respHeader.Fs {
+			myTarget = rsyncMigrationSink
+			myType = migration.MigrationFSType_RSYNC
 		}
+	} else {
+		return fmt.Errorf("Instance type not supported")
 	}
 
 	if c.refresh {
-		// Get our existing snapshots
+		// Get our existing snapshots.
 		targetSnapshots, err := c.src.instance.Snapshots()
 		if err != nil {
 			controller(err)
 			return err
 		}
 
-		// Get the remote snapshots
-		sourceSnapshots := header.GetSnapshots()
+		// Get the remote snapshots.
+		sourceSnapshots := offerHeader.GetSnapshots()
 
-		// Compare the two sets
+		// Compare the two sets.
 		syncSnapshots, deleteSnapshots := migrationCompareSnapshots(sourceSnapshots, targetSnapshots)
 
-		// Delete the extra local ones
+		// Delete the extra local ones.
 		for _, snap := range deleteSnapshots {
 			err := snap.Delete()
 			if err != nil {
@@ -934,29 +931,24 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 			snapshotNames = append(snapshotNames, snap.GetName())
 		}
 
-		resp.Snapshots = syncSnapshots
-		resp.SnapshotNames = snapshotNames
-		header.Snapshots = syncSnapshots
-		header.SnapshotNames = snapshotNames
-	}
-
-	// If the storage type the source has doesn't match what we have, then
-	// we have to use rsync.
-	if c.refresh || *header.Fs != *resp.Fs {
-		mySink = rsyncMigrationSink
-		myType = migration.MigrationFSType_RSYNC
-		resp.Fs = &myType
+		respHeader.Snapshots = syncSnapshots
+		respHeader.SnapshotNames = snapshotNames
+		offerHeader.Snapshots = syncSnapshots
+		offerHeader.SnapshotNames = snapshotNames
 	}
 
-	if header.GetPredump() == true {
-		// If the other side wants pre-dump and if
-		// this side supports it, let's use it.
-		resp.Predump = proto.Bool(true)
+	if offerHeader.GetPredump() == true {
+		// If the other side wants pre-dump and if this side supports it, let's use it.
+		respHeader.Predump = proto.Bool(true)
 	} else {
-		resp.Predump = proto.Bool(false)
+		respHeader.Predump = proto.Bool(false)
 	}
 
-	err = sender(&resp)
+	// Get rsync options from sender; these are passed into the myTarget function as part of
+	// MigrationSinkArgs below.
+	rsyncFeatures := respHeader.GetRsyncFeaturesSlice()
+
+	err = sender(&respHeader)
 	if err != nil {
 		controller(err)
 		return err
@@ -967,7 +959,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 		imagesDir := ""
 		srcIdmap := new(idmap.IdmapSet)
 
-		for _, idmapSet := range header.Idmap {
+		for _, idmapSet := range offerHeader.Idmap {
 			e := idmap.IdmapEntry{
 				Isuid:    *idmapSet.Isuid,
 				Isgid:    *idmapSet.Isgid,
@@ -977,28 +969,24 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 			srcIdmap.Idmap = idmap.Extend(srcIdmap.Idmap, e)
 		}
 
-		/* We do the fs receive in parallel so we don't have to reason
-		 * about when to receive what. The sending side is smart enough
-		 * to send the filesystem bits that it can before it seizes the
-		 * container to start checkpointing, so the total transfer time
-		 * will be minimized even if we're dumb here.
-		 */
+		// We do the fs receive in parallel so we don't have to reason about when to receive
+		// what. The sending side is smart enough to send the filesystem bits that it can
+		// before it seizes the container to start checkpointing, so the total transfer time
+		// will be minimized even if we're dumb here.
 		fsTransfer := make(chan error)
 		go func() {
 			snapshots := []*migration.Snapshot{}
 
-			/* Legacy: we only sent the snapshot names, so we just
-			 * copy the container's config over, same as we used to
-			 * do.
-			 */
-			if len(header.SnapshotNames) != len(header.Snapshots) {
-				for _, name := range header.SnapshotNames {
+			// Legacy: we only sent the snapshot names, so we just copy the container's
+			// config over, same as we used to do.
+			if len(offerHeader.SnapshotNames) != len(offerHeader.Snapshots) {
+				for _, name := range offerHeader.SnapshotNames {
 					base := snapshotToProtobuf(c.src.instance)
 					base.Name = &name
 					snapshots = append(snapshots, base)
 				}
 			} else {
-				snapshots = header.Snapshots
+				snapshots = offerHeader.Snapshots
 			}
 
 			var fsConn *websocket.Conn
@@ -1027,16 +1015,19 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 				Snapshots:     snapshots,
 			}
 
-			err = mySink(fsConn, migrateOp, args)
+			err = myTarget(fsConn, migrateOp, args)
 			if err != nil {
 				fsTransfer <- err
 				return
 			}
 
-			err = resetContainerDiskIdmap(ct, srcIdmap)
-			if err != nil {
-				fsTransfer <- err
-				return
+			if c.src.instance.Type() == instancetype.Container {
+				ct := c.src.instance.(*containerLXC)
+				err = resetContainerDiskIdmap(ct, srcIdmap)
+				if err != nil {
+					fsTransfer <- err
+					return
+				}
 			}
 
 			fsTransfer <- nil
@@ -1063,10 +1054,10 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 				FinalPreDump: proto.Bool(false),
 			}
 
-			if resp.GetPredump() {
+			if respHeader.GetPredump() {
 				for !sync.GetFinalPreDump() {
 					logger.Debugf("About to receive rsync")
-					// Transfer a CRIU pre-dump
+					// Transfer a CRIU pre-dump.
 					err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
 					if err != nil {
 						restore <- err
@@ -1075,8 +1066,8 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 					logger.Debugf("Done receiving from rsync")
 
 					logger.Debugf("About to receive header")
-					// Check if this was the last pre-dump
-					// Only the FinalPreDump element if of interest
+					// Check if this was the last pre-dump.
+					// Only the FinalPreDump element is of interest.
 					mtype, data, err := criuConn.ReadMessage()
 					if err != nil {
 						restore <- err
@@ -1094,7 +1085,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 				}
 			}
 
-			// Final CRIU dump
+			// Final CRIU dump.
 			err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
 			if err != nil {
 				restore <- err
@@ -1119,15 +1110,16 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 				preDumpDir:   "",
 			}
 
-			// Currently we only do a single CRIU pre-dump so we
-			// can hardcode "final" here since we know that "final" is the
-			// folder for CRIU's final dump.
-			err = ct.Migrate(&criuMigrationArgs)
-			if err != nil {
-				restore <- err
-				return
+			// Currently we only do a single CRIU pre-dump so we can hardcode "final"
+			// here since we know that "final" is the folder for CRIU's final dump.
+			if c.src.instance.Type() == instancetype.Container {
+				ct := c.src.instance.(*containerLXC)
+				err = ct.Migrate(&criuMigrationArgs)
+				if err != nil {
+					restore <- err
+					return
+				}
 			}
-
 		}
 
 		restore <- nil
@@ -1157,12 +1149,11 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 			if !*msg.Success {
 				disconnector()
 				return fmt.Errorf(*msg.Message)
-			} else {
-				// The source can only tell us it failed (e.g. if
-				// checkpointing failed). We have to tell the source
-				// whether or not the restore was successful.
-				logger.Debugf("Unknown message %v from source", msg)
 			}
+
+			// The source can only tell us it failed (e.g. if checkpointing failed).
+			// We have to tell the source whether or not the restore was successful.
+			logger.Debugf("Unknown message %v from source", msg)
 		}
 	}
 }

From f11d28bb313b842d029508cead4b1bc45b50c0e4 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 29 Nov 2019 11:13:33 +0000
Subject: [PATCH 3/3] lxd/migrate/storage/volumes: Comment restructure

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/migrate_storage_volumes.go | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/lxd/migrate_storage_volumes.go b/lxd/migrate_storage_volumes.go
index a2c95321a1..3423ec8eb2 100644
--- a/lxd/migrate_storage_volumes.go
+++ b/lxd/migrate_storage_volumes.go
@@ -427,12 +427,10 @@ func (c *migrationSink) DoStorage(state *state.State, poolName string, req *api.
 	restore := make(chan error)
 
 	go func(c *migrationSink) {
-		/* We do the fs receive in parallel so we don't have to reason
-		 * about when to receive what. The sending side is smart enough
-		 * to send the filesystem bits that it can before it seizes the
-		 * container to start checkpointing, so the total transfer time
-		 * will be minimized even if we're dumb here.
-		 */
+		// We do the fs receive in parallel so we don't have to reason about when to receive
+		// what. The sending side is smart enough to send the filesystem bits that it can
+		// before it seizes the container to start checkpointing, so the total transfer time
+		// will be minimized even if we're dumb here.
 		fsTransfer := make(chan error)
 
 		go func() {


More information about the lxc-devel mailing list