[lxc-devel] [lxd/master] push support

brauner on Github lxc-bot at linuxcontainers.org
Fri Sep 30 10:33:53 UTC 2016


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 2607 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20160930/42acad54/attachment.bin>
-------------- next part --------------
From d01245ffc5a15c610627789ac0be9c3de426b606 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Thu, 29 Sep 2016 17:21:48 +0200
Subject: [PATCH 01/14] lxd/containers_post: prepare for push support

Various parts of the code will move into the migration part.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/containers_post.go | 143 +++++++++++++++++++++----------------------------
 1 file changed, 60 insertions(+), 83 deletions(-)

diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index bc27740..10e2aee 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -210,106 +210,83 @@ func createFromMigration(d *Daemon, req *containerPostReq) Response {
 		architecture = 0
 	}
 
-	run := func(op *operation) error {
-		args := containerArgs{
-			Architecture: architecture,
-			BaseImage:    req.Source.BaseImage,
-			Config:       req.Config,
-			Ctype:        cTypeRegular,
-			Devices:      req.Devices,
-			Ephemeral:    req.Ephemeral,
-			Name:         req.Name,
-			Profiles:     req.Profiles,
-		}
-
-		var c container
-		_, _, err := dbImageGet(d.db, req.Source.BaseImage, false, true)
-
-		/* Only create a container from an image if we're going to
-		 * rsync over the top of it. In the case of a better file
-		 * transfer mechanism, let's just use that.
-		 *
-		 * TODO: we could invent some negotiation here, where if the
-		 * source and sink both have the same image, we can clone from
-		 * it, but we have to know before sending the snapshot that
-		 * we're sending the whole thing or just a delta from the
-		 * image, so one extra negotiation round trip is needed. An
-		 * alternative is to move actual container object to a later
-		 * point and just negotiate it over the migration control
-		 * socket. Anyway, it'll happen later :)
-		 */
-		if err == nil && d.Storage.MigrationType() == MigrationFSType_RSYNC {
-			c, err = containerCreateFromImage(d, args, req.Source.BaseImage)
-			if err != nil {
-				return err
-			}
-		} else {
-			c, err = containerCreateAsEmpty(d, args)
-			if err != nil {
-				return err
-			}
-		}
-
-		var cert *x509.Certificate
-		if req.Source.Certificate != "" {
-			certBlock, _ := pem.Decode([]byte(req.Source.Certificate))
-			if certBlock == nil {
-				return fmt.Errorf("Invalid certificate")
-			}
-
-			cert, err = x509.ParseCertificate(certBlock.Bytes)
-			if err != nil {
-				return err
-			}
-		}
+	args := containerArgs{
+		Architecture: architecture,
+		BaseImage:    req.Source.BaseImage,
+		Config:       req.Config,
+		Ctype:        cTypeRegular,
+		Devices:      req.Devices,
+		Ephemeral:    req.Ephemeral,
+		Name:         req.Name,
+		Profiles:     req.Profiles,
+	}
 
-		config, err := shared.GetTLSConfig("", "", "", cert)
+	var c container
+	_, _, err = dbImageGet(d.db, req.Source.BaseImage, false, true)
+
+	/* Only create a container from an image if we're going to
+	 * rsync over the top of it. In the case of a better file
+	 * transfer mechanism, let's just use that.
+	 *
+	 * TODO: we could invent some negotiation here, where if the
+	 * source and sink both have the same image, we can clone from
+	 * it, but we have to know before sending the snapshot that
+	 * we're sending the whole thing or just a delta from the
+	 * image, so one extra negotiation round trip is needed. An
+	 * alternative is to move actual container object to a later
+	 * point and just negotiate it over the migration control
+	 * socket. Anyway, it'll happen later :)
+	 */
+	if err == nil && d.Storage.MigrationType() == MigrationFSType_RSYNC {
+		c, err = containerCreateFromImage(d, args, req.Source.BaseImage)
 		if err != nil {
-			c.Delete()
-			return err
-		}
-
-		migrationArgs := MigrationSinkArgs{
-			Url: req.Source.Operation,
-			Dialer: websocket.Dialer{
-				TLSClientConfig: config,
-				NetDial:         shared.RFC3493Dialer},
-			Container: c,
-			Secrets:   req.Source.Websockets,
+			return InternalError(err)
 		}
-
-		sink, err := NewMigrationSink(&migrationArgs)
+	} else {
+		c, err = containerCreateAsEmpty(d, args)
 		if err != nil {
-			c.Delete()
-			return err
+			return InternalError(err)
 		}
+	}
 
-		// Start the storage for this container (LVM mount/umount)
-		c.StorageStart()
+	var cert *x509.Certificate
+	if req.Source.Certificate != "" {
+		certBlock, _ := pem.Decode([]byte(req.Source.Certificate))
+		if certBlock == nil {
+			return InternalError(fmt.Errorf("Invalid certificate"))
+		}
 
-		// And finaly run the migration.
-		err = sink()
+		cert, err = x509.ParseCertificate(certBlock.Bytes)
 		if err != nil {
-			c.StorageStop()
-			shared.LogError("Error during migration sink", log.Ctx{"err": err})
-			c.Delete()
-			return fmt.Errorf("Error transferring container data: %s", err)
+			return InternalError(err)
 		}
+	}
 
-		defer c.StorageStop()
+	config, err := shared.GetTLSConfig("", "", "", cert)
+	if err != nil {
+		c.Delete()
+		return InternalError(err)
+	}
 
-		err = c.TemplateApply("copy")
-		if err != nil {
-			return err
-		}
+	migrationArgs := MigrationSinkArgs{
+		Url: req.Source.Operation,
+		Dialer: websocket.Dialer{
+			TLSClientConfig: config,
+			NetDial:         shared.RFC3493Dialer},
+		Container: c,
+		Secrets:   req.Source.Websockets,
+	}
 
-		return nil
+	sink, err := NewMigrationSink(&migrationArgs)
+	if err != nil {
+		c.Delete()
+		return InternalError(err)
 	}
 
 	resources := map[string][]string{}
 	resources["containers"] = []string{req.Name}
 
-	op, err := operationCreate(operationClassTask, resources, nil, run, nil, nil)
+	op, err := operationCreate(operationClassTask, resources, nil, sink.Do, nil, nil)
 	if err != nil {
 		return InternalError(err)
 	}

From 4fa0ebd25c10fed51598be8d44065aeb2cae200e Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Thu, 29 Sep 2016 17:29:22 +0200
Subject: [PATCH 02/14] lxd/migrate: prepare push mode

- The code pieces that have been removed from
  lxd/containers_post.go:containerCreateFromMigration() move here.
- Export Do() method for NewMigrationSink().

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/migrate.go | 27 ++++++++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/lxd/migrate.go b/lxd/migrate.go
index b9337fa..1c32d4a 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -525,7 +525,7 @@ type MigrationSinkArgs struct {
 	Secrets   map[string]string
 }
 
-func NewMigrationSink(args *MigrationSinkArgs) (func() error, error) {
+func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
 	sink := migrationSink{
 		migrationFields{container: args.Container},
 		args.Url,
@@ -550,7 +550,7 @@ func NewMigrationSink(args *MigrationSinkArgs) (func() error, error) {
 		return nil, err
 	}
 
-	return sink.do, nil
+	return &sink, nil
 }
 
 func (c *migrationSink) connectWithSecret(secret string) (*websocket.Conn, error) {
@@ -562,16 +562,22 @@ func (c *migrationSink) connectWithSecret(secret string) (*websocket.Conn, error
 	return lxd.WebsocketDial(c.dialer, wsUrl)
 }
 
-func (c *migrationSink) do() error {
+func (c *migrationSink) Do(migrateOp *operation) error {
 	var err error
+
+	// Start the storage for this container (LVM mount/umount)
+	c.container.StorageStart()
+
 	c.controlConn, err = c.connectWithSecret(c.controlSecret)
 	if err != nil {
+		c.container.StorageStop()
 		return err
 	}
 	defer c.disconnect()
 
 	c.fsConn, err = c.connectWithSecret(c.fsSecret)
 	if err != nil {
+		c.container.StorageStop()
 		c.sendControl(err)
 		return err
 	}
@@ -579,6 +585,7 @@ func (c *migrationSink) do() error {
 	if c.live {
 		c.criuConn, err = c.connectWithSecret(c.criuSecret)
 		if err != nil {
+			c.container.StorageStop()
 			c.sendControl(err)
 			return err
 		}
@@ -586,6 +593,7 @@ func (c *migrationSink) do() error {
 
 	header := MigrationHeader{}
 	if err := c.recv(&header); err != nil {
+		c.container.StorageStop()
 		c.sendControl(err)
 		return err
 	}
@@ -611,6 +619,7 @@ func (c *migrationSink) do() error {
 	}
 
 	if err := c.send(&resp); err != nil {
+		c.container.StorageStop()
 		c.sendControl(err)
 		return err
 	}
@@ -706,14 +715,17 @@ func (c *migrationSink) do() error {
 	for {
 		select {
 		case err = <-restore:
+			c.container.StorageStop()
 			c.sendControl(err)
 			return err
 		case msg, ok := <-source:
 			if !ok {
+				c.container.StorageStop()
 				c.disconnect()
 				return fmt.Errorf("Got error reading source")
 			}
 			if !*msg.Success {
+				c.container.StorageStop()
 				c.disconnect()
 				return fmt.Errorf(*msg.Message)
 			} else {
@@ -724,6 +736,15 @@ func (c *migrationSink) do() error {
 			}
 		}
 	}
+
+	defer c.container.StorageStop()
+
+	err = c.container.TemplateApply("copy")
+	if err != nil {
+		return err
+	}
+
+	return nil
 }
 
 /*

From c31c76ea8cad433f281d9d2cf238a3175cd1dcc0 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 10:10:39 +0200
Subject: [PATCH 03/14] lxd/migrate: make anonymous struct unanonymous

For push mode we will place a second struct in there. Both will have names to
prevent confusion.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/migrate.go | 91 +++++++++++++++++++++++++++++-----------------------------
 1 file changed, 46 insertions(+), 45 deletions(-)

diff --git a/lxd/migrate.go b/lxd/migrate.go
index 1c32d4a..1429bd9 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -512,7 +512,8 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
 }
 
 type migrationSink struct {
-	migrationFields
+	// We are pulling the container from src in pull mode.
+	src migrationFields
 
 	url    string
 	dialer websocket.Dialer
@@ -527,26 +528,26 @@ type MigrationSinkArgs struct {
 
 func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
 	sink := migrationSink{
-		migrationFields{container: args.Container},
-		args.Url,
-		args.Dialer,
+		src:    migrationFields{container: args.Container},
+		url:    args.Url,
+		dialer: args.Dialer,
 	}
 
 	var ok bool
-	sink.controlSecret, ok = args.Secrets["control"]
+	sink.src.controlSecret, ok = args.Secrets["control"]
 	if !ok {
 		return nil, fmt.Errorf("Missing control secret")
 	}
 
-	sink.fsSecret, ok = args.Secrets["fs"]
+	sink.src.fsSecret, ok = args.Secrets["fs"]
 	if !ok {
 		return nil, fmt.Errorf("Missing fs secret")
 	}
 
-	sink.criuSecret, ok = args.Secrets["criu"]
-	sink.live = ok
+	sink.src.criuSecret, ok = args.Secrets["criu"]
+	sink.src.live = ok
 
-	if err := findCriu("destination"); sink.live && err != nil {
+	if err := findCriu("destination"); sink.src.live && err != nil {
 		return nil, err
 	}
 
@@ -566,45 +567,45 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 	var err error
 
 	// Start the storage for this container (LVM mount/umount)
-	c.container.StorageStart()
+	c.src.container.StorageStart()
 
-	c.controlConn, err = c.connectWithSecret(c.controlSecret)
+	c.src.controlConn, err = c.connectWithSecret(c.src.controlSecret)
 	if err != nil {
-		c.container.StorageStop()
+		c.src.container.StorageStop()
 		return err
 	}
-	defer c.disconnect()
+	defer c.src.disconnect()
 
-	c.fsConn, err = c.connectWithSecret(c.fsSecret)
+	c.src.fsConn, err = c.connectWithSecret(c.src.fsSecret)
 	if err != nil {
-		c.container.StorageStop()
-		c.sendControl(err)
+		c.src.container.StorageStop()
+		c.src.sendControl(err)
 		return err
 	}
 
-	if c.live {
-		c.criuConn, err = c.connectWithSecret(c.criuSecret)
+	if c.src.live {
+		c.src.criuConn, err = c.connectWithSecret(c.src.criuSecret)
 		if err != nil {
-			c.container.StorageStop()
-			c.sendControl(err)
+			c.src.container.StorageStop()
+			c.src.sendControl(err)
 			return err
 		}
 	}
 
 	header := MigrationHeader{}
-	if err := c.recv(&header); err != nil {
-		c.container.StorageStop()
-		c.sendControl(err)
+	if err := c.src.recv(&header); err != nil {
+		c.src.container.StorageStop()
+		c.src.sendControl(err)
 		return err
 	}
 
 	criuType := CRIUType_CRIU_RSYNC.Enum()
-	if !c.live {
+	if !c.src.live {
 		criuType = nil
 	}
 
-	mySink := c.container.Storage().MigrationSink
-	myType := c.container.Storage().MigrationType()
+	mySink := c.src.container.Storage().MigrationSink
+	myType := c.src.container.Storage().MigrationType()
 	resp := MigrationHeader{
 		Fs:   &myType,
 		Criu: criuType,
@@ -618,9 +619,9 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 		resp.Fs = &myType
 	}
 
-	if err := c.send(&resp); err != nil {
-		c.container.StorageStop()
-		c.sendControl(err)
+	if err := c.src.send(&resp); err != nil {
+		c.src.container.StorageStop()
+		c.src.sendControl(err)
 		return err
 	}
 
@@ -655,7 +656,7 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 			 */
 			if len(header.SnapshotNames) != len(header.Snapshots) {
 				for _, name := range header.SnapshotNames {
-					base := snapshotToProtobuf(c.container)
+					base := snapshotToProtobuf(c.src.container)
 					base.Name = &name
 					snapshots = append(snapshots, base)
 				}
@@ -663,12 +664,12 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 				snapshots = header.Snapshots
 			}
 
-			if err := mySink(c.live, c.container, header.Snapshots, c.fsConn, srcIdmap); err != nil {
+			if err := mySink(c.src.live, c.src.container, header.Snapshots, c.src.fsConn, srcIdmap); err != nil {
 				fsTransfer <- err
 				return
 			}
 
-			if err := ShiftIfNecessary(c.container, srcIdmap); err != nil {
+			if err := ShiftIfNecessary(c.src.container, srcIdmap); err != nil {
 				fsTransfer <- err
 				return
 			}
@@ -676,7 +677,7 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 			fsTransfer <- nil
 		}()
 
-		if c.live {
+		if c.src.live {
 			var err error
 			imagesDir, err = ioutil.TempDir("", "lxd_restore_")
 			if err != nil {
@@ -686,7 +687,7 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 
 			defer os.RemoveAll(imagesDir)
 
-			if err := RsyncRecv(shared.AddSlash(imagesDir), c.criuConn); err != nil {
+			if err := RsyncRecv(shared.AddSlash(imagesDir), c.src.criuConn); err != nil {
 				restore <- err
 				return
 			}
@@ -698,8 +699,8 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 			return
 		}
 
-		if c.live {
-			err = c.container.Migrate(lxc.MIGRATE_RESTORE, imagesDir, "migration", false, false)
+		if c.src.live {
+			err = c.src.container.Migrate(lxc.MIGRATE_RESTORE, imagesDir, "migration", false, false)
 			if err != nil {
 				restore <- err
 				return
@@ -710,23 +711,23 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 		restore <- nil
 	}(c)
 
-	source := c.controlChannel()
+	source := c.src.controlChannel()
 
 	for {
 		select {
 		case err = <-restore:
-			c.container.StorageStop()
-			c.sendControl(err)
+			c.src.container.StorageStop()
+			c.src.sendControl(err)
 			return err
 		case msg, ok := <-source:
 			if !ok {
-				c.container.StorageStop()
-				c.disconnect()
+				c.src.container.StorageStop()
+				c.src.disconnect()
 				return fmt.Errorf("Got error reading source")
 			}
 			if !*msg.Success {
-				c.container.StorageStop()
-				c.disconnect()
+				c.src.container.StorageStop()
+				c.src.disconnect()
 				return fmt.Errorf(*msg.Message)
 			} else {
 				// The source can only tell us it failed (e.g. if
@@ -737,9 +738,9 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 		}
 	}
 
-	defer c.container.StorageStop()
+	defer c.src.container.StorageStop()
 
-	err = c.container.TemplateApply("copy")
+	err = c.src.container.TemplateApply("copy")
 	if err != nil {
 		return err
 	}

From 6c4552f1a9ce1c3b72ca9c8741d828c5ed7838bc Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 10:31:43 +0200
Subject: [PATCH 04/14] lxd/migrate: push mode: migrationSink

Extend migrationSink struct to support push mode.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/migrate.go | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/lxd/migrate.go b/lxd/migrate.go
index 1429bd9..f254019 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -514,9 +514,15 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
 type migrationSink struct {
 	// We are pulling the container from src in pull mode.
 	src migrationFields
+	// The container is pushed to dest in push mode. Note that websocket
+	// connections are not set in push mode. Only the secret fields are
+	// used since the client will connect to the sockets.
+	dest migrationFields
 
-	url    string
-	dialer websocket.Dialer
+	url          string
+	dialer       websocket.Dialer
+	allConnected chan bool
+	push         bool
 }
 
 type MigrationSinkArgs struct {

From bf6d76d2c9b4e0b68c8874b37ec9f05ba7f81f28 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 10:34:14 +0200
Subject: [PATCH 05/14] lxd/migrate: MigrationSinkArgs push mode

Extend MigrationSinkArgs to support push mode.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/migrate.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lxd/migrate.go b/lxd/migrate.go
index f254019..ad409e0 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -530,6 +530,7 @@ type MigrationSinkArgs struct {
 	Dialer    websocket.Dialer
 	Container container
 	Secrets   map[string]string
+	Push      bool
 }
 
 func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {

From 809c9de4dae686a116711dbd958c75decfdc36b1 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 10:40:55 +0200
Subject: [PATCH 06/14] lxd/migrate: NewMigrationSink() push mode

Adapt NewMigrationSink() to support push mode. This mainly means that when push
is set to true, NewMigrationSink() will generate a set of websocket secrets
which can be passed back to the client to connect to the dest server.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/migrate.go | 31 +++++++++++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git a/lxd/migrate.go b/lxd/migrate.go
index ad409e0..dad4a34 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -533,26 +533,53 @@ type MigrationSinkArgs struct {
 	Push      bool
 }
 
-func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
+func NewMigrationSink(args *MigrationSinkArgs, push bool) (*migrationSink, error) {
 	sink := migrationSink{
 		src:    migrationFields{container: args.Container},
 		url:    args.Url,
 		dialer: args.Dialer,
+		push:   args.Push,
 	}
 
-	var ok bool
+	if push {
+		sink.allConnected = make(chan bool, 1)
+	}
+
+	var (
+		ok  bool
+		err error
+	)
 	sink.src.controlSecret, ok = args.Secrets["control"]
 	if !ok {
 		return nil, fmt.Errorf("Missing control secret")
 	}
+	if push {
+		sink.dest.controlSecret, err = shared.RandomCryptoString()
+		if err != nil {
+			return nil, err
+		}
+	}
 
 	sink.src.fsSecret, ok = args.Secrets["fs"]
 	if !ok {
 		return nil, fmt.Errorf("Missing fs secret")
 	}
+	if push {
+		sink.dest.fsSecret, err = shared.RandomCryptoString()
+		if err != nil {
+			return nil, err
+		}
+	}
 
 	sink.src.criuSecret, ok = args.Secrets["criu"]
 	sink.src.live = ok
+	if push && ok {
+		sink.dest.criuSecret, err = shared.RandomCryptoString()
+		if err != nil {
+			return nil, err
+		}
+		sink.dest.live = ok
+	}
 
 	if err := findCriu("destination"); sink.src.live && err != nil {
 		return nil, err

From 6ebe764c3f6f9bc0d6f5e6adc639cfaa9f9fa1d0 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 10:44:45 +0200
Subject: [PATCH 07/14] lxd/migrate: Metadata() push mode

The migrationSink struct gains a Metadata() method. We will use it to send the
secrets back to the client as an operation.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/migrate.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/lxd/migrate.go b/lxd/migrate.go
index dad4a34..96d9067 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -597,6 +597,19 @@ func (c *migrationSink) connectWithSecret(secret string) (*websocket.Conn, error
 	return lxd.WebsocketDial(c.dialer, wsUrl)
 }
 
+func (s *migrationSink) Metadata() interface{} {
+	secrets := shared.Jmap{
+		"control": s.dest.controlSecret,
+		"fs":      s.dest.fsSecret,
+	}
+
+	if s.dest.criuSecret != "" {
+		secrets["criu"] = s.dest.criuSecret
+	}
+
+	return secrets
+}
+
 func (c *migrationSink) Do(migrateOp *operation) error {
 	var err error
 

From 392013299fef2d3301bef057c34a97d984af44a5 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 10:48:10 +0200
Subject: [PATCH 08/14] lxd/migrate: Connect() push mode

The migrationSink struct gains a Connect() method. We will use it to connect to
upgrade to a websocket connection once the client connects.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/migrate.go | 35 +++++++++++++++++++++++++++++++++++
 1 file changed, 35 insertions(+)

diff --git a/lxd/migrate.go b/lxd/migrate.go
index 96d9067..b54fea1 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -610,6 +610,41 @@ func (s *migrationSink) Metadata() interface{} {
 	return secrets
 }
 
+func (s *migrationSink) Connect(op *operation, r *http.Request, w http.ResponseWriter) error {
+	secret := r.FormValue("secret")
+	if secret == "" {
+		return fmt.Errorf("missing secret")
+	}
+
+	var conn **websocket.Conn
+
+	switch secret {
+	case s.dest.controlSecret:
+		conn = &s.dest.controlConn
+	case s.dest.criuSecret:
+		conn = &s.dest.criuConn
+	case s.dest.fsSecret:
+		conn = &s.dest.fsConn
+	default:
+		/* If we didn't find the right secret, the user provided a bad one,
+		 * which 403, not 404, since this operation actually exists */
+		return os.ErrPermission
+	}
+
+	c, err := shared.WebsocketUpgrader.Upgrade(w, r, nil)
+	if err != nil {
+		return err
+	}
+
+	*conn = c
+
+	if s.dest.controlConn != nil && (!s.dest.live || s.dest.criuConn != nil) && s.dest.fsConn != nil {
+		s.allConnected <- true
+	}
+
+	return nil
+}
+
 func (c *migrationSink) Do(migrateOp *operation) error {
 	var err error
 

From f8c354760415cff40d8546f1347d4e02757c4c10 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 11:03:41 +0200
Subject: [PATCH 09/14] lxd/migrate: Do() push mode

Adapt Do() method of migrationSink struct to support push mode. Note that the
boolean we take to indicate whether we need live migration support or not is
always taken to be src.live and not dest.live. This is because src will always
hold information about the container that is actually moved. The same goes for
src.container which will always hold the actual container struct.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/migrate.go | 87 ++++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 64 insertions(+), 23 deletions(-)

diff --git a/lxd/migrate.go b/lxd/migrate.go
index b54fea1..c74f052 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -648,36 +648,64 @@ func (s *migrationSink) Connect(op *operation, r *http.Request, w http.ResponseW
 func (c *migrationSink) Do(migrateOp *operation) error {
 	var err error
 
+	if c.push {
+		<-c.allConnected
+	}
+
 	// Start the storage for this container (LVM mount/umount)
 	c.src.container.StorageStart()
 
-	c.src.controlConn, err = c.connectWithSecret(c.src.controlSecret)
-	if err != nil {
-		c.src.container.StorageStop()
-		return err
+	disconnector := c.src.disconnect
+	if c.push {
+		disconnector = c.dest.disconnect
 	}
-	defer c.src.disconnect()
 
-	c.src.fsConn, err = c.connectWithSecret(c.src.fsSecret)
-	if err != nil {
-		c.src.container.StorageStop()
-		c.src.sendControl(err)
-		return err
-	}
+	if c.push {
+		defer disconnector()
+	} else {
+		c.src.controlConn, err = c.connectWithSecret(c.src.controlSecret)
+		if err != nil {
+			c.src.container.StorageStop()
+			return err
+		}
+		defer c.src.disconnect()
 
-	if c.src.live {
-		c.src.criuConn, err = c.connectWithSecret(c.src.criuSecret)
+		c.src.fsConn, err = c.connectWithSecret(c.src.fsSecret)
 		if err != nil {
 			c.src.container.StorageStop()
 			c.src.sendControl(err)
 			return err
 		}
+
+		if c.src.live {
+			c.src.criuConn, err = c.connectWithSecret(c.src.criuSecret)
+			if err != nil {
+				c.src.container.StorageStop()
+				c.src.sendControl(err)
+				return err
+			}
+		}
+	}
+
+	receiver := c.src.recv
+	if c.push {
+		receiver = c.dest.recv
+	}
+
+	sender := c.src.send
+	if c.push {
+		sender = c.dest.send
+	}
+
+	controller := c.src.sendControl
+	if c.push {
+		controller = c.dest.sendControl
 	}
 
 	header := MigrationHeader{}
-	if err := c.src.recv(&header); err != nil {
+	if err := receiver(&header); err != nil {
 		c.src.container.StorageStop()
-		c.src.sendControl(err)
+		controller(err)
 		return err
 	}
 
@@ -701,9 +729,9 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 		resp.Fs = &myType
 	}
 
-	if err := c.src.send(&resp); err != nil {
+	if err := sender(&resp); err != nil {
 		c.src.container.StorageStop()
-		c.src.sendControl(err)
+		controller(err)
 		return err
 	}
 
@@ -746,7 +774,11 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 				snapshots = header.Snapshots
 			}
 
-			if err := mySink(c.src.live, c.src.container, header.Snapshots, c.src.fsConn, srcIdmap); err != nil {
+			fsConn := c.src.fsConn
+			if c.push {
+				fsConn = c.dest.fsConn
+			}
+			if err := mySink(c.src.live, c.src.container, header.Snapshots, fsConn, srcIdmap); err != nil {
 				fsTransfer <- err
 				return
 			}
@@ -769,7 +801,11 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 
 			defer os.RemoveAll(imagesDir)
 
-			if err := RsyncRecv(shared.AddSlash(imagesDir), c.src.criuConn); err != nil {
+			criuConn := c.src.criuConn
+			if c.push {
+				criuConn = c.dest.criuConn
+			}
+			if err := RsyncRecv(shared.AddSlash(imagesDir), criuConn); err != nil {
 				restore <- err
 				return
 			}
@@ -793,23 +829,28 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 		restore <- nil
 	}(c)
 
-	source := c.src.controlChannel()
+	var source <-chan MigrationControl
+	if c.push {
+		source = c.dest.controlChannel()
+	} else {
+		source = c.src.controlChannel()
+	}
 
 	for {
 		select {
 		case err = <-restore:
 			c.src.container.StorageStop()
-			c.src.sendControl(err)
+			controller(err)
 			return err
 		case msg, ok := <-source:
 			if !ok {
 				c.src.container.StorageStop()
-				c.src.disconnect()
+				disconnector()
 				return fmt.Errorf("Got error reading source")
 			}
 			if !*msg.Success {
 				c.src.container.StorageStop()
-				c.src.disconnect()
+				disconnector()
 				return fmt.Errorf(*msg.Message)
 			} else {
 				// The source can only tell us it failed (e.g. if

From f20b2443858b80d0d2593507f08b679e9a85dbb6 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 11:16:51 +0200
Subject: [PATCH 10/14] lxd/containers_post: createFromMigration push mode

Adapt createFromMigration() to support push mode. In push mode,
createFromMigration() will now return a operationClassWebsocket. Metadata() will
sent a set of websockets + secrets to the client.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxd/containers_post.go | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/lxd/containers_post.go b/lxd/containers_post.go
index 10e2aee..7f2ad2a 100644
--- a/lxd/containers_post.go
+++ b/lxd/containers_post.go
@@ -201,8 +201,9 @@ func createFromNone(d *Daemon, req *containerPostReq) Response {
 }
 
 func createFromMigration(d *Daemon, req *containerPostReq) Response {
-	if req.Source.Mode != "pull" {
-		return NotImplemented
+	push := false
+	if req.Source.Mode == "push" {
+		push = true
 	}
 
 	architecture, err := shared.ArchitectureId(req.Architecture)
@@ -275,9 +276,10 @@ func createFromMigration(d *Daemon, req *containerPostReq) Response {
 			NetDial:         shared.RFC3493Dialer},
 		Container: c,
 		Secrets:   req.Source.Websockets,
+		Push:      push,
 	}
 
-	sink, err := NewMigrationSink(&migrationArgs)
+	sink, err := NewMigrationSink(&migrationArgs, push)
 	if err != nil {
 		c.Delete()
 		return InternalError(err)
@@ -286,9 +288,17 @@ func createFromMigration(d *Daemon, req *containerPostReq) Response {
 	resources := map[string][]string{}
 	resources["containers"] = []string{req.Name}
 
-	op, err := operationCreate(operationClassTask, resources, nil, sink.Do, nil, nil)
-	if err != nil {
-		return InternalError(err)
+	var op *operation
+	if push {
+		op, err = operationCreate(operationClassWebsocket, resources, sink.Metadata(), sink.Do, nil, sink.Connect)
+		if err != nil {
+			return InternalError(err)
+		}
+	} else {
+		op, err = operationCreate(operationClassTask, resources, nil, sink.Do, nil, nil)
+		if err != nil {
+			return InternalError(err)
+		}
 	}
 
 	return OperationResponse(op)

From bc870b786d1afc06aa81a8bbde93e0ac10d325aa Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 11:32:13 +0200
Subject: [PATCH 11/14] client: MigrateFrom() support push mode

In push mode the client will connect to the websockets from the source server
and do a post on the target server. The target server will be informed to expect
a push and in response return a set of secrets to the client. The client will
then use those secrets to connect to a set of websockets matching those of the
source server. The client will serve as a proxy between and source and target
server and relay the data accordingly.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 client.go | 145 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 142 insertions(+), 3 deletions(-)

diff --git a/client.go b/client.go
index 58586a3..f2565cc 100644
--- a/client.go
+++ b/client.go
@@ -20,6 +20,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"syscall"
 
 	"github.com/gorilla/websocket"
@@ -1991,19 +1992,29 @@ func (c *Client) GetMigrationSourceWS(container string) (*Response, error) {
 	return c.post(url, body, Async)
 }
 
-func (c *Client) MigrateFrom(name string, operation string, certificate string, secrets map[string]string, architecture string, config map[string]string, devices shared.Devices, profiles []string, baseImage string, ephemeral bool) (*Response, error) {
+func (c *Client) MigrateFrom(name string, operation string, certificate string,
+	sourceSecrets map[string]string, architecture string, config map[string]string,
+	devices shared.Devices, profiles []string,
+	baseImage string, ephemeral bool, push bool, sourceClient *Client,
+	sourceOperation string) (*Response, error) {
 	if c.Remote.Public {
 		return nil, fmt.Errorf("This function isn't supported by public remotes.")
 	}
 
 	source := shared.Jmap{
 		"type":        "migration",
-		"mode":        "pull",
 		"operation":   operation,
 		"certificate": certificate,
-		"secrets":     secrets,
+		"secrets":     sourceSecrets,
 		"base-image":  baseImage,
 	}
+
+	if push {
+		source["mode"] = "push"
+	} else {
+		source["mode"] = "pull"
+	}
+
 	body := shared.Jmap{
 		"architecture": architecture,
 		"config":       config,
@@ -2014,6 +2025,134 @@ func (c *Client) MigrateFrom(name string, operation string, certificate string,
 		"source":       source,
 	}
 
+	if source["mode"] == "push" {
+		// Check source server secrets.
+		sourceControlSecret, ok := sourceSecrets["control"]
+		if !ok {
+			return nil, fmt.Errorf("Missing control secret")
+		}
+		sourceFsSecret, ok := sourceSecrets["fs"]
+		if !ok {
+			return nil, fmt.Errorf("Missing fs secret")
+		}
+
+		criuSecret := false
+		sourceCriuSecret, ok := sourceSecrets["criu"]
+		if ok {
+			criuSecret = true
+		}
+
+		// Connect to source server websockets.
+		sourceControlConn, err := sourceClient.Websocket(sourceOperation, sourceControlSecret)
+		if err != nil {
+			return nil, err
+		}
+		sourceFsConn, err := sourceClient.Websocket(sourceOperation, sourceFsSecret)
+		if err != nil {
+			return nil, err
+		}
+
+		var sourceCriuConn *websocket.Conn
+		if criuSecret {
+			sourceCriuConn, err = sourceClient.Websocket(sourceOperation, sourceCriuSecret)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		// Post to target server and request and retrieve a set of
+		// websockets + secrets matching those of the source server.
+		resp, err := c.post("containers", body, Async)
+		if err != nil {
+			return nil, err
+		}
+
+		destSecrets := map[string]string{}
+		op, err := resp.MetadataAsOperation()
+		if err != nil {
+			return nil, err
+		}
+		for k, v := range *op.Metadata {
+			destSecrets[k] = v.(string)
+		}
+
+		destControlSecret, ok := destSecrets["control"]
+		if !ok {
+			return nil, fmt.Errorf("Missing control secret")
+		}
+		destFsSecret, ok := destSecrets["fs"]
+		if !ok {
+			return nil, fmt.Errorf("Missing fs secret")
+		}
+		destCriuSecret, ok := destSecrets["criu"]
+		if criuSecret && !ok || !criuSecret && ok {
+			return nil, fmt.Errorf("Missing criu secret")
+		}
+
+		// Connect to dest server websockets.
+		destControlConn, err := c.Websocket(resp.Operation, destControlSecret)
+		if err != nil {
+			return nil, err
+		}
+		destFsConn, err := c.Websocket(resp.Operation, destFsSecret)
+		if err != nil {
+			return nil, err
+		}
+
+		var destCriuConn *websocket.Conn
+		if criuSecret {
+			destCriuConn, err = c.Websocket(resp.Operation, destCriuSecret)
+			if err != nil {
+				return nil, err
+			}
+		}
+
+		// Let client shovel data from src to dest server.
+		capacity := 4
+		if criuSecret {
+			capacity += 2
+		}
+		syncChan := make(chan error, capacity)
+		defer close(syncChan)
+
+		proxy := func(src *websocket.Conn, dest *websocket.Conn) {
+			for {
+				mt, payload, err := src.ReadMessage()
+				if err != nil {
+					if err != io.EOF {
+						syncChan <- err
+						break
+					}
+				}
+				/* The gorilla websockets framework doesn't
+				*  allow concurrent writes. */
+				var controlLock sync.Mutex
+				controlLock.Lock()
+				if err = dest.WriteMessage(mt, payload); err != nil {
+					controlLock.Unlock()
+					syncChan <- err
+					break
+				}
+				controlLock.Unlock()
+			}
+		}
+
+		go proxy(sourceControlConn, destControlConn)
+		go proxy(destControlConn, sourceControlConn)
+		go proxy(sourceFsConn, destFsConn)
+		go proxy(destFsConn, sourceFsConn)
+		if criuSecret {
+			go proxy(sourceCriuConn, destCriuConn)
+			go proxy(destCriuConn, sourceCriuConn)
+		}
+
+		for i := 0; i < cap(syncChan); i++ {
+			<-syncChan
+		}
+
+		return resp, nil
+	}
+
 	return c.post("containers", body, Async)
 }
 

From 6f1918f08392407ca3a0f21406f984d5f8ec0cc2 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 11:35:45 +0200
Subject: [PATCH 12/14] lxc/copy: adapt to changes in MigrateFrom()

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxc/copy.go | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/lxc/copy.go b/lxc/copy.go
index cac2d50..68d978e 100644
--- a/lxc/copy.go
+++ b/lxc/copy.go
@@ -204,15 +204,22 @@ func (c *copyCmd) copyContainer(config *lxd.Config, sourceResource string, destR
 		var migration *lxd.Response
 
 		sourceWSUrl := "https://" + addr + sourceWSResponse.Operation
-		migration, err = dest.MigrateFrom(destName, sourceWSUrl, source.Certificate, secrets, status.Architecture, status.Config, status.Devices, status.Profiles, baseImage, ephemeral == 1)
+		migration, err = dest.MigrateFrom(destName, sourceWSUrl, source.Certificate, secrets, status.Architecture, status.Config, status.Devices, status.Profiles, baseImage, ephemeral == 1, false, source, sourceWSResponse.Operation)
 		if err != nil {
 			continue
 		}
 
-		if err = dest.WaitForSuccess(migration.Operation); err != nil {
+		migMap, err := migration.MetadataAsMap()
+		if err != nil {
 			return err
 		}
 
+		if v, ok := (*migMap)["mode"]; ok && v == "pull" {
+			if err = dest.WaitForSuccess(migration.Operation); err != nil {
+				return err
+			}
+		}
+
 		if destResource == "" {
 			op, err := migration.MetadataAsOperation()
 			if err != nil {

From f23bf2023697946642efd65c74635b27c5c273b0 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 11:50:18 +0200
Subject: [PATCH 13/14] doc/rest-api: add push mode for container creation

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 doc/rest-api.md | 26 +++++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/doc/rest-api.md b/doc/rest-api.md
index 61ad513..ec2ef8f 100644
--- a/doc/rest-api.md
+++ b/doc/rest-api.md
@@ -498,7 +498,7 @@ Input (using a remote container, sent over the migration websocket):
         },
         "source": {"type": "migration",                                                 # Can be: "image", "migration", "copy" or "none"
                    "mode": "pull",                                                      # Only "pull" is supported for now
-                   "operation": "https://10.0.2.3:8443/1.0/operations/<UUID>",          # Full URL to the remote operation (pull mode only)
+                   "operation": "https://10.0.2.3:8443/1.0/operations/<UUID>",          # Full URL to the remote operation
                    "certificate": "PEM certificate",                                    # Optional PEM certificate. If not mentioned, system CA is used.
                    "base-image": "<fingerprint>",                                       # Optional, the base image the container was created from
                    "secrets": {"control": "my-secret-string",                           # Secrets to use when talking to the migration source
@@ -518,6 +518,30 @@ Input (using a local container):
                    "source": "my-old-container"}                                        # Name of the source container
     }
 
+Input (using a remote container, in push mode sent over the migration websocket via client proxying):
+
+    {
+        "name": "my-new-container",                                                     # 64 chars max, ASCII, no slash, no colon and no comma
+        "architecture": "x86_64",
+        "profiles": ["default"],                                                        # List of profiles
+        "ephemeral": true,                                                              # Whether to destroy the container on shutdown
+        "config": {"limits.cpu": "2"},                                                  # Config override.
+        "devices": {                                                                    # optional list of devices the container should have
+            "rootfs": {
+                "path": "/dev/kvm",
+                "type": "unix-char"
+            },
+        },
+        "source": {"type": "migration",                                                 # Can be: "image", "migration", "copy" or "none"
+                   "mode": "push",                                                      # Only "pull" is supported for now
+                   "operation": "https://10.0.2.3:8443/1.0/operations/<UUID>",          # Full URL to the remote operation
+                   "certificate": "PEM certificate",                                    # Optional PEM certificate. If not mentioned, system CA is used.
+                   "base-image": "<fingerprint>",                                       # Optional, the base image the container was created from
+                   "secrets": {"control": "my-secret-string",                           # Secrets to use when talking to the migration source
+                               "criu":    "my-other-secret",
+                               "fs":      "my third secret"},
+    }
+
 ## /1.0/containers/\<name\>
 ### GET
  * Description: Container information

From 02f623679c33a724b42034703d69e6ea471d5559 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at canonical.com>
Date: Fri, 30 Sep 2016 11:37:46 +0200
Subject: [PATCH 14/14] [DO NOT MERGE]: lxc/copy: easy push support test

This adds the --push flag. When --push is passed to lxc copy then push mode
instead of pull mode will be used. This is not intended for inclusion in LXD.
This commit is only here to make it easier for users to test push support.

Signed-off-by: Christian Brauner <christian.brauner at canonical.com>
---
 lxc/copy.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/lxc/copy.go b/lxc/copy.go
index 68d978e..20e14f9 100644
--- a/lxc/copy.go
+++ b/lxc/copy.go
@@ -16,6 +16,8 @@ type copyCmd struct {
 	ephem    bool
 }
 
+var usePush bool
+
 func (c *copyCmd) showByDefault() bool {
 	return true
 }
@@ -24,7 +26,7 @@ func (c *copyCmd) usage() string {
 	return i18n.G(
 		`Copy containers within or in between lxd instances.
 
-lxc copy [remote:]<source container> [[remote:]<destination container>] [--ephemeral|e] [--profile|-p <profile>...] [--config|-c <key=value>...]`)
+lxc copy [remote:]<source container> [[remote:]<destination container>] [--ephemeral|e] [--push] [--profile|-p <profile>...] [--config|-c <key=value>...]`)
 }
 
 func (c *copyCmd) flags() {
@@ -34,6 +36,7 @@ func (c *copyCmd) flags() {
 	gnuflag.Var(&c.profArgs, "p", i18n.G("Profile to apply to the new container"))
 	gnuflag.BoolVar(&c.ephem, "ephemeral", false, i18n.G("Ephemeral container"))
 	gnuflag.BoolVar(&c.ephem, "e", false, i18n.G("Ephemeral container"))
+	gnuflag.BoolVar(&usePush, "push", false, i18n.G("Use push mode"))
 }
 
 func (c *copyCmd) copyContainer(config *lxd.Config, sourceResource string, destResource string, keepVolatile bool, ephemeral int) error {
@@ -204,7 +207,7 @@ func (c *copyCmd) copyContainer(config *lxd.Config, sourceResource string, destR
 		var migration *lxd.Response
 
 		sourceWSUrl := "https://" + addr + sourceWSResponse.Operation
-		migration, err = dest.MigrateFrom(destName, sourceWSUrl, source.Certificate, secrets, status.Architecture, status.Config, status.Devices, status.Profiles, baseImage, ephemeral == 1, false, source, sourceWSResponse.Operation)
+		migration, err = dest.MigrateFrom(destName, sourceWSUrl, source.Certificate, secrets, status.Architecture, status.Config, status.Devices, status.Profiles, baseImage, ephemeral == 1, usePush, source, sourceWSResponse.Operation)
 		if err != nil {
 			continue
 		}


More information about the lxc-devel mailing list