[lxc-devel] [lxd/master] Storage migration sink cleanup
tomponline on Github
lxc-bot at linuxcontainers.org
Fri Nov 29 11:14:13 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 394 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20191129/dc1b0239/attachment.bin>
-------------- next part --------------
From d5f578718e650c636f7e3282f19c7eeb4cdccba1 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 29 Nov 2019 11:08:20 +0000
Subject: [PATCH 1/3] lxd/containers/post: Pass state to migration Do function
Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
lxd/containers_post.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index 7bde5d7c67..43bf9ee95c 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -438,7 +438,7 @@ func createFromMigration(d *Daemon, project string, req *api.InstancesPost) resp
}()
// And finally run the migration.
- err = sink.Do(op)
+ err = sink.Do(d.State(), op)
if err != nil {
return fmt.Errorf("Error transferring container data: %s", err)
}
From 6c1cf7822766b9e4712082481b455e8b915a8b10 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 29 Nov 2019 11:08:57 +0000
Subject: [PATCH 2/3] lxd/migrate/container: Restructure of migrationSink.Do()
- This is to accommodate the forthcoming link to the new storage layer.
- Renames the offer and response header variables to align with naming used in custom volume migration.
- Gathers together the legacy offer negotiation code so as to be easily segregated when new storage layer is linked.
- Passes rsyncFeatures from generated response header rather than offer header, to align with custom volume migration.
Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
lxd/migrate_container.go | 199 +++++++++++++++++++--------------------
1 file changed, 95 insertions(+), 104 deletions(-)
diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
index 280a94311f..6ada2233a0 100644
--- a/lxd/migrate_container.go
+++ b/lxd/migrate_container.go
@@ -21,6 +21,7 @@ import (
"github.com/lxc/lxd/lxd/migration"
"github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/lxd/rsync"
+ "github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/api"
@@ -783,13 +784,7 @@ func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
return &sink, nil
}
-func (c *migrationSink) Do(migrateOp *operations.Operation) error {
- if c.src.instance.Type() != instancetype.Container {
- return fmt.Errorf("Instance type must be container")
- }
-
- ct := c.src.instance.(*containerLXC)
-
+func (c *migrationSink) Do(state *state.State, migrateOp *operations.Operation) error {
var err error
if c.push {
@@ -840,22 +835,19 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
controller = c.dest.sendControl
}
- header := migration.MigrationHeader{}
- if err := receiver(&header); err != nil {
+ offerHeader := migration.MigrationHeader{}
+ if err := receiver(&offerHeader); err != nil {
controller(err)
return err
}
- // Handle rsync options
- rsyncFeatures := header.GetRsyncFeaturesSlice()
-
live := c.src.live
if c.push {
live = c.dest.live
}
criuType := migration.CRIUType_CRIU_RSYNC.Enum()
- if header.Criu != nil && *header.Criu == migration.CRIUType_NONE {
+ if offerHeader.Criu != nil && *offerHeader.Criu == migration.CRIUType_NONE {
criuType = migration.CRIUType_NONE.Enum()
} else {
if !live {
@@ -863,64 +855,69 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
}
}
- mySink := ct.Storage().MigrationSink
- if c.refresh {
- mySink = rsyncMigrationSink
- }
+ // The function that will be executed to receive the sender's migration data.
+ var myTarget func(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error
- myType := ct.Storage().MigrationType()
- resp := migration.MigrationHeader{
- Fs: &myType,
- Criu: criuType,
- Snapshots: header.Snapshots,
- SnapshotNames: header.SnapshotNames,
- Refresh: &c.refresh,
- }
+ // The migration header to be sent back to source with our target options.
+ var respHeader migration.MigrationHeader
- // Return those rsync features we know about (with the value sent by the remote)
- if header.RsyncFeatures != nil {
- resp.RsyncFeatures = &migration.RsyncFeatures{}
- if resp.RsyncFeatures.Xattrs != nil {
- resp.RsyncFeatures.Xattrs = header.RsyncFeatures.Xattrs
- }
+ if c.src.instance.Type() == instancetype.Container {
+ ct := c.src.instance.(*containerLXC)
+ myTarget = ct.Storage().MigrationSink
+ myType := ct.Storage().MigrationType()
- if resp.RsyncFeatures.Delete != nil {
- resp.RsyncFeatures.Delete = header.RsyncFeatures.Delete
+ respHeader = migration.MigrationHeader{
+ Fs: &myType,
+ Criu: criuType,
+ Snapshots: offerHeader.Snapshots,
+ SnapshotNames: offerHeader.SnapshotNames,
+ Refresh: &c.refresh,
}
- if resp.RsyncFeatures.Compress != nil {
- resp.RsyncFeatures.Compress = header.RsyncFeatures.Compress
+ // Return those rsync features we know about (with the value sent by the remote).
+ if offerHeader.RsyncFeatures != nil {
+ respHeader.RsyncFeatures = &migration.RsyncFeatures{
+ Xattrs: offerHeader.RsyncFeatures.Xattrs,
+ Delete: offerHeader.RsyncFeatures.Delete,
+ Compress: offerHeader.RsyncFeatures.Compress,
+ Bidirectional: offerHeader.RsyncFeatures.Bidirectional,
+ }
}
- if resp.RsyncFeatures.Bidirectional != nil {
- resp.RsyncFeatures.Bidirectional = header.RsyncFeatures.Bidirectional
+ // Return those ZFS features we know about (with the value sent by the remote).
+ if len(zfsVersion) >= 3 && zfsVersion[0:3] != "0.6" {
+ if offerHeader.ZfsFeatures != nil && offerHeader.ZfsFeatures.Compress != nil {
+ respHeader.ZfsFeatures = &migration.ZfsFeatures{
+ Compress: offerHeader.ZfsFeatures.Compress,
+ }
+ }
}
- }
- // Return those ZFS features we know about (with the value sent by the remote)
- if len(zfsVersion) >= 3 && zfsVersion[0:3] != "0.6" {
- if header.ZfsFeatures != nil && header.ZfsFeatures.Compress != nil {
- resp.ZfsFeatures = &migration.ZfsFeatures{
- Compress: header.ZfsFeatures.Compress,
- }
+ // If refresh mode or the storage type the source has doesn't match what we have,
+ // then we have to use rsync.
+ if c.refresh || *offerHeader.Fs != *respHeader.Fs {
+ myTarget = rsyncMigrationSink
+ myType = migration.MigrationFSType_RSYNC
}
+ } else {
+ return fmt.Errorf("Instance type not supported")
}
if c.refresh {
- // Get our existing snapshots
+ // Get our existing snapshots.
targetSnapshots, err := c.src.instance.Snapshots()
if err != nil {
controller(err)
return err
}
- // Get the remote snapshots
- sourceSnapshots := header.GetSnapshots()
+ // Get the remote snapshots.
+ sourceSnapshots := offerHeader.GetSnapshots()
- // Compare the two sets
+ // Compare the two sets.
syncSnapshots, deleteSnapshots := migrationCompareSnapshots(sourceSnapshots, targetSnapshots)
- // Delete the extra local ones
+ // Delete the extra local ones.
for _, snap := range deleteSnapshots {
err := snap.Delete()
if err != nil {
@@ -934,29 +931,24 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
snapshotNames = append(snapshotNames, snap.GetName())
}
- resp.Snapshots = syncSnapshots
- resp.SnapshotNames = snapshotNames
- header.Snapshots = syncSnapshots
- header.SnapshotNames = snapshotNames
- }
-
- // If the storage type the source has doesn't match what we have, then
- // we have to use rsync.
- if c.refresh || *header.Fs != *resp.Fs {
- mySink = rsyncMigrationSink
- myType = migration.MigrationFSType_RSYNC
- resp.Fs = &myType
+ respHeader.Snapshots = syncSnapshots
+ respHeader.SnapshotNames = snapshotNames
+ offerHeader.Snapshots = syncSnapshots
+ offerHeader.SnapshotNames = snapshotNames
}
- if header.GetPredump() == true {
- // If the other side wants pre-dump and if
- // this side supports it, let's use it.
- resp.Predump = proto.Bool(true)
+ if offerHeader.GetPredump() == true {
+ // If the other side wants pre-dump and if this side supports it, let's use it.
+ respHeader.Predump = proto.Bool(true)
} else {
- resp.Predump = proto.Bool(false)
+ respHeader.Predump = proto.Bool(false)
}
- err = sender(&resp)
+ // Get rsync options from sender, these are passed into myTarget function as part of
+ // MigrationSinkArgs below.
+ rsyncFeatures := respHeader.GetRsyncFeaturesSlice()
+
+ err = sender(&respHeader)
if err != nil {
controller(err)
return err
@@ -967,7 +959,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
imagesDir := ""
srcIdmap := new(idmap.IdmapSet)
- for _, idmapSet := range header.Idmap {
+ for _, idmapSet := range offerHeader.Idmap {
e := idmap.IdmapEntry{
Isuid: *idmapSet.Isuid,
Isgid: *idmapSet.Isgid,
@@ -977,28 +969,24 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
srcIdmap.Idmap = idmap.Extend(srcIdmap.Idmap, e)
}
- /* We do the fs receive in parallel so we don't have to reason
- * about when to receive what. The sending side is smart enough
- * to send the filesystem bits that it can before it seizes the
- * container to start checkpointing, so the total transfer time
- * will be minimized even if we're dumb here.
- */
+ // We do the fs receive in parallel so we don't have to reason about when to receive
+ // what. The sending side is smart enough to send the filesystem bits that it can
+ // before it seizes the container to start checkpointing, so the total transfer time
+ // will be minimized even if we're dumb here.
fsTransfer := make(chan error)
go func() {
snapshots := []*migration.Snapshot{}
- /* Legacy: we only sent the snapshot names, so we just
- * copy the container's config over, same as we used to
- * do.
- */
- if len(header.SnapshotNames) != len(header.Snapshots) {
- for _, name := range header.SnapshotNames {
+ // Legacy: we only sent the snapshot names, so we just copy the container's
+ // config over, same as we used to do.
+ if len(offerHeader.SnapshotNames) != len(offerHeader.Snapshots) {
+ for _, name := range offerHeader.SnapshotNames {
base := snapshotToProtobuf(c.src.instance)
base.Name = &name
snapshots = append(snapshots, base)
}
} else {
- snapshots = header.Snapshots
+ snapshots = offerHeader.Snapshots
}
var fsConn *websocket.Conn
@@ -1027,16 +1015,19 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
Snapshots: snapshots,
}
- err = mySink(fsConn, migrateOp, args)
+ err = myTarget(fsConn, migrateOp, args)
if err != nil {
fsTransfer <- err
return
}
- err = resetContainerDiskIdmap(ct, srcIdmap)
- if err != nil {
- fsTransfer <- err
- return
+ if c.src.instance.Type() == instancetype.Container {
+ ct := c.src.instance.(*containerLXC)
+ err = resetContainerDiskIdmap(ct, srcIdmap)
+ if err != nil {
+ fsTransfer <- err
+ return
+ }
}
fsTransfer <- nil
@@ -1063,10 +1054,10 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
FinalPreDump: proto.Bool(false),
}
- if resp.GetPredump() {
+ if respHeader.GetPredump() {
for !sync.GetFinalPreDump() {
logger.Debugf("About to receive rsync")
- // Transfer a CRIU pre-dump
+ // Transfer a CRIU pre-dump.
err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
if err != nil {
restore <- err
@@ -1075,8 +1066,8 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
logger.Debugf("Done receiving from rsync")
logger.Debugf("About to receive header")
- // Check if this was the last pre-dump
- // Only the FinalPreDump element if of interest
+ // Check if this was the last pre-dump.
+ // Only the FinalPreDump element is of interest.
mtype, data, err := criuConn.ReadMessage()
if err != nil {
restore <- err
@@ -1094,7 +1085,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
}
}
- // Final CRIU dump
+ // Final CRIU dump.
err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
if err != nil {
restore <- err
@@ -1119,15 +1110,16 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
preDumpDir: "",
}
- // Currently we only do a single CRIU pre-dump so we
- // can hardcode "final" here since we know that "final" is the
- // folder for CRIU's final dump.
- err = ct.Migrate(&criuMigrationArgs)
- if err != nil {
- restore <- err
- return
+ // Currently we only do a single CRIU pre-dump so we can hardcode "final"
+ // here since we know that "final" is the folder for CRIU's final dump.
+ if c.src.instance.Type() == instancetype.Container {
+ ct := c.src.instance.(*containerLXC)
+ err = ct.Migrate(&criuMigrationArgs)
+ if err != nil {
+ restore <- err
+ return
+ }
}
-
}
restore <- nil
@@ -1157,12 +1149,11 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
if !*msg.Success {
disconnector()
return fmt.Errorf(*msg.Message)
- } else {
- // The source can only tell us it failed (e.g. if
- // checkpointing failed). We have to tell the source
- // whether or not the restore was successful.
- logger.Debugf("Unknown message %v from source", msg)
}
+
+ // The source can only tell us it failed (e.g. if checkpointing failed).
+ // We have to tell the source whether or not the restore was successful.
+ logger.Debugf("Unknown message %v from source", msg)
}
}
}
From f11d28bb313b842d029508cead4b1bc45b50c0e4 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 29 Nov 2019 11:13:33 +0000
Subject: [PATCH 3/3] lxd/migrate/storage/volumes: Comment restructure
Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
lxd/migrate_storage_volumes.go | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/lxd/migrate_storage_volumes.go b/lxd/migrate_storage_volumes.go
index a2c95321a1..3423ec8eb2 100644
--- a/lxd/migrate_storage_volumes.go
+++ b/lxd/migrate_storage_volumes.go
@@ -427,12 +427,10 @@ func (c *migrationSink) DoStorage(state *state.State, poolName string, req *api.
restore := make(chan error)
go func(c *migrationSink) {
- /* We do the fs receive in parallel so we don't have to reason
- * about when to receive what. The sending side is smart enough
- * to send the filesystem bits that it can before it seizes the
- * container to start checkpointing, so the total transfer time
- * will be minimized even if we're dumb here.
- */
+ // We do the fs receive in parallel so we don't have to reason about when to receive
+ // what. The sending side is smart enough to send the filesystem bits that it can
+ // before it seizes the container to start checkpointing, so the total transfer time
+ // will be minimized even if we're dumb here.
fsTransfer := make(chan error)
go func() {
More information about the lxc-devel
mailing list