[lxc-devel] [lxd/master] storage: prelude

brauner on Github lxc-bot at linuxcontainers.org
Thu Mar 22 17:32:30 UTC 2018


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 364 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180322/9339b86e/attachment.bin>
-------------- next part --------------
From f355d898caa1602770947c11f1090eb344d42d82 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Wed, 21 Mar 2018 12:29:51 +0100
Subject: [PATCH 1/3] migrate: split into migrate and migrate_container

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 lxd/container_post.go     |    2 +-
 lxd/container_snapshot.go |    2 +-
 lxd/migrate.go            | 1029 +--------------------------------------------
 lxd/migrate_container.go  | 1013 ++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 1033 insertions(+), 1013 deletions(-)
 create mode 100644 lxd/migrate_container.go

diff --git a/lxd/container_post.go b/lxd/container_post.go
index f77fbd04d..96548cfff 100644
--- a/lxd/container_post.go
+++ b/lxd/container_post.go
@@ -67,7 +67,7 @@ func containerPost(d *Daemon, r *http.Request) Response {
 
 		if req.Target != nil {
 			// Push mode
-			err := ws.ConnectTarget(*req.Target)
+			err := ws.ConnectContainerTarget(*req.Target)
 			if err != nil {
 				return InternalError(err)
 			}
diff --git a/lxd/container_snapshot.go b/lxd/container_snapshot.go
index 2640609fc..14c7fef12 100644
--- a/lxd/container_snapshot.go
+++ b/lxd/container_snapshot.go
@@ -246,7 +246,7 @@ func snapshotPost(d *Daemon, r *http.Request, sc container, containerName string
 
 		if req.Target != nil {
 			// Push mode
-			err := ws.ConnectTarget(*req.Target)
+			err := ws.ConnectContainerTarget(*req.Target)
 			if err != nil {
 				return InternalError(err)
 			}
diff --git a/lxd/migrate.go b/lxd/migrate.go
index f0cea6253..04a223b1a 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -7,36 +7,23 @@ package main
 
 import (
 	"crypto/x509"
-	"encoding/binary"
 	"encoding/pem"
 	"fmt"
-	"io/ioutil"
 	"net/http"
 	"net/url"
 	"os"
-	"os/exec"
-	"path/filepath"
-	"strconv"
 	"strings"
 	"sync"
 
 	"github.com/golang/protobuf/proto"
 	"github.com/gorilla/websocket"
-	"gopkg.in/lxc/go-lxc.v2"
 
 	"github.com/lxc/lxd/lxd/migration"
-	"github.com/lxc/lxd/lxd/util"
 	"github.com/lxc/lxd/shared"
-	"github.com/lxc/lxd/shared/api"
-	"github.com/lxc/lxd/shared/idmap"
 	"github.com/lxc/lxd/shared/logger"
 )
 
 type migrationFields struct {
-	live bool
-
-	containerOnly bool
-
 	controlSecret string
 	controlConn   *websocket.Conn
 	controlLock   sync.Mutex
@@ -47,7 +34,10 @@ type migrationFields struct {
 	fsSecret string
 	fsConn   *websocket.Conn
 
-	container container
+	// container specific fields
+	live          bool
+	containerOnly bool
+	container     container
 }
 
 func (c *migrationFields) send(m proto.Message) error {
@@ -137,37 +127,6 @@ type migrationSourceWs struct {
 	allConnected chan bool
 }
 
-func NewMigrationSource(c container, stateful bool, containerOnly bool) (*migrationSourceWs, error) {
-	ret := migrationSourceWs{migrationFields{container: c}, make(chan bool, 1)}
-	ret.containerOnly = containerOnly
-
-	var err error
-	ret.controlSecret, err = shared.RandomCryptoString()
-	if err != nil {
-		return nil, err
-	}
-
-	ret.fsSecret, err = shared.RandomCryptoString()
-	if err != nil {
-		return nil, err
-	}
-
-	if stateful && c.IsRunning() {
-		_, err := exec.LookPath("criu")
-		if err != nil {
-			return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the source server.")
-		}
-
-		ret.live = true
-		ret.criuSecret, err = shared.RandomCryptoString()
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	return &ret, nil
-}
-
 func (s *migrationSourceWs) Metadata() interface{} {
 	secrets := shared.Jmap{
 		"control": s.controlSecret,
@@ -217,12 +176,12 @@ func (s *migrationSourceWs) Connect(op *operation, r *http.Request, w http.Respo
 	return nil
 }
 
-func (s *migrationSourceWs) ConnectTarget(target api.ContainerPostTarget) error {
+func (s *migrationSourceWs) ConnectTarget(certificate string, operation string, websockets map[string]string) error {
 	var err error
 	var cert *x509.Certificate
 
-	if target.Certificate != "" {
-		certBlock, _ := pem.Decode([]byte(target.Certificate))
+	if certificate != "" {
+		certBlock, _ := pem.Decode([]byte(certificate))
 		if certBlock == nil {
 			return fmt.Errorf("Invalid certificate")
 		}
@@ -243,7 +202,7 @@ func (s *migrationSourceWs) ConnectTarget(target api.ContainerPostTarget) error
 		NetDial:         shared.RFC3493Dialer,
 	}
 
-	for name, secret := range target.Websockets {
+	for name, secret := range websockets {
 		var conn **websocket.Conn
 
 		switch name {
@@ -260,7 +219,7 @@ func (s *migrationSourceWs) ConnectTarget(target api.ContainerPostTarget) error
 		query := url.Values{"secret": []string{secret}}
 
 		// The URL is a https URL to the operation, mangle to be a wss URL to the secret
-		wsUrl := fmt.Sprintf("wss://%s/websocket?%s", strings.TrimPrefix(target.Operation, "https://"), query.Encode())
+		wsUrl := fmt.Sprintf("wss://%s/websocket?%s", strings.TrimPrefix(operation, "https://"), query.Encode())
 
 		wsConn, _, err := dialer.Dial(wsUrl, http.Header{})
 		if err != nil {
@@ -275,609 +234,10 @@ func (s *migrationSourceWs) ConnectTarget(target api.ContainerPostTarget) error
 	return nil
 }
 
-func writeActionScript(directory string, operation string, secret string, execPath string) error {
-	script := fmt.Sprintf(`#!/bin/sh -e
-if [ "$CRTOOLS_SCRIPT_ACTION" = "post-dump" ]; then
-	%s migratedumpsuccess %s %s
-fi
-`, execPath, operation, secret)
-
-	f, err := os.Create(filepath.Join(directory, "action.sh"))
-	if err != nil {
-		return err
-	}
-	defer f.Close()
-
-	err = f.Chmod(0500)
-	if err != nil {
-		return err
-	}
-
-	_, err = f.WriteString(script)
-	return err
-}
-
-func snapshotToProtobuf(c container) *migration.Snapshot {
-	config := []*migration.Config{}
-	for k, v := range c.LocalConfig() {
-		kCopy := string(k)
-		vCopy := string(v)
-		config = append(config, &migration.Config{Key: &kCopy, Value: &vCopy})
-	}
-
-	devices := []*migration.Device{}
-	for name, d := range c.LocalDevices() {
-		props := []*migration.Config{}
-		for k, v := range d {
-			kCopy := string(k)
-			vCopy := string(v)
-			props = append(props, &migration.Config{Key: &kCopy, Value: &vCopy})
-		}
-
-		devices = append(devices, &migration.Device{Name: &name, Config: props})
-	}
-
-	parts := strings.SplitN(c.Name(), shared.SnapshotDelimiter, 2)
-	isEphemeral := c.IsEphemeral()
-	arch := int32(c.Architecture())
-	stateful := c.IsStateful()
-
-	return &migration.Snapshot{
-		Name:         &parts[len(parts)-1],
-		LocalConfig:  config,
-		Profiles:     c.Profiles(),
-		Ephemeral:    &isEphemeral,
-		LocalDevices: devices,
-		Architecture: &arch,
-		Stateful:     &stateful,
-	}
-}
-
-// Check if CRIU supports pre-dumping and number of
-// pre-dump iterations
-func (s *migrationSourceWs) checkForPreDumpSupport() (bool, int) {
-	// Ask CRIU if this architecture/kernel/criu combination
-	// supports pre-copy (dirty memory tracking)
-	criuMigrationArgs := CriuMigrationArgs{
-		cmd:          lxc.MIGRATE_FEATURE_CHECK,
-		stateDir:     "",
-		function:     "feature-check",
-		stop:         false,
-		actionScript: false,
-		dumpDir:      "",
-		preDumpDir:   "",
-		features:     lxc.FEATURE_MEM_TRACK,
-	}
-	err := s.container.Migrate(&criuMigrationArgs)
-
-	if err != nil {
-		// CRIU says it does not know about dirty memory tracking.
-		// This means the rest of this function is irrelevant.
-		return false, 0
-	}
-
-	// CRIU says it can actually do pre-dump. Let's set it to true
-	// unless the user wants something else.
-	use_pre_dumps := true
-
-	// What does the configuration say about pre-copy
-	tmp := s.container.ExpandedConfig()["migration.incremental.memory"]
-
-	if tmp != "" {
-		use_pre_dumps = shared.IsTrue(tmp)
-	}
-	logger.Debugf("migration.incremental.memory %d", use_pre_dumps)
-
-	var max_iterations int
-
-	// migration.incremental.memory.iterations is the value after which the
-	// container will be definitely migrated, even if the remaining number
-	// of memory pages is below the defined threshold.
-	tmp = s.container.ExpandedConfig()["migration.incremental.memory.iterations"]
-	if tmp != "" {
-		max_iterations, _ = strconv.Atoi(tmp)
-	} else {
-		// default to 10
-		max_iterations = 10
-	}
-	if max_iterations > 999 {
-		// the pre-dump directory is hardcoded to a string
-		// with maximal 3 digits. 999 pre-dumps makes no
-		// sense at all, but let's make sure the number
-		// is not higher than this.
-		max_iterations = 999
-	}
-	logger.Debugf("using maximal %d iterations for pre-dumping", max_iterations)
-
-	return use_pre_dumps, max_iterations
-}
-
-// The function readCriuStatsDump() reads the CRIU 'stats-dump' file
-// in path and returns the pages_written, pages_skipped_parent, error.
-func readCriuStatsDump(path string) (uint64, uint64, error) {
-	statsDump := shared.AddSlash(path) + "stats-dump"
-	in, err := ioutil.ReadFile(statsDump)
-	if err != nil {
-		logger.Errorf("Error reading CRIU's 'stats-dump' file: %s", err.Error())
-		return 0, 0, err
-	}
-
-	// According to the CRIU file image format it starts with two magic values.
-	// First magic IMG_SERVICE: 1427134784
-	if binary.LittleEndian.Uint32(in[0:4]) != 1427134784 {
-		msg := "IMG_SERVICE(1427134784) criu magic not found"
-		logger.Errorf(msg)
-		return 0, 0, fmt.Errorf(msg)
-	}
-	// Second magic STATS: 1460220678
-	if binary.LittleEndian.Uint32(in[4:8]) != 1460220678 {
-		msg := "STATS(1460220678) criu magic not found"
-		logger.Errorf(msg)
-		return 0, 0, fmt.Errorf(msg)
-	}
-
-	// Next, read the size of the image payload
-	size := binary.LittleEndian.Uint32(in[8:12])
-	logger.Debugf("stats-dump payload size %d", size)
-
-	statsEntry := &migration.StatsEntry{}
-	if err = proto.Unmarshal(in[12:12+size], statsEntry); err != nil {
-		logger.Errorf("Failed to parse CRIU's 'stats-dump' file: %s", err.Error())
-		return 0, 0, err
-	}
-
-	written := statsEntry.GetDump().GetPagesWritten()
-	skipped := statsEntry.GetDump().GetPagesSkippedParent()
-	return written, skipped, nil
-}
-
-type preDumpLoopArgs struct {
-	checkpointDir string
-	bwlimit       string
-	preDumpDir    string
-	dumpDir       string
-	final         bool
-}
-
-// The function preDumpLoop is the main logic behind the pre-copy migration.
-// This function contains the actual pre-dump, the corresponding rsync
-// transfer and it tells the outer loop to abort if the threshold
-// of memory pages transferred by pre-dumping has been reached.
-func (s *migrationSourceWs) preDumpLoop(args *preDumpLoopArgs) (bool, error) {
-	// Do a CRIU pre-dump
-	criuMigrationArgs := CriuMigrationArgs{
-		cmd:          lxc.MIGRATE_PRE_DUMP,
-		stop:         false,
-		actionScript: false,
-		preDumpDir:   args.preDumpDir,
-		dumpDir:      args.dumpDir,
-		stateDir:     args.checkpointDir,
-		function:     "migration",
-	}
-
-	logger.Debugf("Doing another pre-dump in %s", args.preDumpDir)
-
-	final := args.final
-
-	err := s.container.Migrate(&criuMigrationArgs)
-	if err != nil {
-		return final, err
-	}
-
-	// Send the pre-dump.
-	ctName, _, _ := containerGetParentAndSnapshotName(s.container.Name())
-	state := s.container.DaemonState()
-	err = RsyncSend(ctName, shared.AddSlash(args.checkpointDir), s.criuConn, nil, args.bwlimit, state.OS.ExecPath)
-	if err != nil {
-		return final, err
-	}
-
-	// Read the CRIU's 'stats-dump' file
-	dumpPath := shared.AddSlash(args.checkpointDir)
-	dumpPath += shared.AddSlash(args.dumpDir)
-	written, skipped_parent, err := readCriuStatsDump(dumpPath)
-	if err != nil {
-		return final, err
-	}
-
-	logger.Debugf("CRIU pages written %d", written)
-	logger.Debugf("CRIU pages skipped %d", skipped_parent)
-
-	total_pages := written + skipped_parent
-
-	percentage_skipped := int(100 - ((100 * written) / total_pages))
-
-	logger.Debugf("CRIU pages skipped percentage %d%%", percentage_skipped)
-
-	// threshold is the percentage of memory pages that needs
-	// to be pre-copied for the pre-copy migration to stop.
-	var threshold int
-	tmp := s.container.ExpandedConfig()["migration.incremental.memory.goal"]
-	if tmp != "" {
-		threshold, _ = strconv.Atoi(tmp)
-	} else {
-		// defaults to 70%
-		threshold = 70
-	}
-
-	if percentage_skipped > threshold {
-		logger.Debugf("Memory pages skipped (%d%%) due to pre-copy is larger than threshold (%d%%)", percentage_skipped, threshold)
-		logger.Debugf("This was the last pre-dump; next dump is the final dump")
-		final = true
-	}
-
-	// If in pre-dump mode, the receiving side
-	// expects a message to know if this was the
-	// last pre-dump
-	logger.Debugf("Sending another header")
-	sync := migration.MigrationSync{
-		FinalPreDump: proto.Bool(final),
-	}
-
-	data, err := proto.Marshal(&sync)
-
-	if err != nil {
-		return final, err
-	}
-
-	err = s.criuConn.WriteMessage(websocket.BinaryMessage, data)
-	if err != nil {
-		s.sendControl(err)
-		return final, err
-	}
-	logger.Debugf("Sending another header done")
-
-	return final, nil
-}
-
-func (s *migrationSourceWs) Do(migrateOp *operation) error {
-	<-s.allConnected
-
-	criuType := migration.CRIUType_CRIU_RSYNC.Enum()
-	if !s.live {
-		criuType = nil
-		if s.container.IsRunning() {
-			criuType = migration.CRIUType_NONE.Enum()
-		}
-	}
-
-	// Storage needs to start unconditionally now, since we need to
-	// initialize a new storage interface.
-	ourStart, err := s.container.StorageStart()
-	if err != nil {
-		return err
-	}
-	if ourStart {
-		defer s.container.StorageStop()
-	}
-
-	idmaps := make([]*migration.IDMapType, 0)
-
-	idmapset, err := s.container.IdmapSet()
-	if err != nil {
-		return err
-	}
-
-	if idmapset != nil {
-		for _, ctnIdmap := range idmapset.Idmap {
-			idmap := migration.IDMapType{
-				Isuid:    proto.Bool(ctnIdmap.Isuid),
-				Isgid:    proto.Bool(ctnIdmap.Isgid),
-				Hostid:   proto.Int32(int32(ctnIdmap.Hostid)),
-				Nsid:     proto.Int32(int32(ctnIdmap.Nsid)),
-				Maprange: proto.Int32(int32(ctnIdmap.Maprange)),
-			}
-
-			idmaps = append(idmaps, &idmap)
-		}
-	}
-
-	driver, fsErr := s.container.Storage().MigrationSource(s.container, s.containerOnly)
-
-	snapshots := []*migration.Snapshot{}
-	snapshotNames := []string{}
-	// Only send snapshots when requested.
-	if !s.containerOnly {
-		if fsErr == nil {
-			fullSnaps := driver.Snapshots()
-			for _, snap := range fullSnaps {
-				snapshots = append(snapshots, snapshotToProtobuf(snap))
-				snapshotNames = append(snapshotNames, shared.ExtractSnapshotName(snap.Name()))
-			}
-		}
-	}
-
-	use_pre_dumps := false
-	max_iterations := 0
-	if s.live {
-		use_pre_dumps, max_iterations = s.checkForPreDumpSupport()
-	}
-
-	// The protocol says we have to send a header no matter what, so let's
-	// do that, but then immediately send an error.
-	myType := s.container.Storage().MigrationType()
-	header := migration.MigrationHeader{
-		Fs:            &myType,
-		Criu:          criuType,
-		Idmap:         idmaps,
-		SnapshotNames: snapshotNames,
-		Snapshots:     snapshots,
-		Predump:       proto.Bool(use_pre_dumps),
-	}
-
-	err = s.send(&header)
-	if err != nil {
-		s.sendControl(err)
-		return err
-	}
-
-	if fsErr != nil {
-		s.sendControl(fsErr)
-		return fsErr
-	}
-
-	err = s.recv(&header)
-	if err != nil {
-		s.sendControl(err)
-		return err
-	}
-
-	bwlimit := ""
-	if *header.Fs != myType {
-		myType = migration.MigrationFSType_RSYNC
-		header.Fs = &myType
-
-		driver, _ = rsyncMigrationSource(s.container, s.containerOnly)
-
-		// Check if this storage pool has a rate limit set for rsync.
-		poolwritable := s.container.Storage().GetStoragePoolWritable()
-		if poolwritable.Config != nil {
-			bwlimit = poolwritable.Config["rsync.bwlimit"]
-		}
-	}
-
-	// Check if the other side knows about pre-dumping and
-	// the associated rsync protocol
-	use_pre_dumps = header.GetPredump()
-	if use_pre_dumps {
-		logger.Debugf("The other side does support pre-copy")
-	} else {
-		logger.Debugf("The other side does not support pre-copy")
-	}
-
-	// All failure paths need to do a few things to correctly handle errors before returning.
-	// Unfortunately, handling errors is not well-suited to defer as the code depends on the
-	// status of driver and the error value.  The error value is especially tricky due to the
-	// common case of creating a new err variable (intentional or not) due to scoping and use
-	// of ":=".  Capturing err in a closure for use in defer would be fragile, which defeats
-	// the purpose of using defer.  An abort function reduces the odds of mishandling errors
-	// without introducing the fragility of closing on err.
-	abort := func(err error) error {
-		driver.Cleanup()
-		s.sendControl(err)
-		return err
-	}
-
-	err = driver.SendWhileRunning(s.fsConn, migrateOp, bwlimit, s.containerOnly)
-	if err != nil {
-		return abort(err)
-	}
-
-	restoreSuccess := make(chan bool, 1)
-	dumpSuccess := make(chan error, 1)
-
-	if s.live {
-		if header.Criu == nil {
-			return abort(fmt.Errorf("Got no CRIU socket type for live migration"))
-		} else if *header.Criu != migration.CRIUType_CRIU_RSYNC {
-			return abort(fmt.Errorf("Formats other than criu rsync not understood"))
-		}
-
-		checkpointDir, err := ioutil.TempDir("", "lxd_checkpoint_")
-		if err != nil {
-			return abort(err)
-		}
-
-		if util.RuntimeLiblxcVersionAtLeast(2, 0, 4) {
-			/* What happens below is slightly convoluted. Due to various
-			 * complications with networking, there's no easy way for criu
-			 * to exit and leave the container in a frozen state for us to
-			 * somehow resume later.
-			 *
-			 * Instead, we use what criu calls an "action-script", which is
-			 * basically a callback that lets us know when the dump is
-			 * done. (Unfortunately, we can't pass arguments, just an
-			 * executable path, so we write a custom action script with the
-			 * real command we want to run.)
-			 *
-			 * This script then hangs until the migration operation either
-			 * finishes successfully or fails, and exits 1 or 0, which
-			 * causes criu to either leave the container running or kill it
-			 * as we asked.
-			 */
-			dumpDone := make(chan bool, 1)
-			actionScriptOpSecret, err := shared.RandomCryptoString()
-			if err != nil {
-				os.RemoveAll(checkpointDir)
-				return abort(err)
-			}
-
-			state := s.container.DaemonState()
-			actionScriptOp, err := operationCreate(
-				state.Cluster,
-				operationClassWebsocket,
-				"Live-migrating container",
-				nil,
-				nil,
-				func(op *operation) error {
-					result := <-restoreSuccess
-					if !result {
-						return fmt.Errorf("restore failed, failing CRIU")
-					}
-					return nil
-				},
-				nil,
-				func(op *operation, r *http.Request, w http.ResponseWriter) error {
-					secret := r.FormValue("secret")
-					if secret == "" {
-						return fmt.Errorf("missing secret")
-					}
-
-					if secret != actionScriptOpSecret {
-						return os.ErrPermission
-					}
-
-					c, err := shared.WebsocketUpgrader.Upgrade(w, r, nil)
-					if err != nil {
-						return err
-					}
-
-					dumpDone <- true
-
-					closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
-					return c.WriteMessage(websocket.CloseMessage, closeMsg)
-				},
-			)
-			if err != nil {
-				os.RemoveAll(checkpointDir)
-				return abort(err)
-			}
-
-			err = writeActionScript(checkpointDir, actionScriptOp.url, actionScriptOpSecret, state.OS.ExecPath)
-			if err != nil {
-				os.RemoveAll(checkpointDir)
-				return abort(err)
-			}
-
-			preDumpCounter := 0
-			preDumpDir := ""
-			if use_pre_dumps {
-				final := false
-				for !final {
-					preDumpCounter++
-					if preDumpCounter < max_iterations {
-						final = false
-					} else {
-						final = true
-					}
-					dumpDir := fmt.Sprintf("%03d", preDumpCounter)
-					loop_args := preDumpLoopArgs{
-						checkpointDir: checkpointDir,
-						bwlimit:       bwlimit,
-						preDumpDir:    preDumpDir,
-						dumpDir:       dumpDir,
-						final:         final,
-					}
-					final, err = s.preDumpLoop(&loop_args)
-					if err != nil {
-						os.RemoveAll(checkpointDir)
-						return abort(err)
-					}
-					preDumpDir = fmt.Sprintf("%03d", preDumpCounter)
-					preDumpCounter++
-				}
-			}
-
-			_, err = actionScriptOp.Run()
-			if err != nil {
-				os.RemoveAll(checkpointDir)
-				return abort(err)
-			}
-
-			go func() {
-				criuMigrationArgs := CriuMigrationArgs{
-					cmd:          lxc.MIGRATE_DUMP,
-					stop:         true,
-					actionScript: true,
-					preDumpDir:   preDumpDir,
-					dumpDir:      "final",
-					stateDir:     checkpointDir,
-					function:     "migration",
-				}
-
-				// Do the final CRIU dump. This is needs no special
-				// handling if pre-dumps are used or not
-				dumpSuccess <- s.container.Migrate(&criuMigrationArgs)
-				os.RemoveAll(checkpointDir)
-			}()
-
-			select {
-			/* the checkpoint failed, let's just abort */
-			case err = <-dumpSuccess:
-				return abort(err)
-			/* the dump finished, let's continue on to the restore */
-			case <-dumpDone:
-				logger.Debugf("Dump finished, continuing with restore...")
-			}
-		} else {
-			logger.Debugf("liblxc version is older than 2.0.4 and the live migration will probably fail")
-			defer os.RemoveAll(checkpointDir)
-			criuMigrationArgs := CriuMigrationArgs{
-				cmd:          lxc.MIGRATE_DUMP,
-				stateDir:     checkpointDir,
-				function:     "migration",
-				stop:         true,
-				actionScript: false,
-				dumpDir:      "final",
-				preDumpDir:   "",
-			}
-
-			err = s.container.Migrate(&criuMigrationArgs)
-			if err != nil {
-				return abort(err)
-			}
-		}
-
-		/*
-		 * We do the serially right now, but there's really no reason for us
-		 * to; since we have separate websockets, we can do it in parallel if
-		 * we wanted to. However, assuming we're network bound, there's really
-		 * no reason to do these in parallel. In the future when we're using
-		 * p.haul's protocol, it will make sense to do these in parallel.
-		 */
-		ctName, _, _ := containerGetParentAndSnapshotName(s.container.Name())
-		state := s.container.DaemonState()
-		err = RsyncSend(ctName, shared.AddSlash(checkpointDir), s.criuConn, nil, bwlimit, state.OS.ExecPath)
-		if err != nil {
-			return abort(err)
-		}
-	}
-
-	if s.live || (header.Criu != nil && *header.Criu == migration.CRIUType_NONE) {
-		err = driver.SendAfterCheckpoint(s.fsConn, bwlimit)
-		if err != nil {
-			return abort(err)
-		}
-	}
-
-	driver.Cleanup()
-
-	msg := migration.MigrationControl{}
-	err = s.recv(&msg)
-	if err != nil {
-		s.disconnect()
-		return err
-	}
-
-	if s.live {
-		restoreSuccess <- *msg.Success
-		err := <-dumpSuccess
-		if err != nil {
-			logger.Errorf("dump failed after successful restore?: %q", err)
-		}
-	}
-
-	if !*msg.Success {
-		return fmt.Errorf(*msg.Message)
-	}
-
-	return nil
-}
-
 type migrationSink struct {
-	// We are pulling the container from src in pull mode.
+	// We are pulling the entity from src in pull mode.
 	src migrationFields
-	// The container is pushed from src to dest in push mode. Note that
+	// The entity is pushed from src to dest in push mode. Note that
 	// websocket connections are not set in push mode. Only the secret
 	// fields are used since the client will connect to the sockets.
 	dest migrationFields
@@ -889,73 +249,17 @@ type migrationSink struct {
 }
 
 type MigrationSinkArgs struct {
-	Url           string
-	Dialer        websocket.Dialer
-	Container     container
-	Secrets       map[string]string
-	Push          bool
+	Url     string
+	Dialer  websocket.Dialer
+	Secrets map[string]string
+	Push    bool
+
+	// container specific fields
 	Live          bool
+	Container     container
 	ContainerOnly bool
 }
 
-func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
-	sink := migrationSink{
-		src:    migrationFields{container: args.Container, containerOnly: args.ContainerOnly},
-		dest:   migrationFields{containerOnly: args.ContainerOnly},
-		url:    args.Url,
-		dialer: args.Dialer,
-		push:   args.Push,
-	}
-
-	if sink.push {
-		sink.allConnected = make(chan bool, 1)
-	}
-
-	var ok bool
-	var err error
-	if sink.push {
-		sink.dest.controlSecret, err = shared.RandomCryptoString()
-		if err != nil {
-			return nil, err
-		}
-
-		sink.dest.fsSecret, err = shared.RandomCryptoString()
-		if err != nil {
-			return nil, err
-		}
-
-		sink.dest.live = args.Live
-		if sink.dest.live {
-			sink.dest.criuSecret, err = shared.RandomCryptoString()
-			if err != nil {
-				return nil, err
-			}
-		}
-	} else {
-		sink.src.controlSecret, ok = args.Secrets["control"]
-		if !ok {
-			return nil, fmt.Errorf("Missing control secret")
-		}
-
-		sink.src.fsSecret, ok = args.Secrets["fs"]
-		if !ok {
-			return nil, fmt.Errorf("Missing fs secret")
-		}
-
-		sink.src.criuSecret, ok = args.Secrets["criu"]
-		sink.src.live = ok
-	}
-
-	_, err = exec.LookPath("criu")
-	if sink.push && sink.dest.live && err != nil {
-		return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the destination server.")
-	} else if sink.src.live && err != nil {
-		return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the destination server.")
-	}
-
-	return &sink, nil
-}
-
 func (c *migrationSink) connectWithSecret(secret string) (*websocket.Conn, error) {
 	query := url.Values{"secret": []string{secret}}
 
@@ -1017,300 +321,3 @@ func (s *migrationSink) Connect(op *operation, r *http.Request, w http.ResponseW
 
 	return nil
 }
-
-func (c *migrationSink) Do(migrateOp *operation) error {
-	var err error
-
-	if c.push {
-		<-c.allConnected
-	}
-
-	disconnector := c.src.disconnect
-	if c.push {
-		disconnector = c.dest.disconnect
-	}
-
-	if c.push {
-		defer disconnector()
-	} else {
-		c.src.controlConn, err = c.connectWithSecret(c.src.controlSecret)
-		if err != nil {
-			return err
-		}
-		defer c.src.disconnect()
-
-		c.src.fsConn, err = c.connectWithSecret(c.src.fsSecret)
-		if err != nil {
-			c.src.sendControl(err)
-			return err
-		}
-
-		if c.src.live {
-			c.src.criuConn, err = c.connectWithSecret(c.src.criuSecret)
-			if err != nil {
-				c.src.sendControl(err)
-				return err
-			}
-		}
-	}
-
-	receiver := c.src.recv
-	if c.push {
-		receiver = c.dest.recv
-	}
-
-	sender := c.src.send
-	if c.push {
-		sender = c.dest.send
-	}
-
-	controller := c.src.sendControl
-	if c.push {
-		controller = c.dest.sendControl
-	}
-
-	header := migration.MigrationHeader{}
-	if err := receiver(&header); err != nil {
-		controller(err)
-		return err
-	}
-
-	live := c.src.live
-	if c.push {
-		live = c.dest.live
-	}
-
-	criuType := migration.CRIUType_CRIU_RSYNC.Enum()
-	if header.Criu != nil && *header.Criu == migration.CRIUType_NONE {
-		criuType = migration.CRIUType_NONE.Enum()
-	} else {
-		if !live {
-			criuType = nil
-		}
-	}
-
-	mySink := c.src.container.Storage().MigrationSink
-	myType := c.src.container.Storage().MigrationType()
-	resp := migration.MigrationHeader{
-		Fs:   &myType,
-		Criu: criuType,
-	}
-
-	// If the storage type the source has doesn't match what we have, then
-	// we have to use rsync.
-	if *header.Fs != *resp.Fs {
-		mySink = rsyncMigrationSink
-		myType = migration.MigrationFSType_RSYNC
-		resp.Fs = &myType
-	}
-
-	if header.GetPredump() == true {
-		// If the other side wants pre-dump and if
-		// this side supports it, let's use it.
-		resp.Predump = proto.Bool(true)
-	} else {
-		resp.Predump = proto.Bool(false)
-	}
-
-	err = sender(&resp)
-	if err != nil {
-		controller(err)
-		return err
-	}
-
-	restore := make(chan error)
-	go func(c *migrationSink) {
-		imagesDir := ""
-		srcIdmap := new(idmap.IdmapSet)
-
-		for _, idmapSet := range header.Idmap {
-			e := idmap.IdmapEntry{
-				Isuid:    *idmapSet.Isuid,
-				Isgid:    *idmapSet.Isgid,
-				Nsid:     int64(*idmapSet.Nsid),
-				Hostid:   int64(*idmapSet.Hostid),
-				Maprange: int64(*idmapSet.Maprange)}
-			srcIdmap.Idmap = idmap.Extend(srcIdmap.Idmap, e)
-		}
-
-		/* We do the fs receive in parallel so we don't have to reason
-		 * about when to receive what. The sending side is smart enough
-		 * to send the filesystem bits that it can before it seizes the
-		 * container to start checkpointing, so the total transfer time
-		 * will be minimized even if we're dumb here.
-		 */
-		fsTransfer := make(chan error)
-		go func() {
-			snapshots := []*migration.Snapshot{}
-
-			/* Legacy: we only sent the snapshot names, so we just
-			 * copy the container's config over, same as we used to
-			 * do.
-			 */
-			if len(header.SnapshotNames) != len(header.Snapshots) {
-				for _, name := range header.SnapshotNames {
-					base := snapshotToProtobuf(c.src.container)
-					base.Name = &name
-					snapshots = append(snapshots, base)
-				}
-			} else {
-				snapshots = header.Snapshots
-			}
-
-			var fsConn *websocket.Conn
-			if c.push {
-				fsConn = c.dest.fsConn
-			} else {
-				fsConn = c.src.fsConn
-			}
-
-			sendFinalFsDelta := false
-			if live {
-				sendFinalFsDelta = true
-			}
-
-			if criuType != nil && *criuType == migration.CRIUType_NONE {
-				sendFinalFsDelta = true
-			}
-
-			err = mySink(sendFinalFsDelta, c.src.container,
-				snapshots, fsConn, srcIdmap, migrateOp,
-				c.src.containerOnly)
-			if err != nil {
-				fsTransfer <- err
-				return
-			}
-
-			err = ShiftIfNecessary(c.src.container, srcIdmap)
-			if err != nil {
-				fsTransfer <- err
-				return
-			}
-
-			fsTransfer <- nil
-		}()
-
-		if live {
-			var err error
-			imagesDir, err = ioutil.TempDir("", "lxd_restore_")
-			if err != nil {
-				restore <- err
-				return
-			}
-
-			defer os.RemoveAll(imagesDir)
-
-			var criuConn *websocket.Conn
-			if c.push {
-				criuConn = c.dest.criuConn
-			} else {
-				criuConn = c.src.criuConn
-			}
-
-			sync := &migration.MigrationSync{
-				FinalPreDump: proto.Bool(false),
-			}
-
-			if resp.GetPredump() {
-				logger.Debugf("Before the receive loop %s", sync.GetFinalPreDump())
-				for !sync.GetFinalPreDump() {
-					logger.Debugf("About to receive rsync")
-					// Transfer a CRIU pre-dump
-					err = RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil)
-					if err != nil {
-						restore <- err
-						return
-					}
-					logger.Debugf("rsync receive done")
-
-					logger.Debugf("About to receive header")
-					// Check if this was the last pre-dump
-					// Only the FinalPreDump element if of interest
-					mtype, data, err := criuConn.ReadMessage()
-					if err != nil {
-						logger.Debugf("err %s", err)
-						restore <- err
-						return
-					}
-					if mtype != websocket.BinaryMessage {
-						restore <- err
-						return
-					}
-					err = proto.Unmarshal(data, sync)
-					if err != nil {
-						logger.Debugf("err %s", err)
-						restore <- err
-						return
-					}
-					logger.Debugf("At the end of the receive loop %s", sync.GetFinalPreDump())
-				}
-			}
-
-			// Final CRIU dump
-			err = RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil)
-			if err != nil {
-				restore <- err
-				return
-			}
-		}
-
-		err := <-fsTransfer
-		if err != nil {
-			restore <- err
-			return
-		}
-
-		if live {
-			criuMigrationArgs := CriuMigrationArgs{
-				cmd:          lxc.MIGRATE_RESTORE,
-				stateDir:     imagesDir,
-				function:     "migration",
-				stop:         false,
-				actionScript: false,
-				dumpDir:      "final",
-				preDumpDir:   "",
-			}
-
-			// Currently we only do a single CRIU pre-dump so we
-			// can hardcode "final" here since we know that "final" is the
-			// folder for CRIU's final dump.
-			err = c.src.container.Migrate(&criuMigrationArgs)
-			if err != nil {
-				restore <- err
-				return
-			}
-
-		}
-
-		restore <- nil
-	}(c)
-
-	var source <-chan migration.MigrationControl
-	if c.push {
-		source = c.dest.controlChannel()
-	} else {
-		source = c.src.controlChannel()
-	}
-
-	for {
-		select {
-		case err = <-restore:
-			controller(err)
-			return err
-		case msg, ok := <-source:
-			if !ok {
-				disconnector()
-				return fmt.Errorf("Got error reading source")
-			}
-			if !*msg.Success {
-				disconnector()
-				return fmt.Errorf(*msg.Message)
-			} else {
-				// The source can only tell us it failed (e.g. if
-				// checkpointing failed). We have to tell the source
-				// whether or not the restore was successful.
-				logger.Debugf("Unknown message %v from source", msg)
-			}
-		}
-	}
-}
diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
new file mode 100644
index 000000000..58300cccc
--- /dev/null
+++ b/lxd/migrate_container.go
@@ -0,0 +1,1013 @@
+package main
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/gorilla/websocket"
+	"gopkg.in/lxc/go-lxc.v2"
+
+	"github.com/lxc/lxd/lxd/migration"
+	"github.com/lxc/lxd/lxd/util"
+	"github.com/lxc/lxd/shared"
+	"github.com/lxc/lxd/shared/api"
+	"github.com/lxc/lxd/shared/idmap"
+	"github.com/lxc/lxd/shared/logger"
+)
+
+func NewMigrationSource(c container, stateful bool, containerOnly bool) (*migrationSourceWs, error) {
+	ret := migrationSourceWs{migrationFields{container: c}, make(chan bool, 1)}
+	ret.containerOnly = containerOnly
+
+	var err error
+	ret.controlSecret, err = shared.RandomCryptoString()
+	if err != nil {
+		return nil, err
+	}
+
+	ret.fsSecret, err = shared.RandomCryptoString()
+	if err != nil {
+		return nil, err
+	}
+
+	if stateful && c.IsRunning() {
+		_, err := exec.LookPath("criu")
+		if err != nil {
+			return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the source server.")
+		}
+
+		ret.live = true
+		ret.criuSecret, err = shared.RandomCryptoString()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return &ret, nil
+}
+
+func writeActionScript(directory string, operation string, secret string, execPath string) error {
+	script := fmt.Sprintf(`#!/bin/sh -e
+if [ "$CRTOOLS_SCRIPT_ACTION" = "post-dump" ]; then
+	%s migratedumpsuccess %s %s
+fi
+`, execPath, operation, secret)
+
+	f, err := os.Create(filepath.Join(directory, "action.sh"))
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	err = f.Chmod(0500)
+	if err != nil {
+		return err
+	}
+
+	_, err = f.WriteString(script)
+	return err
+}
+
+func snapshotToProtobuf(c container) *migration.Snapshot {
+	config := []*migration.Config{}
+	for k, v := range c.LocalConfig() {
+		kCopy := string(k)
+		vCopy := string(v)
+		config = append(config, &migration.Config{Key: &kCopy, Value: &vCopy})
+	}
+
+	devices := []*migration.Device{}
+	for name, d := range c.LocalDevices() {
+		props := []*migration.Config{}
+		for k, v := range d {
+			kCopy := string(k)
+			vCopy := string(v)
+			props = append(props, &migration.Config{Key: &kCopy, Value: &vCopy})
+		}
+
+		devices = append(devices, &migration.Device{Name: &name, Config: props})
+	}
+
+	parts := strings.SplitN(c.Name(), shared.SnapshotDelimiter, 2)
+	isEphemeral := c.IsEphemeral()
+	arch := int32(c.Architecture())
+	stateful := c.IsStateful()
+
+	return &migration.Snapshot{
+		Name:         &parts[len(parts)-1],
+		LocalConfig:  config,
+		Profiles:     c.Profiles(),
+		Ephemeral:    &isEphemeral,
+		LocalDevices: devices,
+		Architecture: &arch,
+		Stateful:     &stateful,
+	}
+}
+
+// Check if CRIU supports pre-dumping and number of
+// pre-dump iterations
+func (s *migrationSourceWs) checkForPreDumpSupport() (bool, int) {
+	// Ask CRIU if this architecture/kernel/criu combination
+	// supports pre-copy (dirty memory tracking)
+	criuMigrationArgs := CriuMigrationArgs{
+		cmd:          lxc.MIGRATE_FEATURE_CHECK,
+		stateDir:     "",
+		function:     "feature-check",
+		stop:         false,
+		actionScript: false,
+		dumpDir:      "",
+		preDumpDir:   "",
+		features:     lxc.FEATURE_MEM_TRACK,
+	}
+	err := s.container.Migrate(&criuMigrationArgs)
+
+	if err != nil {
+		// CRIU says it does not know about dirty memory tracking.
+		// This means the rest of this function is irrelevant.
+		return false, 0
+	}
+
+	// CRIU says it can actually do pre-dump. Let's set it to true
+	// unless the user wants something else.
+	use_pre_dumps := true
+
+	// What does the configuration say about pre-copy
+	tmp := s.container.ExpandedConfig()["migration.incremental.memory"]
+
+	if tmp != "" {
+		use_pre_dumps = shared.IsTrue(tmp)
+	}
+	logger.Debugf("migration.incremental.memory %d", use_pre_dumps)
+
+	var max_iterations int
+
+	// migration.incremental.memory.iterations is the value after which the
+	// container will be definitely migrated, even if the remaining number
+	// of memory pages is below the defined threshold.
+	tmp = s.container.ExpandedConfig()["migration.incremental.memory.iterations"]
+	if tmp != "" {
+		max_iterations, _ = strconv.Atoi(tmp)
+	} else {
+		// default to 10
+		max_iterations = 10
+	}
+	if max_iterations > 999 {
+		// the pre-dump directory is hardcoded to a string
+		// with maximal 3 digits. 999 pre-dumps makes no
+		// sense at all, but let's make sure the number
+		// is not higher than this.
+		max_iterations = 999
+	}
+	logger.Debugf("using maximal %d iterations for pre-dumping", max_iterations)
+
+	return use_pre_dumps, max_iterations
+}
+
+// The function readCriuStatsDump() reads the CRIU 'stats-dump' file
+// in path and returns the pages_written, pages_skipped_parent, error.
+func readCriuStatsDump(path string) (uint64, uint64, error) {
+	statsDump := shared.AddSlash(path) + "stats-dump"
+	in, err := ioutil.ReadFile(statsDump)
+	if err != nil {
+		logger.Errorf("Error reading CRIU's 'stats-dump' file: %s", err.Error())
+		return 0, 0, err
+	}
+
+	// According to the CRIU file image format it starts with two magic values.
+	// First magic IMG_SERVICE: 1427134784
+	if binary.LittleEndian.Uint32(in[0:4]) != 1427134784 {
+		msg := "IMG_SERVICE(1427134784) criu magic not found"
+		logger.Errorf(msg)
+		return 0, 0, fmt.Errorf(msg)
+	}
+	// Second magic STATS: 1460220678
+	if binary.LittleEndian.Uint32(in[4:8]) != 1460220678 {
+		msg := "STATS(1460220678) criu magic not found"
+		logger.Errorf(msg)
+		return 0, 0, fmt.Errorf(msg)
+	}
+
+	// Next, read the size of the image payload
+	size := binary.LittleEndian.Uint32(in[8:12])
+	logger.Debugf("stats-dump payload size %d", size)
+
+	statsEntry := &migration.StatsEntry{}
+	if err = proto.Unmarshal(in[12:12+size], statsEntry); err != nil {
+		logger.Errorf("Failed to parse CRIU's 'stats-dump' file: %s", err.Error())
+		return 0, 0, err
+	}
+
+	written := statsEntry.GetDump().GetPagesWritten()
+	skipped := statsEntry.GetDump().GetPagesSkippedParent()
+	return written, skipped, nil
+}
+
+type preDumpLoopArgs struct {
+	checkpointDir string
+	bwlimit       string
+	preDumpDir    string
+	dumpDir       string
+	final         bool
+}
+
+// The function preDumpLoop is the main logic behind the pre-copy migration.
+// This function contains the actual pre-dump, the corresponding rsync
+// transfer and it tells the outer loop to abort if the threshold
+// of memory pages transferred by pre-dumping has been reached.
+func (s *migrationSourceWs) preDumpLoop(args *preDumpLoopArgs) (bool, error) {
+	// Do a CRIU pre-dump
+	criuMigrationArgs := CriuMigrationArgs{
+		cmd:          lxc.MIGRATE_PRE_DUMP,
+		stop:         false,
+		actionScript: false,
+		preDumpDir:   args.preDumpDir,
+		dumpDir:      args.dumpDir,
+		stateDir:     args.checkpointDir,
+		function:     "migration",
+	}
+
+	logger.Debugf("Doing another pre-dump in %s", args.preDumpDir)
+
+	final := args.final
+
+	err := s.container.Migrate(&criuMigrationArgs)
+	if err != nil {
+		return final, err
+	}
+
+	// Send the pre-dump.
+	ctName, _, _ := containerGetParentAndSnapshotName(s.container.Name())
+	state := s.container.DaemonState()
+	err = RsyncSend(ctName, shared.AddSlash(args.checkpointDir), s.criuConn, nil, args.bwlimit, state.OS.ExecPath)
+	if err != nil {
+		return final, err
+	}
+
+	// Read the CRIU's 'stats-dump' file
+	dumpPath := shared.AddSlash(args.checkpointDir)
+	dumpPath += shared.AddSlash(args.dumpDir)
+	written, skipped_parent, err := readCriuStatsDump(dumpPath)
+	if err != nil {
+		return final, err
+	}
+
+	logger.Debugf("CRIU pages written %d", written)
+	logger.Debugf("CRIU pages skipped %d", skipped_parent)
+
+	total_pages := written + skipped_parent
+
+	percentage_skipped := int(100 - ((100 * written) / total_pages))
+
+	logger.Debugf("CRIU pages skipped percentage %d%%", percentage_skipped)
+
+	// threshold is the percentage of memory pages that needs
+	// to be pre-copied for the pre-copy migration to stop.
+	var threshold int
+	tmp := s.container.ExpandedConfig()["migration.incremental.memory.goal"]
+	if tmp != "" {
+		threshold, _ = strconv.Atoi(tmp)
+	} else {
+		// defaults to 70%
+		threshold = 70
+	}
+
+	if percentage_skipped > threshold {
+		logger.Debugf("Memory pages skipped (%d%%) due to pre-copy is larger than threshold (%d%%)", percentage_skipped, threshold)
+		logger.Debugf("This was the last pre-dump; next dump is the final dump")
+		final = true
+	}
+
+	// If in pre-dump mode, the receiving side
+	// expects a message to know if this was the
+	// last pre-dump
+	logger.Debugf("Sending another header")
+	sync := migration.MigrationSync{
+		FinalPreDump: proto.Bool(final),
+	}
+
+	data, err := proto.Marshal(&sync)
+
+	if err != nil {
+		return final, err
+	}
+
+	err = s.criuConn.WriteMessage(websocket.BinaryMessage, data)
+	if err != nil {
+		s.sendControl(err)
+		return final, err
+	}
+	logger.Debugf("Sending another header done")
+
+	return final, nil
+}
+
+func (s *migrationSourceWs) Do(migrateOp *operation) error {
+	<-s.allConnected
+
+	criuType := migration.CRIUType_CRIU_RSYNC.Enum()
+	if !s.live {
+		criuType = nil
+		if s.container.IsRunning() {
+			criuType = migration.CRIUType_NONE.Enum()
+		}
+	}
+
+	// Storage needs to start unconditionally now, since we need to
+	// initialize a new storage interface.
+	ourStart, err := s.container.StorageStart()
+	if err != nil {
+		return err
+	}
+	if ourStart {
+		defer s.container.StorageStop()
+	}
+
+	idmaps := make([]*migration.IDMapType, 0)
+
+	idmapset, err := s.container.IdmapSet()
+	if err != nil {
+		return err
+	}
+
+	if idmapset != nil {
+		for _, ctnIdmap := range idmapset.Idmap {
+			idmap := migration.IDMapType{
+				Isuid:    proto.Bool(ctnIdmap.Isuid),
+				Isgid:    proto.Bool(ctnIdmap.Isgid),
+				Hostid:   proto.Int32(int32(ctnIdmap.Hostid)),
+				Nsid:     proto.Int32(int32(ctnIdmap.Nsid)),
+				Maprange: proto.Int32(int32(ctnIdmap.Maprange)),
+			}
+
+			idmaps = append(idmaps, &idmap)
+		}
+	}
+
+	driver, fsErr := s.container.Storage().MigrationSource(s.container, s.containerOnly)
+
+	snapshots := []*migration.Snapshot{}
+	snapshotNames := []string{}
+	// Only send snapshots when requested.
+	if !s.containerOnly {
+		if fsErr == nil {
+			fullSnaps := driver.Snapshots()
+			for _, snap := range fullSnaps {
+				snapshots = append(snapshots, snapshotToProtobuf(snap))
+				snapshotNames = append(snapshotNames, shared.ExtractSnapshotName(snap.Name()))
+			}
+		}
+	}
+
+	use_pre_dumps := false
+	max_iterations := 0
+	if s.live {
+		use_pre_dumps, max_iterations = s.checkForPreDumpSupport()
+	}
+
+	// The protocol says we have to send a header no matter what, so let's
+	// do that; if the storage driver failed we immediately follow up with the error.
+	myType := s.container.Storage().MigrationType()
+	header := migration.MigrationHeader{
+		Fs:            &myType,
+		Criu:          criuType,
+		Idmap:         idmaps,
+		SnapshotNames: snapshotNames,
+		Snapshots:     snapshots,
+		Predump:       proto.Bool(use_pre_dumps),
+	}
+
+	err = s.send(&header)
+	if err != nil {
+		s.sendControl(err)
+		return err
+	}
+
+	if fsErr != nil {
+		s.sendControl(fsErr)
+		return fsErr
+	}
+
+	err = s.recv(&header)
+	if err != nil {
+		s.sendControl(err)
+		return err
+	}
+
+	bwlimit := ""
+	if *header.Fs != myType {
+		myType = migration.MigrationFSType_RSYNC
+		header.Fs = &myType
+
+		driver, _ = rsyncMigrationSource(s.container, s.containerOnly)
+
+		// Check if this storage pool has a rate limit set for rsync.
+		poolwritable := s.container.Storage().GetStoragePoolWritable()
+		if poolwritable.Config != nil {
+			bwlimit = poolwritable.Config["rsync.bwlimit"]
+		}
+	}
+
+	// Check if the other side knows about pre-dumping and
+	// the associated rsync protocol
+	use_pre_dumps = header.GetPredump()
+	if use_pre_dumps {
+		logger.Debugf("The other side does support pre-copy")
+	} else {
+		logger.Debugf("The other side does not support pre-copy")
+	}
+
+	// All failure paths need to do a few things to correctly handle errors before returning.
+	// Unfortunately, handling errors is not well-suited to defer as the code depends on the
+	// status of driver and the error value.  The error value is especially tricky due to the
+	// common case of creating a new err variable (intentional or not) due to scoping and use
+	// of ":=".  Capturing err in a closure for use in defer would be fragile, which defeats
+	// the purpose of using defer.  An abort function reduces the odds of mishandling errors
+	// without introducing the fragility of closing on err.
+	abort := func(err error) error {
+		driver.Cleanup()
+		s.sendControl(err)
+		return err
+	}
+
+	err = driver.SendWhileRunning(s.fsConn, migrateOp, bwlimit, s.containerOnly)
+	if err != nil {
+		return abort(err)
+	}
+
+	restoreSuccess := make(chan bool, 1)
+	dumpSuccess := make(chan error, 1)
+
+	if s.live {
+		if header.Criu == nil {
+			return abort(fmt.Errorf("Got no CRIU socket type for live migration"))
+		} else if *header.Criu != migration.CRIUType_CRIU_RSYNC {
+			return abort(fmt.Errorf("Formats other than criu rsync not understood"))
+		}
+
+		checkpointDir, err := ioutil.TempDir("", "lxd_checkpoint_")
+		if err != nil {
+			return abort(err)
+		}
+
+		if util.RuntimeLiblxcVersionAtLeast(2, 0, 4) {
+			/* What happens below is slightly convoluted. Due to various
+			 * complications with networking, there's no easy way for criu
+			 * to exit and leave the container in a frozen state for us to
+			 * somehow resume later.
+			 *
+			 * Instead, we use what criu calls an "action-script", which is
+			 * basically a callback that lets us know when the dump is
+			 * done. (Unfortunately, we can't pass arguments, just an
+			 * executable path, so we write a custom action script with the
+			 * real command we want to run.)
+			 *
+			 * This script then hangs until the migration operation either
+			 * finishes successfully or fails, and exits 1 or 0, which
+			 * causes criu to either leave the container running or kill it
+			 * as we asked.
+			 */
+			dumpDone := make(chan bool, 1)
+			actionScriptOpSecret, err := shared.RandomCryptoString()
+			if err != nil {
+				os.RemoveAll(checkpointDir)
+				return abort(err)
+			}
+
+			state := s.container.DaemonState()
+			actionScriptOp, err := operationCreate(
+				state.Cluster,
+				operationClassWebsocket,
+				"Live-migrating container",
+				nil,
+				nil,
+				func(op *operation) error {
+					result := <-restoreSuccess
+					if !result {
+						return fmt.Errorf("restore failed, failing CRIU")
+					}
+					return nil
+				},
+				nil,
+				func(op *operation, r *http.Request, w http.ResponseWriter) error {
+					secret := r.FormValue("secret")
+					if secret == "" {
+						return fmt.Errorf("missing secret")
+					}
+
+					if secret != actionScriptOpSecret {
+						return os.ErrPermission
+					}
+
+					c, err := shared.WebsocketUpgrader.Upgrade(w, r, nil)
+					if err != nil {
+						return err
+					}
+
+					dumpDone <- true
+
+					closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+					return c.WriteMessage(websocket.CloseMessage, closeMsg)
+				},
+			)
+			if err != nil {
+				os.RemoveAll(checkpointDir)
+				return abort(err)
+			}
+
+			err = writeActionScript(checkpointDir, actionScriptOp.url, actionScriptOpSecret, state.OS.ExecPath)
+			if err != nil {
+				os.RemoveAll(checkpointDir)
+				return abort(err)
+			}
+
+			preDumpCounter := 0
+			preDumpDir := ""
+			if use_pre_dumps {
+				final := false
+				for !final {
+					preDumpCounter++
+					if preDumpCounter < max_iterations {
+						final = false
+					} else {
+						final = true
+					}
+					dumpDir := fmt.Sprintf("%03d", preDumpCounter)
+					loop_args := preDumpLoopArgs{
+						checkpointDir: checkpointDir,
+						bwlimit:       bwlimit,
+						preDumpDir:    preDumpDir,
+						dumpDir:       dumpDir,
+						final:         final,
+					}
+					final, err = s.preDumpLoop(&loop_args)
+					if err != nil {
+						os.RemoveAll(checkpointDir)
+						return abort(err)
+					}
+					preDumpDir = fmt.Sprintf("%03d", preDumpCounter)
+					preDumpCounter++
+				}
+			}
+
+			_, err = actionScriptOp.Run()
+			if err != nil {
+				os.RemoveAll(checkpointDir)
+				return abort(err)
+			}
+
+			go func() {
+				criuMigrationArgs := CriuMigrationArgs{
+					cmd:          lxc.MIGRATE_DUMP,
+					stop:         true,
+					actionScript: true,
+					preDumpDir:   preDumpDir,
+					dumpDir:      "final",
+					stateDir:     checkpointDir,
+					function:     "migration",
+				}
+
+				// Do the final CRIU dump. This needs no special
+				// handling whether or not pre-dumps were used.
+				dumpSuccess <- s.container.Migrate(&criuMigrationArgs)
+				os.RemoveAll(checkpointDir)
+			}()
+
+			select {
+			/* the checkpoint failed, let's just abort */
+			case err = <-dumpSuccess:
+				return abort(err)
+			/* the dump finished, let's continue on to the restore */
+			case <-dumpDone:
+				logger.Debugf("Dump finished, continuing with restore...")
+			}
+		} else {
+			logger.Debugf("liblxc version is older than 2.0.4 and the live migration will probably fail")
+			defer os.RemoveAll(checkpointDir)
+			criuMigrationArgs := CriuMigrationArgs{
+				cmd:          lxc.MIGRATE_DUMP,
+				stateDir:     checkpointDir,
+				function:     "migration",
+				stop:         true,
+				actionScript: false,
+				dumpDir:      "final",
+				preDumpDir:   "",
+			}
+
+			err = s.container.Migrate(&criuMigrationArgs)
+			if err != nil {
+				return abort(err)
+			}
+		}
+
+		/*
+		 * We do this serially right now, but there's really no reason for us
+		 * to; since we have separate websockets, we can do it in parallel if
+		 * we wanted to. However, assuming we're network bound, there's really
+		 * no reason to do these in parallel. In the future when we're using
+		 * p.haul's protocol, it will make sense to do these in parallel.
+		 */
+		ctName, _, _ := containerGetParentAndSnapshotName(s.container.Name())
+		state := s.container.DaemonState()
+		err = RsyncSend(ctName, shared.AddSlash(checkpointDir), s.criuConn, nil, bwlimit, state.OS.ExecPath)
+		if err != nil {
+			return abort(err)
+		}
+	}
+
+	if s.live || (header.Criu != nil && *header.Criu == migration.CRIUType_NONE) {
+		err = driver.SendAfterCheckpoint(s.fsConn, bwlimit)
+		if err != nil {
+			return abort(err)
+		}
+	}
+
+	driver.Cleanup()
+
+	msg := migration.MigrationControl{}
+	err = s.recv(&msg)
+	if err != nil {
+		s.disconnect()
+		return err
+	}
+
+	if s.live {
+		restoreSuccess <- *msg.Success
+		err := <-dumpSuccess
+		if err != nil {
+			logger.Errorf("dump failed after successful restore?: %q", err)
+		}
+	}
+
+	if !*msg.Success {
+		return fmt.Errorf(*msg.Message)
+	}
+
+	return nil
+}
+
+func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
+	sink := migrationSink{
+		src:    migrationFields{container: args.Container, containerOnly: args.ContainerOnly},
+		dest:   migrationFields{containerOnly: args.ContainerOnly},
+		url:    args.Url,
+		dialer: args.Dialer,
+		push:   args.Push,
+	}
+
+	if sink.push {
+		sink.allConnected = make(chan bool, 1)
+	}
+
+	var ok bool
+	var err error
+	if sink.push {
+		sink.dest.controlSecret, err = shared.RandomCryptoString()
+		if err != nil {
+			return nil, err
+		}
+
+		sink.dest.fsSecret, err = shared.RandomCryptoString()
+		if err != nil {
+			return nil, err
+		}
+
+		sink.dest.live = args.Live
+		if sink.dest.live {
+			sink.dest.criuSecret, err = shared.RandomCryptoString()
+			if err != nil {
+				return nil, err
+			}
+		}
+	} else {
+		sink.src.controlSecret, ok = args.Secrets["control"]
+		if !ok {
+			return nil, fmt.Errorf("Missing control secret")
+		}
+
+		sink.src.fsSecret, ok = args.Secrets["fs"]
+		if !ok {
+			return nil, fmt.Errorf("Missing fs secret")
+		}
+
+		sink.src.criuSecret, ok = args.Secrets["criu"]
+		sink.src.live = ok
+	}
+
+	_, err = exec.LookPath("criu")
+	if sink.push && sink.dest.live && err != nil {
+		return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the destination server.")
+	} else if sink.src.live && err != nil {
+		return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the destination server.")
+	}
+
+	return &sink, nil
+}
+
+func (c *migrationSink) Do(migrateOp *operation) error {
+	var err error
+
+	if c.push {
+		<-c.allConnected
+	}
+
+	disconnector := c.src.disconnect
+	if c.push {
+		disconnector = c.dest.disconnect
+	}
+
+	if c.push {
+		defer disconnector()
+	} else {
+		c.src.controlConn, err = c.connectWithSecret(c.src.controlSecret)
+		if err != nil {
+			return err
+		}
+		defer c.src.disconnect()
+
+		c.src.fsConn, err = c.connectWithSecret(c.src.fsSecret)
+		if err != nil {
+			c.src.sendControl(err)
+			return err
+		}
+
+		if c.src.live {
+			c.src.criuConn, err = c.connectWithSecret(c.src.criuSecret)
+			if err != nil {
+				c.src.sendControl(err)
+				return err
+			}
+		}
+	}
+
+	receiver := c.src.recv
+	if c.push {
+		receiver = c.dest.recv
+	}
+
+	sender := c.src.send
+	if c.push {
+		sender = c.dest.send
+	}
+
+	controller := c.src.sendControl
+	if c.push {
+		controller = c.dest.sendControl
+	}
+
+	header := migration.MigrationHeader{}
+	if err := receiver(&header); err != nil {
+		controller(err)
+		return err
+	}
+
+	live := c.src.live
+	if c.push {
+		live = c.dest.live
+	}
+
+	criuType := migration.CRIUType_CRIU_RSYNC.Enum()
+	if header.Criu != nil && *header.Criu == migration.CRIUType_NONE {
+		criuType = migration.CRIUType_NONE.Enum()
+	} else {
+		if !live {
+			criuType = nil
+		}
+	}
+
+	mySink := c.src.container.Storage().MigrationSink
+	myType := c.src.container.Storage().MigrationType()
+	resp := migration.MigrationHeader{
+		Fs:   &myType,
+		Criu: criuType,
+	}
+
+	// If the storage type the source has doesn't match what we have, then
+	// we have to use rsync.
+	if *header.Fs != *resp.Fs {
+		mySink = rsyncMigrationSink
+		myType = migration.MigrationFSType_RSYNC
+		resp.Fs = &myType
+	}
+
+	if header.GetPredump() == true {
+		// If the other side wants pre-dump and if
+		// this side supports it, let's use it.
+		resp.Predump = proto.Bool(true)
+	} else {
+		resp.Predump = proto.Bool(false)
+	}
+
+	err = sender(&resp)
+	if err != nil {
+		controller(err)
+		return err
+	}
+
+	restore := make(chan error)
+	go func(c *migrationSink) {
+		imagesDir := ""
+		srcIdmap := new(idmap.IdmapSet)
+
+		for _, idmapSet := range header.Idmap {
+			e := idmap.IdmapEntry{
+				Isuid:    *idmapSet.Isuid,
+				Isgid:    *idmapSet.Isgid,
+				Nsid:     int64(*idmapSet.Nsid),
+				Hostid:   int64(*idmapSet.Hostid),
+				Maprange: int64(*idmapSet.Maprange)}
+			srcIdmap.Idmap = idmap.Extend(srcIdmap.Idmap, e)
+		}
+
+		/* We do the fs receive in parallel so we don't have to reason
+		 * about when to receive what. The sending side is smart enough
+		 * to send the filesystem bits that it can before it seizes the
+		 * container to start checkpointing, so the total transfer time
+		 * will be minimized even if we're dumb here.
+		 */
+		fsTransfer := make(chan error)
+		go func() {
+			snapshots := []*migration.Snapshot{}
+
+			/* Legacy: we only sent the snapshot names, so we just
+			 * copy the container's config over, same as we used to
+			 * do.
+			 */
+			if len(header.SnapshotNames) != len(header.Snapshots) {
+				for _, name := range header.SnapshotNames {
+					base := snapshotToProtobuf(c.src.container)
+					base.Name = &name
+					snapshots = append(snapshots, base)
+				}
+			} else {
+				snapshots = header.Snapshots
+			}
+
+			var fsConn *websocket.Conn
+			if c.push {
+				fsConn = c.dest.fsConn
+			} else {
+				fsConn = c.src.fsConn
+			}
+
+			sendFinalFsDelta := false
+			if live {
+				sendFinalFsDelta = true
+			}
+
+			if criuType != nil && *criuType == migration.CRIUType_NONE {
+				sendFinalFsDelta = true
+			}
+
+			err = mySink(sendFinalFsDelta, c.src.container,
+				snapshots, fsConn, srcIdmap, migrateOp,
+				c.src.containerOnly)
+			if err != nil {
+				fsTransfer <- err
+				return
+			}
+
+			err = ShiftIfNecessary(c.src.container, srcIdmap)
+			if err != nil {
+				fsTransfer <- err
+				return
+			}
+
+			fsTransfer <- nil
+		}()
+
+		if live {
+			var err error
+			imagesDir, err = ioutil.TempDir("", "lxd_restore_")
+			if err != nil {
+				restore <- err
+				return
+			}
+
+			defer os.RemoveAll(imagesDir)
+
+			var criuConn *websocket.Conn
+			if c.push {
+				criuConn = c.dest.criuConn
+			} else {
+				criuConn = c.src.criuConn
+			}
+
+			sync := &migration.MigrationSync{
+				FinalPreDump: proto.Bool(false),
+			}
+
+			if resp.GetPredump() {
+				logger.Debugf("Before the receive loop %s", sync.GetFinalPreDump())
+				for !sync.GetFinalPreDump() {
+					logger.Debugf("About to receive rsync")
+					// Transfer a CRIU pre-dump
+					err = RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil)
+					if err != nil {
+						restore <- err
+						return
+					}
+					logger.Debugf("rsync receive done")
+
+					logger.Debugf("About to receive header")
+					// Check if this was the last pre-dump
+					// Only the FinalPreDump element is of interest
+					mtype, data, err := criuConn.ReadMessage()
+					if err != nil {
+						logger.Debugf("err %s", err)
+						restore <- err
+						return
+					}
+					if mtype != websocket.BinaryMessage {
+						restore <- err
+						return
+					}
+					err = proto.Unmarshal(data, sync)
+					if err != nil {
+						logger.Debugf("err %s", err)
+						restore <- err
+						return
+					}
+					logger.Debugf("At the end of the receive loop %s", sync.GetFinalPreDump())
+				}
+			}
+
+			// Final CRIU dump
+			err = RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil)
+			if err != nil {
+				restore <- err
+				return
+			}
+		}
+
+		err := <-fsTransfer
+		if err != nil {
+			restore <- err
+			return
+		}
+
+		if live {
+			criuMigrationArgs := CriuMigrationArgs{
+				cmd:          lxc.MIGRATE_RESTORE,
+				stateDir:     imagesDir,
+				function:     "migration",
+				stop:         false,
+				actionScript: false,
+				dumpDir:      "final",
+				preDumpDir:   "",
+			}
+
+			// Currently we only do a single CRIU pre-dump so we
+			// can hardcode "final" here since we know that "final" is the
+			// folder for CRIU's final dump.
+			err = c.src.container.Migrate(&criuMigrationArgs)
+			if err != nil {
+				restore <- err
+				return
+			}
+
+		}
+
+		restore <- nil
+	}(c)
+
+	var source <-chan migration.MigrationControl
+	if c.push {
+		source = c.dest.controlChannel()
+	} else {
+		source = c.src.controlChannel()
+	}
+
+	for {
+		select {
+		case err = <-restore:
+			controller(err)
+			return err
+		case msg, ok := <-source:
+			if !ok {
+				disconnector()
+				return fmt.Errorf("Got error reading source")
+			}
+			if !*msg.Success {
+				disconnector()
+				return fmt.Errorf(*msg.Message)
+			} else {
+				// The source can only tell us it failed (e.g. if
+				// checkpointing failed). We have to tell the source
+				// whether or not the restore was successful.
+				logger.Debugf("Unknown message %v from source", msg)
+			}
+		}
+	}
+}
+
+func (s *migrationSourceWs) ConnectContainerTarget(target api.ContainerPostTarget) error {
+	return s.ConnectTarget(target.Operation, target.Operation, target.Websockets)
+}

From 610c8dcac6ab38dd27cf274a68605da775ca7f75 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 22 Mar 2018 18:30:57 +0100
Subject: [PATCH 2/3] client: send "copy" type

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 client/lxd_storage_volumes.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/client/lxd_storage_volumes.go b/client/lxd_storage_volumes.go
index eabeea5a1..79c418ab8 100644
--- a/client/lxd_storage_volumes.go
+++ b/client/lxd_storage_volumes.go
@@ -109,7 +109,7 @@ func (r *ProtocolLXD) CopyStoragePoolVolume(pool string, source ContainerServer,
 		Type: volume.Type,
 		Source: api.StorageVolumeSource{
 			Name: volume.Name,
-			Type: volume.Type,
+			Type: "copy",
 			Pool: sourcePool,
 		},
 	}

From c3329f6bfc12ab910949233e2d69c5bdffd4fd0e Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 22 Mar 2018 18:27:36 +0100
Subject: [PATCH 3/3] storage: add helper in API endpoint

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 lxd/storage_volumes.go | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/lxd/storage_volumes.go b/lxd/storage_volumes.go
index a85892f79..0efda2d70 100644
--- a/lxd/storage_volumes.go
+++ b/lxd/storage_volumes.go
@@ -162,10 +162,6 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) Response {
 		return BadRequest(fmt.Errorf("you must provide a storage volume type of the storage volume"))
 	}
 
-	// Check if the user gave us a valid pool name in which the new storage
-	// volume is supposed to be created.
-	poolName := mux.Vars(r)["name"]
-
 	// We currently only allow to create storage volumes of type
 	// storagePoolVolumeTypeCustom. So check, that nothing else was
 	// requested.
@@ -174,8 +170,22 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) Response {
 			`storage volumes of type %s`, req.Type))
 	}
 
+	poolName := mux.Vars(r)["name"]
+
+	switch req.Source.Type {
+	case "":
+		return doVolumeCreateOrCopy(d, poolName, &req)
+	case "copy":
+		return doVolumeCreateOrCopy(d, poolName, &req)
+	default:
+		return BadRequest(fmt.Errorf("unknown source type %s", req.Source.Type))
+	}
+}
+
+func doVolumeCreateOrCopy(d *Daemon, poolName string, req *api.StorageVolumesPost) Response {
+
 	doWork := func() error {
-		err = storagePoolVolumeCreateInternal(d.State(), poolName, &req)
+		err := storagePoolVolumeCreateInternal(d.State(), poolName, req)
 		if err != nil {
 			return err
 		}
@@ -183,7 +193,7 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) Response {
 	}
 
 	if req.Source.Name == "" {
-		err = doWork()
+		err := doWork()
 		if err != nil {
 			return SmartError(err)
 		}


More information about the lxc-devel mailing list