[lxc-devel] [lxd/master] storage: prelude
brauner on Github
lxc-bot at linuxcontainers.org
Thu Mar 22 17:32:30 UTC 2018
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 364 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180322/9339b86e/attachment.bin>
-------------- next part --------------
From f355d898caa1602770947c11f1090eb344d42d82 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Wed, 21 Mar 2018 12:29:51 +0100
Subject: [PATCH 1/3] migrate: split into migrate and migrate_container
Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
lxd/container_post.go | 2 +-
lxd/container_snapshot.go | 2 +-
lxd/migrate.go | 1029 +--------------------------------------------
lxd/migrate_container.go | 1013 ++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 1033 insertions(+), 1013 deletions(-)
create mode 100644 lxd/migrate_container.go
diff --git a/lxd/container_post.go b/lxd/container_post.go
index f77fbd04d..96548cfff 100644
--- a/lxd/container_post.go
+++ b/lxd/container_post.go
@@ -67,7 +67,7 @@ func containerPost(d *Daemon, r *http.Request) Response {
if req.Target != nil {
// Push mode
- err := ws.ConnectTarget(*req.Target)
+ err := ws.ConnectContainerTarget(*req.Target)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/container_snapshot.go b/lxd/container_snapshot.go
index 2640609fc..14c7fef12 100644
--- a/lxd/container_snapshot.go
+++ b/lxd/container_snapshot.go
@@ -246,7 +246,7 @@ func snapshotPost(d *Daemon, r *http.Request, sc container, containerName string
if req.Target != nil {
// Push mode
- err := ws.ConnectTarget(*req.Target)
+ err := ws.ConnectContainerTarget(*req.Target)
if err != nil {
return InternalError(err)
}
diff --git a/lxd/migrate.go b/lxd/migrate.go
index f0cea6253..04a223b1a 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -7,36 +7,23 @@ package main
import (
"crypto/x509"
- "encoding/binary"
"encoding/pem"
"fmt"
- "io/ioutil"
"net/http"
"net/url"
"os"
- "os/exec"
- "path/filepath"
- "strconv"
"strings"
"sync"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
- "gopkg.in/lxc/go-lxc.v2"
"github.com/lxc/lxd/lxd/migration"
- "github.com/lxc/lxd/lxd/util"
"github.com/lxc/lxd/shared"
- "github.com/lxc/lxd/shared/api"
- "github.com/lxc/lxd/shared/idmap"
"github.com/lxc/lxd/shared/logger"
)
type migrationFields struct {
- live bool
-
- containerOnly bool
-
controlSecret string
controlConn *websocket.Conn
controlLock sync.Mutex
@@ -47,7 +34,10 @@ type migrationFields struct {
fsSecret string
fsConn *websocket.Conn
- container container
+ // container specific fields
+ live bool
+ containerOnly bool
+ container container
}
func (c *migrationFields) send(m proto.Message) error {
@@ -137,37 +127,6 @@ type migrationSourceWs struct {
allConnected chan bool
}
-func NewMigrationSource(c container, stateful bool, containerOnly bool) (*migrationSourceWs, error) {
- ret := migrationSourceWs{migrationFields{container: c}, make(chan bool, 1)}
- ret.containerOnly = containerOnly
-
- var err error
- ret.controlSecret, err = shared.RandomCryptoString()
- if err != nil {
- return nil, err
- }
-
- ret.fsSecret, err = shared.RandomCryptoString()
- if err != nil {
- return nil, err
- }
-
- if stateful && c.IsRunning() {
- _, err := exec.LookPath("criu")
- if err != nil {
- return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the source server.")
- }
-
- ret.live = true
- ret.criuSecret, err = shared.RandomCryptoString()
- if err != nil {
- return nil, err
- }
- }
-
- return &ret, nil
-}
-
func (s *migrationSourceWs) Metadata() interface{} {
secrets := shared.Jmap{
"control": s.controlSecret,
@@ -217,12 +176,12 @@ func (s *migrationSourceWs) Connect(op *operation, r *http.Request, w http.Respo
return nil
}
-func (s *migrationSourceWs) ConnectTarget(target api.ContainerPostTarget) error {
+func (s *migrationSourceWs) ConnectTarget(certificate string, operation string, websockets map[string]string) error {
var err error
var cert *x509.Certificate
- if target.Certificate != "" {
- certBlock, _ := pem.Decode([]byte(target.Certificate))
+ if certificate != "" {
+ certBlock, _ := pem.Decode([]byte(certificate))
if certBlock == nil {
return fmt.Errorf("Invalid certificate")
}
@@ -243,7 +202,7 @@ func (s *migrationSourceWs) ConnectTarget(target api.ContainerPostTarget) error
NetDial: shared.RFC3493Dialer,
}
- for name, secret := range target.Websockets {
+ for name, secret := range websockets {
var conn **websocket.Conn
switch name {
@@ -260,7 +219,7 @@ func (s *migrationSourceWs) ConnectTarget(target api.ContainerPostTarget) error
query := url.Values{"secret": []string{secret}}
// The URL is a https URL to the operation, mangle to be a wss URL to the secret
- wsUrl := fmt.Sprintf("wss://%s/websocket?%s", strings.TrimPrefix(target.Operation, "https://"), query.Encode())
+ wsUrl := fmt.Sprintf("wss://%s/websocket?%s", strings.TrimPrefix(operation, "https://"), query.Encode())
wsConn, _, err := dialer.Dial(wsUrl, http.Header{})
if err != nil {
@@ -275,609 +234,10 @@ func (s *migrationSourceWs) ConnectTarget(target api.ContainerPostTarget) error
return nil
}
-func writeActionScript(directory string, operation string, secret string, execPath string) error {
- script := fmt.Sprintf(`#!/bin/sh -e
-if [ "$CRTOOLS_SCRIPT_ACTION" = "post-dump" ]; then
- %s migratedumpsuccess %s %s
-fi
-`, execPath, operation, secret)
-
- f, err := os.Create(filepath.Join(directory, "action.sh"))
- if err != nil {
- return err
- }
- defer f.Close()
-
- err = f.Chmod(0500)
- if err != nil {
- return err
- }
-
- _, err = f.WriteString(script)
- return err
-}
-
-func snapshotToProtobuf(c container) *migration.Snapshot {
- config := []*migration.Config{}
- for k, v := range c.LocalConfig() {
- kCopy := string(k)
- vCopy := string(v)
- config = append(config, &migration.Config{Key: &kCopy, Value: &vCopy})
- }
-
- devices := []*migration.Device{}
- for name, d := range c.LocalDevices() {
- props := []*migration.Config{}
- for k, v := range d {
- kCopy := string(k)
- vCopy := string(v)
- props = append(props, &migration.Config{Key: &kCopy, Value: &vCopy})
- }
-
- devices = append(devices, &migration.Device{Name: &name, Config: props})
- }
-
- parts := strings.SplitN(c.Name(), shared.SnapshotDelimiter, 2)
- isEphemeral := c.IsEphemeral()
- arch := int32(c.Architecture())
- stateful := c.IsStateful()
-
- return &migration.Snapshot{
- Name: &parts[len(parts)-1],
- LocalConfig: config,
- Profiles: c.Profiles(),
- Ephemeral: &isEphemeral,
- LocalDevices: devices,
- Architecture: &arch,
- Stateful: &stateful,
- }
-}
-
-// Check if CRIU supports pre-dumping and number of
-// pre-dump iterations
-func (s *migrationSourceWs) checkForPreDumpSupport() (bool, int) {
- // Ask CRIU if this architecture/kernel/criu combination
- // supports pre-copy (dirty memory tracking)
- criuMigrationArgs := CriuMigrationArgs{
- cmd: lxc.MIGRATE_FEATURE_CHECK,
- stateDir: "",
- function: "feature-check",
- stop: false,
- actionScript: false,
- dumpDir: "",
- preDumpDir: "",
- features: lxc.FEATURE_MEM_TRACK,
- }
- err := s.container.Migrate(&criuMigrationArgs)
-
- if err != nil {
- // CRIU says it does not know about dirty memory tracking.
- // This means the rest of this function is irrelevant.
- return false, 0
- }
-
- // CRIU says it can actually do pre-dump. Let's set it to true
- // unless the user wants something else.
- use_pre_dumps := true
-
- // What does the configuration say about pre-copy
- tmp := s.container.ExpandedConfig()["migration.incremental.memory"]
-
- if tmp != "" {
- use_pre_dumps = shared.IsTrue(tmp)
- }
- logger.Debugf("migration.incremental.memory %d", use_pre_dumps)
-
- var max_iterations int
-
- // migration.incremental.memory.iterations is the value after which the
- // container will be definitely migrated, even if the remaining number
- // of memory pages is below the defined threshold.
- tmp = s.container.ExpandedConfig()["migration.incremental.memory.iterations"]
- if tmp != "" {
- max_iterations, _ = strconv.Atoi(tmp)
- } else {
- // default to 10
- max_iterations = 10
- }
- if max_iterations > 999 {
- // the pre-dump directory is hardcoded to a string
- // with maximal 3 digits. 999 pre-dumps makes no
- // sense at all, but let's make sure the number
- // is not higher than this.
- max_iterations = 999
- }
- logger.Debugf("using maximal %d iterations for pre-dumping", max_iterations)
-
- return use_pre_dumps, max_iterations
-}
-
-// The function readCriuStatsDump() reads the CRIU 'stats-dump' file
-// in path and returns the pages_written, pages_skipped_parent, error.
-func readCriuStatsDump(path string) (uint64, uint64, error) {
- statsDump := shared.AddSlash(path) + "stats-dump"
- in, err := ioutil.ReadFile(statsDump)
- if err != nil {
- logger.Errorf("Error reading CRIU's 'stats-dump' file: %s", err.Error())
- return 0, 0, err
- }
-
- // According to the CRIU file image format it starts with two magic values.
- // First magic IMG_SERVICE: 1427134784
- if binary.LittleEndian.Uint32(in[0:4]) != 1427134784 {
- msg := "IMG_SERVICE(1427134784) criu magic not found"
- logger.Errorf(msg)
- return 0, 0, fmt.Errorf(msg)
- }
- // Second magic STATS: 1460220678
- if binary.LittleEndian.Uint32(in[4:8]) != 1460220678 {
- msg := "STATS(1460220678) criu magic not found"
- logger.Errorf(msg)
- return 0, 0, fmt.Errorf(msg)
- }
-
- // Next, read the size of the image payload
- size := binary.LittleEndian.Uint32(in[8:12])
- logger.Debugf("stats-dump payload size %d", size)
-
- statsEntry := &migration.StatsEntry{}
- if err = proto.Unmarshal(in[12:12+size], statsEntry); err != nil {
- logger.Errorf("Failed to parse CRIU's 'stats-dump' file: %s", err.Error())
- return 0, 0, err
- }
-
- written := statsEntry.GetDump().GetPagesWritten()
- skipped := statsEntry.GetDump().GetPagesSkippedParent()
- return written, skipped, nil
-}
-
-type preDumpLoopArgs struct {
- checkpointDir string
- bwlimit string
- preDumpDir string
- dumpDir string
- final bool
-}
-
-// The function preDumpLoop is the main logic behind the pre-copy migration.
-// This function contains the actual pre-dump, the corresponding rsync
-// transfer and it tells the outer loop to abort if the threshold
-// of memory pages transferred by pre-dumping has been reached.
-func (s *migrationSourceWs) preDumpLoop(args *preDumpLoopArgs) (bool, error) {
- // Do a CRIU pre-dump
- criuMigrationArgs := CriuMigrationArgs{
- cmd: lxc.MIGRATE_PRE_DUMP,
- stop: false,
- actionScript: false,
- preDumpDir: args.preDumpDir,
- dumpDir: args.dumpDir,
- stateDir: args.checkpointDir,
- function: "migration",
- }
-
- logger.Debugf("Doing another pre-dump in %s", args.preDumpDir)
-
- final := args.final
-
- err := s.container.Migrate(&criuMigrationArgs)
- if err != nil {
- return final, err
- }
-
- // Send the pre-dump.
- ctName, _, _ := containerGetParentAndSnapshotName(s.container.Name())
- state := s.container.DaemonState()
- err = RsyncSend(ctName, shared.AddSlash(args.checkpointDir), s.criuConn, nil, args.bwlimit, state.OS.ExecPath)
- if err != nil {
- return final, err
- }
-
- // Read the CRIU's 'stats-dump' file
- dumpPath := shared.AddSlash(args.checkpointDir)
- dumpPath += shared.AddSlash(args.dumpDir)
- written, skipped_parent, err := readCriuStatsDump(dumpPath)
- if err != nil {
- return final, err
- }
-
- logger.Debugf("CRIU pages written %d", written)
- logger.Debugf("CRIU pages skipped %d", skipped_parent)
-
- total_pages := written + skipped_parent
-
- percentage_skipped := int(100 - ((100 * written) / total_pages))
-
- logger.Debugf("CRIU pages skipped percentage %d%%", percentage_skipped)
-
- // threshold is the percentage of memory pages that needs
- // to be pre-copied for the pre-copy migration to stop.
- var threshold int
- tmp := s.container.ExpandedConfig()["migration.incremental.memory.goal"]
- if tmp != "" {
- threshold, _ = strconv.Atoi(tmp)
- } else {
- // defaults to 70%
- threshold = 70
- }
-
- if percentage_skipped > threshold {
- logger.Debugf("Memory pages skipped (%d%%) due to pre-copy is larger than threshold (%d%%)", percentage_skipped, threshold)
- logger.Debugf("This was the last pre-dump; next dump is the final dump")
- final = true
- }
-
- // If in pre-dump mode, the receiving side
- // expects a message to know if this was the
- // last pre-dump
- logger.Debugf("Sending another header")
- sync := migration.MigrationSync{
- FinalPreDump: proto.Bool(final),
- }
-
- data, err := proto.Marshal(&sync)
-
- if err != nil {
- return final, err
- }
-
- err = s.criuConn.WriteMessage(websocket.BinaryMessage, data)
- if err != nil {
- s.sendControl(err)
- return final, err
- }
- logger.Debugf("Sending another header done")
-
- return final, nil
-}
-
-func (s *migrationSourceWs) Do(migrateOp *operation) error {
- <-s.allConnected
-
- criuType := migration.CRIUType_CRIU_RSYNC.Enum()
- if !s.live {
- criuType = nil
- if s.container.IsRunning() {
- criuType = migration.CRIUType_NONE.Enum()
- }
- }
-
- // Storage needs to start unconditionally now, since we need to
- // initialize a new storage interface.
- ourStart, err := s.container.StorageStart()
- if err != nil {
- return err
- }
- if ourStart {
- defer s.container.StorageStop()
- }
-
- idmaps := make([]*migration.IDMapType, 0)
-
- idmapset, err := s.container.IdmapSet()
- if err != nil {
- return err
- }
-
- if idmapset != nil {
- for _, ctnIdmap := range idmapset.Idmap {
- idmap := migration.IDMapType{
- Isuid: proto.Bool(ctnIdmap.Isuid),
- Isgid: proto.Bool(ctnIdmap.Isgid),
- Hostid: proto.Int32(int32(ctnIdmap.Hostid)),
- Nsid: proto.Int32(int32(ctnIdmap.Nsid)),
- Maprange: proto.Int32(int32(ctnIdmap.Maprange)),
- }
-
- idmaps = append(idmaps, &idmap)
- }
- }
-
- driver, fsErr := s.container.Storage().MigrationSource(s.container, s.containerOnly)
-
- snapshots := []*migration.Snapshot{}
- snapshotNames := []string{}
- // Only send snapshots when requested.
- if !s.containerOnly {
- if fsErr == nil {
- fullSnaps := driver.Snapshots()
- for _, snap := range fullSnaps {
- snapshots = append(snapshots, snapshotToProtobuf(snap))
- snapshotNames = append(snapshotNames, shared.ExtractSnapshotName(snap.Name()))
- }
- }
- }
-
- use_pre_dumps := false
- max_iterations := 0
- if s.live {
- use_pre_dumps, max_iterations = s.checkForPreDumpSupport()
- }
-
- // The protocol says we have to send a header no matter what, so let's
- // do that, but then immediately send an error.
- myType := s.container.Storage().MigrationType()
- header := migration.MigrationHeader{
- Fs: &myType,
- Criu: criuType,
- Idmap: idmaps,
- SnapshotNames: snapshotNames,
- Snapshots: snapshots,
- Predump: proto.Bool(use_pre_dumps),
- }
-
- err = s.send(&header)
- if err != nil {
- s.sendControl(err)
- return err
- }
-
- if fsErr != nil {
- s.sendControl(fsErr)
- return fsErr
- }
-
- err = s.recv(&header)
- if err != nil {
- s.sendControl(err)
- return err
- }
-
- bwlimit := ""
- if *header.Fs != myType {
- myType = migration.MigrationFSType_RSYNC
- header.Fs = &myType
-
- driver, _ = rsyncMigrationSource(s.container, s.containerOnly)
-
- // Check if this storage pool has a rate limit set for rsync.
- poolwritable := s.container.Storage().GetStoragePoolWritable()
- if poolwritable.Config != nil {
- bwlimit = poolwritable.Config["rsync.bwlimit"]
- }
- }
-
- // Check if the other side knows about pre-dumping and
- // the associated rsync protocol
- use_pre_dumps = header.GetPredump()
- if use_pre_dumps {
- logger.Debugf("The other side does support pre-copy")
- } else {
- logger.Debugf("The other side does not support pre-copy")
- }
-
- // All failure paths need to do a few things to correctly handle errors before returning.
- // Unfortunately, handling errors is not well-suited to defer as the code depends on the
- // status of driver and the error value. The error value is especially tricky due to the
- // common case of creating a new err variable (intentional or not) due to scoping and use
- // of ":=". Capturing err in a closure for use in defer would be fragile, which defeats
- // the purpose of using defer. An abort function reduces the odds of mishandling errors
- // without introducing the fragility of closing on err.
- abort := func(err error) error {
- driver.Cleanup()
- s.sendControl(err)
- return err
- }
-
- err = driver.SendWhileRunning(s.fsConn, migrateOp, bwlimit, s.containerOnly)
- if err != nil {
- return abort(err)
- }
-
- restoreSuccess := make(chan bool, 1)
- dumpSuccess := make(chan error, 1)
-
- if s.live {
- if header.Criu == nil {
- return abort(fmt.Errorf("Got no CRIU socket type for live migration"))
- } else if *header.Criu != migration.CRIUType_CRIU_RSYNC {
- return abort(fmt.Errorf("Formats other than criu rsync not understood"))
- }
-
- checkpointDir, err := ioutil.TempDir("", "lxd_checkpoint_")
- if err != nil {
- return abort(err)
- }
-
- if util.RuntimeLiblxcVersionAtLeast(2, 0, 4) {
- /* What happens below is slightly convoluted. Due to various
- * complications with networking, there's no easy way for criu
- * to exit and leave the container in a frozen state for us to
- * somehow resume later.
- *
- * Instead, we use what criu calls an "action-script", which is
- * basically a callback that lets us know when the dump is
- * done. (Unfortunately, we can't pass arguments, just an
- * executable path, so we write a custom action script with the
- * real command we want to run.)
- *
- * This script then hangs until the migration operation either
- * finishes successfully or fails, and exits 1 or 0, which
- * causes criu to either leave the container running or kill it
- * as we asked.
- */
- dumpDone := make(chan bool, 1)
- actionScriptOpSecret, err := shared.RandomCryptoString()
- if err != nil {
- os.RemoveAll(checkpointDir)
- return abort(err)
- }
-
- state := s.container.DaemonState()
- actionScriptOp, err := operationCreate(
- state.Cluster,
- operationClassWebsocket,
- "Live-migrating container",
- nil,
- nil,
- func(op *operation) error {
- result := <-restoreSuccess
- if !result {
- return fmt.Errorf("restore failed, failing CRIU")
- }
- return nil
- },
- nil,
- func(op *operation, r *http.Request, w http.ResponseWriter) error {
- secret := r.FormValue("secret")
- if secret == "" {
- return fmt.Errorf("missing secret")
- }
-
- if secret != actionScriptOpSecret {
- return os.ErrPermission
- }
-
- c, err := shared.WebsocketUpgrader.Upgrade(w, r, nil)
- if err != nil {
- return err
- }
-
- dumpDone <- true
-
- closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
- return c.WriteMessage(websocket.CloseMessage, closeMsg)
- },
- )
- if err != nil {
- os.RemoveAll(checkpointDir)
- return abort(err)
- }
-
- err = writeActionScript(checkpointDir, actionScriptOp.url, actionScriptOpSecret, state.OS.ExecPath)
- if err != nil {
- os.RemoveAll(checkpointDir)
- return abort(err)
- }
-
- preDumpCounter := 0
- preDumpDir := ""
- if use_pre_dumps {
- final := false
- for !final {
- preDumpCounter++
- if preDumpCounter < max_iterations {
- final = false
- } else {
- final = true
- }
- dumpDir := fmt.Sprintf("%03d", preDumpCounter)
- loop_args := preDumpLoopArgs{
- checkpointDir: checkpointDir,
- bwlimit: bwlimit,
- preDumpDir: preDumpDir,
- dumpDir: dumpDir,
- final: final,
- }
- final, err = s.preDumpLoop(&loop_args)
- if err != nil {
- os.RemoveAll(checkpointDir)
- return abort(err)
- }
- preDumpDir = fmt.Sprintf("%03d", preDumpCounter)
- preDumpCounter++
- }
- }
-
- _, err = actionScriptOp.Run()
- if err != nil {
- os.RemoveAll(checkpointDir)
- return abort(err)
- }
-
- go func() {
- criuMigrationArgs := CriuMigrationArgs{
- cmd: lxc.MIGRATE_DUMP,
- stop: true,
- actionScript: true,
- preDumpDir: preDumpDir,
- dumpDir: "final",
- stateDir: checkpointDir,
- function: "migration",
- }
-
- // Do the final CRIU dump. This is needs no special
- // handling if pre-dumps are used or not
- dumpSuccess <- s.container.Migrate(&criuMigrationArgs)
- os.RemoveAll(checkpointDir)
- }()
-
- select {
- /* the checkpoint failed, let's just abort */
- case err = <-dumpSuccess:
- return abort(err)
- /* the dump finished, let's continue on to the restore */
- case <-dumpDone:
- logger.Debugf("Dump finished, continuing with restore...")
- }
- } else {
- logger.Debugf("liblxc version is older than 2.0.4 and the live migration will probably fail")
- defer os.RemoveAll(checkpointDir)
- criuMigrationArgs := CriuMigrationArgs{
- cmd: lxc.MIGRATE_DUMP,
- stateDir: checkpointDir,
- function: "migration",
- stop: true,
- actionScript: false,
- dumpDir: "final",
- preDumpDir: "",
- }
-
- err = s.container.Migrate(&criuMigrationArgs)
- if err != nil {
- return abort(err)
- }
- }
-
- /*
- * We do the serially right now, but there's really no reason for us
- * to; since we have separate websockets, we can do it in parallel if
- * we wanted to. However, assuming we're network bound, there's really
- * no reason to do these in parallel. In the future when we're using
- * p.haul's protocol, it will make sense to do these in parallel.
- */
- ctName, _, _ := containerGetParentAndSnapshotName(s.container.Name())
- state := s.container.DaemonState()
- err = RsyncSend(ctName, shared.AddSlash(checkpointDir), s.criuConn, nil, bwlimit, state.OS.ExecPath)
- if err != nil {
- return abort(err)
- }
- }
-
- if s.live || (header.Criu != nil && *header.Criu == migration.CRIUType_NONE) {
- err = driver.SendAfterCheckpoint(s.fsConn, bwlimit)
- if err != nil {
- return abort(err)
- }
- }
-
- driver.Cleanup()
-
- msg := migration.MigrationControl{}
- err = s.recv(&msg)
- if err != nil {
- s.disconnect()
- return err
- }
-
- if s.live {
- restoreSuccess <- *msg.Success
- err := <-dumpSuccess
- if err != nil {
- logger.Errorf("dump failed after successful restore?: %q", err)
- }
- }
-
- if !*msg.Success {
- return fmt.Errorf(*msg.Message)
- }
-
- return nil
-}
-
type migrationSink struct {
- // We are pulling the container from src in pull mode.
+ // We are pulling the entity from src in pull mode.
src migrationFields
- // The container is pushed from src to dest in push mode. Note that
+ // The entity is pushed from src to dest in push mode. Note that
// websocket connections are not set in push mode. Only the secret
// fields are used since the client will connect to the sockets.
dest migrationFields
@@ -889,73 +249,17 @@ type migrationSink struct {
}
type MigrationSinkArgs struct {
- Url string
- Dialer websocket.Dialer
- Container container
- Secrets map[string]string
- Push bool
+ Url string
+ Dialer websocket.Dialer
+ Secrets map[string]string
+ Push bool
+
+ // container specific fields
Live bool
+ Container container
ContainerOnly bool
}
-func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
- sink := migrationSink{
- src: migrationFields{container: args.Container, containerOnly: args.ContainerOnly},
- dest: migrationFields{containerOnly: args.ContainerOnly},
- url: args.Url,
- dialer: args.Dialer,
- push: args.Push,
- }
-
- if sink.push {
- sink.allConnected = make(chan bool, 1)
- }
-
- var ok bool
- var err error
- if sink.push {
- sink.dest.controlSecret, err = shared.RandomCryptoString()
- if err != nil {
- return nil, err
- }
-
- sink.dest.fsSecret, err = shared.RandomCryptoString()
- if err != nil {
- return nil, err
- }
-
- sink.dest.live = args.Live
- if sink.dest.live {
- sink.dest.criuSecret, err = shared.RandomCryptoString()
- if err != nil {
- return nil, err
- }
- }
- } else {
- sink.src.controlSecret, ok = args.Secrets["control"]
- if !ok {
- return nil, fmt.Errorf("Missing control secret")
- }
-
- sink.src.fsSecret, ok = args.Secrets["fs"]
- if !ok {
- return nil, fmt.Errorf("Missing fs secret")
- }
-
- sink.src.criuSecret, ok = args.Secrets["criu"]
- sink.src.live = ok
- }
-
- _, err = exec.LookPath("criu")
- if sink.push && sink.dest.live && err != nil {
- return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the destination server.")
- } else if sink.src.live && err != nil {
- return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the destination server.")
- }
-
- return &sink, nil
-}
-
func (c *migrationSink) connectWithSecret(secret string) (*websocket.Conn, error) {
query := url.Values{"secret": []string{secret}}
@@ -1017,300 +321,3 @@ func (s *migrationSink) Connect(op *operation, r *http.Request, w http.ResponseW
return nil
}
-
-func (c *migrationSink) Do(migrateOp *operation) error {
- var err error
-
- if c.push {
- <-c.allConnected
- }
-
- disconnector := c.src.disconnect
- if c.push {
- disconnector = c.dest.disconnect
- }
-
- if c.push {
- defer disconnector()
- } else {
- c.src.controlConn, err = c.connectWithSecret(c.src.controlSecret)
- if err != nil {
- return err
- }
- defer c.src.disconnect()
-
- c.src.fsConn, err = c.connectWithSecret(c.src.fsSecret)
- if err != nil {
- c.src.sendControl(err)
- return err
- }
-
- if c.src.live {
- c.src.criuConn, err = c.connectWithSecret(c.src.criuSecret)
- if err != nil {
- c.src.sendControl(err)
- return err
- }
- }
- }
-
- receiver := c.src.recv
- if c.push {
- receiver = c.dest.recv
- }
-
- sender := c.src.send
- if c.push {
- sender = c.dest.send
- }
-
- controller := c.src.sendControl
- if c.push {
- controller = c.dest.sendControl
- }
-
- header := migration.MigrationHeader{}
- if err := receiver(&header); err != nil {
- controller(err)
- return err
- }
-
- live := c.src.live
- if c.push {
- live = c.dest.live
- }
-
- criuType := migration.CRIUType_CRIU_RSYNC.Enum()
- if header.Criu != nil && *header.Criu == migration.CRIUType_NONE {
- criuType = migration.CRIUType_NONE.Enum()
- } else {
- if !live {
- criuType = nil
- }
- }
-
- mySink := c.src.container.Storage().MigrationSink
- myType := c.src.container.Storage().MigrationType()
- resp := migration.MigrationHeader{
- Fs: &myType,
- Criu: criuType,
- }
-
- // If the storage type the source has doesn't match what we have, then
- // we have to use rsync.
- if *header.Fs != *resp.Fs {
- mySink = rsyncMigrationSink
- myType = migration.MigrationFSType_RSYNC
- resp.Fs = &myType
- }
-
- if header.GetPredump() == true {
- // If the other side wants pre-dump and if
- // this side supports it, let's use it.
- resp.Predump = proto.Bool(true)
- } else {
- resp.Predump = proto.Bool(false)
- }
-
- err = sender(&resp)
- if err != nil {
- controller(err)
- return err
- }
-
- restore := make(chan error)
- go func(c *migrationSink) {
- imagesDir := ""
- srcIdmap := new(idmap.IdmapSet)
-
- for _, idmapSet := range header.Idmap {
- e := idmap.IdmapEntry{
- Isuid: *idmapSet.Isuid,
- Isgid: *idmapSet.Isgid,
- Nsid: int64(*idmapSet.Nsid),
- Hostid: int64(*idmapSet.Hostid),
- Maprange: int64(*idmapSet.Maprange)}
- srcIdmap.Idmap = idmap.Extend(srcIdmap.Idmap, e)
- }
-
- /* We do the fs receive in parallel so we don't have to reason
- * about when to receive what. The sending side is smart enough
- * to send the filesystem bits that it can before it seizes the
- * container to start checkpointing, so the total transfer time
- * will be minimized even if we're dumb here.
- */
- fsTransfer := make(chan error)
- go func() {
- snapshots := []*migration.Snapshot{}
-
- /* Legacy: we only sent the snapshot names, so we just
- * copy the container's config over, same as we used to
- * do.
- */
- if len(header.SnapshotNames) != len(header.Snapshots) {
- for _, name := range header.SnapshotNames {
- base := snapshotToProtobuf(c.src.container)
- base.Name = &name
- snapshots = append(snapshots, base)
- }
- } else {
- snapshots = header.Snapshots
- }
-
- var fsConn *websocket.Conn
- if c.push {
- fsConn = c.dest.fsConn
- } else {
- fsConn = c.src.fsConn
- }
-
- sendFinalFsDelta := false
- if live {
- sendFinalFsDelta = true
- }
-
- if criuType != nil && *criuType == migration.CRIUType_NONE {
- sendFinalFsDelta = true
- }
-
- err = mySink(sendFinalFsDelta, c.src.container,
- snapshots, fsConn, srcIdmap, migrateOp,
- c.src.containerOnly)
- if err != nil {
- fsTransfer <- err
- return
- }
-
- err = ShiftIfNecessary(c.src.container, srcIdmap)
- if err != nil {
- fsTransfer <- err
- return
- }
-
- fsTransfer <- nil
- }()
-
- if live {
- var err error
- imagesDir, err = ioutil.TempDir("", "lxd_restore_")
- if err != nil {
- restore <- err
- return
- }
-
- defer os.RemoveAll(imagesDir)
-
- var criuConn *websocket.Conn
- if c.push {
- criuConn = c.dest.criuConn
- } else {
- criuConn = c.src.criuConn
- }
-
- sync := &migration.MigrationSync{
- FinalPreDump: proto.Bool(false),
- }
-
- if resp.GetPredump() {
- logger.Debugf("Before the receive loop %s", sync.GetFinalPreDump())
- for !sync.GetFinalPreDump() {
- logger.Debugf("About to receive rsync")
- // Transfer a CRIU pre-dump
- err = RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil)
- if err != nil {
- restore <- err
- return
- }
- logger.Debugf("rsync receive done")
-
- logger.Debugf("About to receive header")
- // Check if this was the last pre-dump
- // Only the FinalPreDump element if of interest
- mtype, data, err := criuConn.ReadMessage()
- if err != nil {
- logger.Debugf("err %s", err)
- restore <- err
- return
- }
- if mtype != websocket.BinaryMessage {
- restore <- err
- return
- }
- err = proto.Unmarshal(data, sync)
- if err != nil {
- logger.Debugf("err %s", err)
- restore <- err
- return
- }
- logger.Debugf("At the end of the receive loop %s", sync.GetFinalPreDump())
- }
- }
-
- // Final CRIU dump
- err = RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil)
- if err != nil {
- restore <- err
- return
- }
- }
-
- err := <-fsTransfer
- if err != nil {
- restore <- err
- return
- }
-
- if live {
- criuMigrationArgs := CriuMigrationArgs{
- cmd: lxc.MIGRATE_RESTORE,
- stateDir: imagesDir,
- function: "migration",
- stop: false,
- actionScript: false,
- dumpDir: "final",
- preDumpDir: "",
- }
-
- // Currently we only do a single CRIU pre-dump so we
- // can hardcode "final" here since we know that "final" is the
- // folder for CRIU's final dump.
- err = c.src.container.Migrate(&criuMigrationArgs)
- if err != nil {
- restore <- err
- return
- }
-
- }
-
- restore <- nil
- }(c)
-
- var source <-chan migration.MigrationControl
- if c.push {
- source = c.dest.controlChannel()
- } else {
- source = c.src.controlChannel()
- }
-
- for {
- select {
- case err = <-restore:
- controller(err)
- return err
- case msg, ok := <-source:
- if !ok {
- disconnector()
- return fmt.Errorf("Got error reading source")
- }
- if !*msg.Success {
- disconnector()
- return fmt.Errorf(*msg.Message)
- } else {
- // The source can only tell us it failed (e.g. if
- // checkpointing failed). We have to tell the source
- // whether or not the restore was successful.
- logger.Debugf("Unknown message %v from source", msg)
- }
- }
- }
-}
diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
new file mode 100644
index 000000000..58300cccc
--- /dev/null
+++ b/lxd/migrate_container.go
@@ -0,0 +1,1013 @@
+package main
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strconv"
+ "strings"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/gorilla/websocket"
+ "gopkg.in/lxc/go-lxc.v2"
+
+ "github.com/lxc/lxd/lxd/migration"
+ "github.com/lxc/lxd/lxd/util"
+ "github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/api"
+ "github.com/lxc/lxd/shared/idmap"
+ "github.com/lxc/lxd/shared/logger"
+)
+
+func NewMigrationSource(c container, stateful bool, containerOnly bool) (*migrationSourceWs, error) {
+ ret := migrationSourceWs{migrationFields{container: c}, make(chan bool, 1)}
+ ret.containerOnly = containerOnly
+
+ var err error
+ ret.controlSecret, err = shared.RandomCryptoString()
+ if err != nil {
+ return nil, err
+ }
+
+ ret.fsSecret, err = shared.RandomCryptoString()
+ if err != nil {
+ return nil, err
+ }
+
+ if stateful && c.IsRunning() {
+ _, err := exec.LookPath("criu")
+ if err != nil {
+ return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the source server.")
+ }
+
+ ret.live = true
+ ret.criuSecret, err = shared.RandomCryptoString()
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return &ret, nil
+}
+
+// writeActionScript writes the CRIU action script "action.sh" into directory.
+// On CRIU's "post-dump" action the script runs
+// "<execPath> migratedumpsuccess <operation> <secret>".
+//
+// The file is made owner read/execute only (0500) before the script text,
+// which embeds the secret, is written to it. Errors from creating, chmod-ing,
+// writing and closing the file are all returned to the caller.
+func writeActionScript(directory string, operation string, secret string, execPath string) error {
+	script := fmt.Sprintf(`#!/bin/sh -e
+if [ "$CRTOOLS_SCRIPT_ACTION" = "post-dump" ]; then
+ %s migratedumpsuccess %s %s
+fi
+`, execPath, operation, secret)
+
+	f, err := os.Create(filepath.Join(directory, "action.sh"))
+	if err != nil {
+		return err
+	}
+
+	err = f.Chmod(0500)
+	if err != nil {
+		f.Close()
+		return err
+	}
+
+	_, err = f.WriteString(script)
+	if err != nil {
+		f.Close()
+		return err
+	}
+
+	// Close explicitly rather than via defer so that a failure to flush
+	// the script to disk is reported instead of silently dropped.
+	return f.Close()
+}
+
+// snapshotToProtobuf converts container (or snapshot) c into the
+// migration.Snapshot message sent to the migration target. It copies the
+// local config, local devices, profiles, architecture and the
+// ephemeral/stateful flags. The Name field is the part of c.Name() after the
+// snapshot delimiter, or the whole name when there is no delimiter.
+func snapshotToProtobuf(c container) *migration.Snapshot {
+	config := []*migration.Config{}
+	for k, v := range c.LocalConfig() {
+		// The protobuf fields are pointers, so every entry needs its
+		// own copy rather than the address of the loop variables.
+		kCopy := string(k)
+		vCopy := string(v)
+		config = append(config, &migration.Config{Key: &kCopy, Value: &vCopy})
+	}
+
+	devices := []*migration.Device{}
+	for name, d := range c.LocalDevices() {
+		props := []*migration.Config{}
+		for k, v := range d {
+			kCopy := string(k)
+			vCopy := string(v)
+			props = append(props, &migration.Config{Key: &kCopy, Value: &vCopy})
+		}
+
+		// Take the address of a per-iteration copy. Pointing at the
+		// range variable itself would make every device share the
+		// name of the last device visited when the loop variable is
+		// reused across iterations.
+		nameCopy := name
+		devices = append(devices, &migration.Device{Name: &nameCopy, Config: props})
+	}
+
+	parts := strings.SplitN(c.Name(), shared.SnapshotDelimiter, 2)
+	isEphemeral := c.IsEphemeral()
+	arch := int32(c.Architecture())
+	stateful := c.IsStateful()
+
+	return &migration.Snapshot{
+		Name:         &parts[len(parts)-1],
+		LocalConfig:  config,
+		Profiles:     c.Profiles(),
+		Ephemeral:    &isEphemeral,
+		LocalDevices: devices,
+		Architecture: &arch,
+		Stateful:     &stateful,
+	}
+}
+
+// checkForPreDumpSupport reports whether CRIU pre-dumps (pre-copy migration)
+// should be used for this container and the maximum number of pre-dump
+// iterations.
+//
+// It first asks CRIU, via a feature check, whether this
+// architecture/kernel/criu combination supports dirty memory tracking. If the
+// check fails it returns (false, 0). Otherwise pre-dumping defaults to
+// enabled and can be overridden by "migration.incremental.memory". The
+// iteration limit comes from "migration.incremental.memory.iterations"
+// (default 10, capped at 999); a value that is not an integer is logged and
+// replaced by the default.
+func (s *migrationSourceWs) checkForPreDumpSupport() (bool, int) {
+	const (
+		defaultIterations = 10
+		// The pre-dump directory name is formatted with three digits,
+		// so more than 999 iterations cannot be represented.
+		iterationsCap = 999
+	)
+
+	// Ask CRIU if this architecture/kernel/criu combination
+	// supports pre-copy (dirty memory tracking)
+	criuMigrationArgs := CriuMigrationArgs{
+		cmd:          lxc.MIGRATE_FEATURE_CHECK,
+		stateDir:     "",
+		function:     "feature-check",
+		stop:         false,
+		actionScript: false,
+		dumpDir:      "",
+		preDumpDir:   "",
+		features:     lxc.FEATURE_MEM_TRACK,
+	}
+	err := s.container.Migrate(&criuMigrationArgs)
+	if err != nil {
+		// CRIU says it does not know about dirty memory tracking.
+		// This means the rest of this function is irrelevant.
+		return false, 0
+	}
+
+	// CRIU says it can actually do pre-dump. Let's set it to true
+	// unless the user wants something else.
+	usePreDumps := true
+
+	// What does the configuration say about pre-copy
+	tmp := s.container.ExpandedConfig()["migration.incremental.memory"]
+	if tmp != "" {
+		usePreDumps = shared.IsTrue(tmp)
+	}
+	logger.Debugf("migration.incremental.memory %t", usePreDumps)
+
+	// migration.incremental.memory.iterations is the number of pre-dumps
+	// after which the container is migrated regardless of whether the
+	// skipped-pages goal has been reached.
+	maxIterations := defaultIterations
+	tmp = s.container.ExpandedConfig()["migration.incremental.memory.iterations"]
+	if tmp != "" {
+		parsed, err := strconv.Atoi(tmp)
+		if err != nil {
+			logger.Errorf("Invalid migration.incremental.memory.iterations value %q, using default of %d", tmp, defaultIterations)
+		} else {
+			maxIterations = parsed
+		}
+	}
+
+	if maxIterations > iterationsCap {
+		// 999 pre-dumps makes no sense at all, but let's make sure
+		// the number is not higher than this.
+		maxIterations = iterationsCap
+	}
+	logger.Debugf("using maximal %d iterations for pre-dumping", maxIterations)
+
+	return usePreDumps, maxIterations
+}
+
+// The function readCriuStatsDump() reads the CRIU 'stats-dump' file
+// in path and returns the pages_written, pages_skipped_parent, error.
+func readCriuStatsDump(path string) (uint64, uint64, error) {
+ statsDump := shared.AddSlash(path) + "stats-dump"
+ in, err := ioutil.ReadFile(statsDump)
+ if err != nil {
+ logger.Errorf("Error reading CRIU's 'stats-dump' file: %s", err.Error())
+ return 0, 0, err
+ }
+
+ // According to the CRIU file image format it starts with two magic values.
+ // First magic IMG_SERVICE: 1427134784
+ if binary.LittleEndian.Uint32(in[0:4]) != 1427134784 {
+ msg := "IMG_SERVICE(1427134784) criu magic not found"
+ logger.Errorf(msg)
+ return 0, 0, fmt.Errorf(msg)
+ }
+ // Second magic STATS: 1460220678
+ if binary.LittleEndian.Uint32(in[4:8]) != 1460220678 {
+ msg := "STATS(1460220678) criu magic not found"
+ logger.Errorf(msg)
+ return 0, 0, fmt.Errorf(msg)
+ }
+
+ // Next, read the size of the image payload
+ size := binary.LittleEndian.Uint32(in[8:12])
+ logger.Debugf("stats-dump payload size %d", size)
+
+ statsEntry := &migration.StatsEntry{}
+ if err = proto.Unmarshal(in[12:12+size], statsEntry); err != nil {
+ logger.Errorf("Failed to parse CRIU's 'stats-dump' file: %s", err.Error())
+ return 0, 0, err
+ }
+
+ written := statsEntry.GetDump().GetPagesWritten()
+ skipped := statsEntry.GetDump().GetPagesSkippedParent()
+ return written, skipped, nil
+}
+
+// preDumpLoopArgs bundles the per-iteration parameters of preDumpLoop.
+type preDumpLoopArgs struct {
+ // checkpointDir is handed to CRIU as the state directory and is the
+ // directory that gets rsynced to the target after the pre-dump.
+ checkpointDir string
+ // bwlimit is passed through to RsyncSend.
+ bwlimit string
+ // preDumpDir is passed to CRIU as the pre-dump directory; the caller
+ // sets it to the directory name of the previous pre-dump ("" for the
+ // first one).
+ preDumpDir string
+ // dumpDir is the directory name, below checkpointDir, of this pre-dump;
+ // its 'stats-dump' file is read after the dump.
+ dumpDir string
+ // final is the caller's initial verdict on whether this is the last
+ // pre-dump; preDumpLoop may turn it to true but never back to false.
+ final bool
+}
+
+// preDumpLoop performs one iteration of the pre-copy migration: it runs a
+// CRIU pre-dump, rsyncs the checkpoint directory to the target, reads CRIU's
+// 'stats-dump' file and tells the target whether this was the last pre-dump.
+//
+// The returned bool says whether the caller should stop pre-dumping. It
+// starts as args.final and becomes true when the percentage of memory pages
+// skipped thanks to the parent dump exceeds the goal configured in
+// "migration.incremental.memory.goal" (default 70). A goal that is not an
+// integer is logged and replaced by the default. On error the current
+// verdict is returned together with the error.
+func (s *migrationSourceWs) preDumpLoop(args *preDumpLoopArgs) (bool, error) {
+	const defaultThreshold = 70
+
+	// Do a CRIU pre-dump
+	criuMigrationArgs := CriuMigrationArgs{
+		cmd:          lxc.MIGRATE_PRE_DUMP,
+		stop:         false,
+		actionScript: false,
+		preDumpDir:   args.preDumpDir,
+		dumpDir:      args.dumpDir,
+		stateDir:     args.checkpointDir,
+		function:     "migration",
+	}
+
+	logger.Debugf("Doing another pre-dump in %s", args.preDumpDir)
+
+	final := args.final
+
+	err := s.container.Migrate(&criuMigrationArgs)
+	if err != nil {
+		return final, err
+	}
+
+	// Send the pre-dump.
+	ctName, _, _ := containerGetParentAndSnapshotName(s.container.Name())
+	state := s.container.DaemonState()
+	err = RsyncSend(ctName, shared.AddSlash(args.checkpointDir), s.criuConn, nil, args.bwlimit, state.OS.ExecPath)
+	if err != nil {
+		return final, err
+	}
+
+	// Read the CRIU's 'stats-dump' file
+	dumpPath := shared.AddSlash(args.checkpointDir)
+	dumpPath += shared.AddSlash(args.dumpDir)
+	written, skippedParent, err := readCriuStatsDump(dumpPath)
+	if err != nil {
+		return final, err
+	}
+
+	logger.Debugf("CRIU pages written %d", written)
+	logger.Debugf("CRIU pages skipped %d", skippedParent)
+
+	// With no pages reported at all there is nothing to base a percentage
+	// on. Treat it as 0% skipped rather than dividing by zero; the
+	// caller's iteration limit still bounds the number of pre-dumps.
+	totalPages := written + skippedParent
+	percentageSkipped := 0
+	if totalPages > 0 {
+		percentageSkipped = int(100 - ((100 * written) / totalPages))
+	}
+
+	logger.Debugf("CRIU pages skipped percentage %d%%", percentageSkipped)
+
+	// threshold is the percentage of memory pages that needs
+	// to be pre-copied for the pre-copy migration to stop.
+	threshold := defaultThreshold
+	tmp := s.container.ExpandedConfig()["migration.incremental.memory.goal"]
+	if tmp != "" {
+		parsed, err := strconv.Atoi(tmp)
+		if err != nil {
+			logger.Errorf("Invalid migration.incremental.memory.goal value %q, using default of %d", tmp, defaultThreshold)
+		} else {
+			threshold = parsed
+		}
+	}
+
+	if percentageSkipped > threshold {
+		logger.Debugf("Memory pages skipped (%d%%) due to pre-copy is larger than threshold (%d%%)", percentageSkipped, threshold)
+		logger.Debugf("This was the last pre-dump; next dump is the final dump")
+		final = true
+	}
+
+	// If in pre-dump mode, the receiving side
+	// expects a message to know if this was the
+	// last pre-dump
+	logger.Debugf("Sending another header")
+	sync := migration.MigrationSync{
+		FinalPreDump: proto.Bool(final),
+	}
+
+	data, err := proto.Marshal(&sync)
+	if err != nil {
+		return final, err
+	}
+
+	err = s.criuConn.WriteMessage(websocket.BinaryMessage, data)
+	if err != nil {
+		s.sendControl(err)
+		return final, err
+	}
+	logger.Debugf("Sending another header done")
+
+	return final, nil
+}
+
+// Do runs the source side of a container migration once all websockets are
+// connected. In order it:
+//
+//   - starts the container's storage and collects the idmap and snapshots,
+//   - sends a MigrationHeader to the target and reads the target's header,
+//     falling back to rsync when the filesystem types differ,
+//   - sends the filesystem while the container is still running,
+//   - for live migrations, optionally performs CRIU pre-dumps, then the final
+//     CRIU dump, and rsyncs the checkpoint directory to the target,
+//   - sends the post-checkpoint filesystem delta where applicable,
+//   - waits for the target's MigrationControl message and returns an error
+//     if the target reports failure.
+//
+// migrateOp is passed through to the storage driver's SendWhileRunning.
+func (s *migrationSourceWs) Do(migrateOp *operation) error {
+ <-s.allConnected
+
+ // criuType stays CRIU_RSYNC for live migrations, is NONE for a
+ // non-live migration of a running container and nil otherwise.
+ criuType := migration.CRIUType_CRIU_RSYNC.Enum()
+ if !s.live {
+ criuType = nil
+ if s.container.IsRunning() {
+ criuType = migration.CRIUType_NONE.Enum()
+ }
+ }
+
+ // Storage needs to start unconditionally now, since we need to
+ // initialize a new storage interface.
+ ourStart, err := s.container.StorageStart()
+ if err != nil {
+ return err
+ }
+ if ourStart {
+ defer s.container.StorageStop()
+ }
+
+ idmaps := make([]*migration.IDMapType, 0)
+
+ idmapset, err := s.container.IdmapSet()
+ if err != nil {
+ return err
+ }
+
+ if idmapset != nil {
+ for _, ctnIdmap := range idmapset.Idmap {
+ idmap := migration.IDMapType{
+ Isuid: proto.Bool(ctnIdmap.Isuid),
+ Isgid: proto.Bool(ctnIdmap.Isgid),
+ Hostid: proto.Int32(int32(ctnIdmap.Hostid)),
+ Nsid: proto.Int32(int32(ctnIdmap.Nsid)),
+ Maprange: proto.Int32(int32(ctnIdmap.Maprange)),
+ }
+
+ idmaps = append(idmaps, &idmap)
+ }
+ }
+
+ // fsErr is deliberately not returned yet: the header is sent first and
+ // the error is reported to the target afterwards (see below).
+ driver, fsErr := s.container.Storage().MigrationSource(s.container, s.containerOnly)
+
+ snapshots := []*migration.Snapshot{}
+ snapshotNames := []string{}
+ // Only send snapshots when requested.
+ if !s.containerOnly {
+ if fsErr == nil {
+ fullSnaps := driver.Snapshots()
+ for _, snap := range fullSnaps {
+ snapshots = append(snapshots, snapshotToProtobuf(snap))
+ snapshotNames = append(snapshotNames, shared.ExtractSnapshotName(snap.Name()))
+ }
+ }
+ }
+
+ use_pre_dumps := false
+ max_iterations := 0
+ if s.live {
+ use_pre_dumps, max_iterations = s.checkForPreDumpSupport()
+ }
+
+ // The protocol says we have to send a header no matter what, so let's
+ // do that, but then immediately send an error.
+ myType := s.container.Storage().MigrationType()
+ header := migration.MigrationHeader{
+ Fs: &myType,
+ Criu: criuType,
+ Idmap: idmaps,
+ SnapshotNames: snapshotNames,
+ Snapshots: snapshots,
+ Predump: proto.Bool(use_pre_dumps),
+ }
+
+ err = s.send(&header)
+ if err != nil {
+ s.sendControl(err)
+ return err
+ }
+
+ if fsErr != nil {
+ s.sendControl(fsErr)
+ return fsErr
+ }
+
+ // From here on header holds the target's reply, not what we sent.
+ err = s.recv(&header)
+ if err != nil {
+ s.sendControl(err)
+ return err
+ }
+
+ bwlimit := ""
+ // NOTE(review): header.Fs is dereferenced without a nil check; a reply
+ // without the Fs field would panic here.
+ if *header.Fs != myType {
+ myType = migration.MigrationFSType_RSYNC
+ header.Fs = &myType
+
+ // NOTE(review): the error from rsyncMigrationSource is discarded.
+ driver, _ = rsyncMigrationSource(s.container, s.containerOnly)
+
+ // Check if this storage pool has a rate limit set for rsync.
+ poolwritable := s.container.Storage().GetStoragePoolWritable()
+ if poolwritable.Config != nil {
+ bwlimit = poolwritable.Config["rsync.bwlimit"]
+ }
+ }
+
+ // Check if the other side knows about pre-dumping and
+ // the associated rsync protocol
+ use_pre_dumps = header.GetPredump()
+ if use_pre_dumps {
+ logger.Debugf("The other side does support pre-copy")
+ } else {
+ logger.Debugf("The other side does not support pre-copy")
+ }
+
+ // All failure paths need to do a few things to correctly handle errors before returning.
+ // Unfortunately, handling errors is not well-suited to defer as the code depends on the
+ // status of driver and the error value. The error value is especially tricky due to the
+ // common case of creating a new err variable (intentional or not) due to scoping and use
+ // of ":=". Capturing err in a closure for use in defer would be fragile, which defeats
+ // the purpose of using defer. An abort function reduces the odds of mishandling errors
+ // without introducing the fragility of closing on err.
+ abort := func(err error) error {
+ driver.Cleanup()
+ s.sendControl(err)
+ return err
+ }
+
+ err = driver.SendWhileRunning(s.fsConn, migrateOp, bwlimit, s.containerOnly)
+ if err != nil {
+ return abort(err)
+ }
+
+ // restoreSuccess carries the target's verdict to the action script
+ // operation; dumpSuccess carries the result of the final CRIU dump.
+ restoreSuccess := make(chan bool, 1)
+ dumpSuccess := make(chan error, 1)
+
+ if s.live {
+ if header.Criu == nil {
+ return abort(fmt.Errorf("Got no CRIU socket type for live migration"))
+ } else if *header.Criu != migration.CRIUType_CRIU_RSYNC {
+ return abort(fmt.Errorf("Formats other than criu rsync not understood"))
+ }
+
+ checkpointDir, err := ioutil.TempDir("", "lxd_checkpoint_")
+ if err != nil {
+ return abort(err)
+ }
+
+ if util.RuntimeLiblxcVersionAtLeast(2, 0, 4) {
+ /* What happens below is slightly convoluted. Due to various
+ * complications with networking, there's no easy way for criu
+ * to exit and leave the container in a frozen state for us to
+ * somehow resume later.
+ *
+ * Instead, we use what criu calls an "action-script", which is
+ * basically a callback that lets us know when the dump is
+ * done. (Unfortunately, we can't pass arguments, just an
+ * executable path, so we write a custom action script with the
+ * real command we want to run.)
+ *
+ * This script then hangs until the migration operation either
+ * finishes successfully or fails, and exits 1 or 0, which
+ * causes criu to either leave the container running or kill it
+ * as we asked.
+ */
+ dumpDone := make(chan bool, 1)
+ actionScriptOpSecret, err := shared.RandomCryptoString()
+ if err != nil {
+ os.RemoveAll(checkpointDir)
+ return abort(err)
+ }
+
+ state := s.container.DaemonState()
+ actionScriptOp, err := operationCreate(
+ state.Cluster,
+ operationClassWebsocket,
+ "Live-migrating container",
+ nil,
+ nil,
+ func(op *operation) error {
+ // Blocks until the target's restore result is known.
+ result := <-restoreSuccess
+ if !result {
+ return fmt.Errorf("restore failed, failing CRIU")
+ }
+ return nil
+ },
+ nil,
+ func(op *operation, r *http.Request, w http.ResponseWriter) error {
+ secret := r.FormValue("secret")
+ if secret == "" {
+ return fmt.Errorf("missing secret")
+ }
+
+ if secret != actionScriptOpSecret {
+ return os.ErrPermission
+ }
+
+ c, err := shared.WebsocketUpgrader.Upgrade(w, r, nil)
+ if err != nil {
+ return err
+ }
+
+ // The action script connected, so the dump is done.
+ dumpDone <- true
+
+ closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+ return c.WriteMessage(websocket.CloseMessage, closeMsg)
+ },
+ )
+ if err != nil {
+ os.RemoveAll(checkpointDir)
+ return abort(err)
+ }
+
+ err = writeActionScript(checkpointDir, actionScriptOp.url, actionScriptOpSecret, state.OS.ExecPath)
+ if err != nil {
+ os.RemoveAll(checkpointDir)
+ return abort(err)
+ }
+
+ preDumpCounter := 0
+ preDumpDir := ""
+ if use_pre_dumps {
+ final := false
+ for !final {
+ // NOTE(review): preDumpCounter is incremented both here
+ // and at the end of the loop body, so it advances by two
+ // per iteration (001, 003, ...) and max_iterations is
+ // reached after roughly half as many pre-dumps. Confirm
+ // whether this is intended.
+ preDumpCounter++
+ if preDumpCounter < max_iterations {
+ final = false
+ } else {
+ final = true
+ }
+ dumpDir := fmt.Sprintf("%03d", preDumpCounter)
+ loop_args := preDumpLoopArgs{
+ checkpointDir: checkpointDir,
+ bwlimit: bwlimit,
+ preDumpDir: preDumpDir,
+ dumpDir: dumpDir,
+ final: final,
+ }
+ final, err = s.preDumpLoop(&loop_args)
+ if err != nil {
+ os.RemoveAll(checkpointDir)
+ return abort(err)
+ }
+ // The dump just taken becomes the parent of the next one.
+ preDumpDir = fmt.Sprintf("%03d", preDumpCounter)
+ preDumpCounter++
+ }
+ }
+
+ _, err = actionScriptOp.Run()
+ if err != nil {
+ os.RemoveAll(checkpointDir)
+ return abort(err)
+ }
+
+ go func() {
+ criuMigrationArgs := CriuMigrationArgs{
+ cmd: lxc.MIGRATE_DUMP,
+ stop: true,
+ actionScript: true,
+ preDumpDir: preDumpDir,
+ dumpDir: "final",
+ stateDir: checkpointDir,
+ function: "migration",
+ }
+
+ // Do the final CRIU dump. This needs no special
+ // handling whether pre-dumps are used or not.
+ dumpSuccess <- s.container.Migrate(&criuMigrationArgs)
+ os.RemoveAll(checkpointDir)
+ }()
+
+ select {
+ /* the checkpoint failed, let's just abort */
+ case err = <-dumpSuccess:
+ return abort(err)
+ /* the dump finished, let's continue on to the restore */
+ case <-dumpDone:
+ logger.Debugf("Dump finished, continuing with restore...")
+ }
+ } else {
+ logger.Debugf("liblxc version is older than 2.0.4 and the live migration will probably fail")
+ defer os.RemoveAll(checkpointDir)
+ criuMigrationArgs := CriuMigrationArgs{
+ cmd: lxc.MIGRATE_DUMP,
+ stateDir: checkpointDir,
+ function: "migration",
+ stop: true,
+ actionScript: false,
+ dumpDir: "final",
+ preDumpDir: "",
+ }
+
+ err = s.container.Migrate(&criuMigrationArgs)
+ if err != nil {
+ return abort(err)
+ }
+ }
+
+ /*
+ * We do this serially right now, but there's really no reason for us
+ * to; since we have separate websockets, we can do it in parallel if
+ * we wanted to. However, assuming we're network bound, there's really
+ * no reason to do these in parallel. In the future when we're using
+ * p.haul's protocol, it will make sense to do these in parallel.
+ */
+ ctName, _, _ := containerGetParentAndSnapshotName(s.container.Name())
+ state := s.container.DaemonState()
+ err = RsyncSend(ctName, shared.AddSlash(checkpointDir), s.criuConn, nil, bwlimit, state.OS.ExecPath)
+ if err != nil {
+ return abort(err)
+ }
+ }
+
+ // Send the final filesystem delta for live migrations and when the
+ // target answered with CRIU type NONE.
+ if s.live || (header.Criu != nil && *header.Criu == migration.CRIUType_NONE) {
+ err = driver.SendAfterCheckpoint(s.fsConn, bwlimit)
+ if err != nil {
+ return abort(err)
+ }
+ }
+
+ driver.Cleanup()
+
+ // Wait for the target to report whether it succeeded.
+ msg := migration.MigrationControl{}
+ err = s.recv(&msg)
+ if err != nil {
+ // NOTE(review): in the live case nothing is sent on restoreSuccess
+ // on this path, so the action script operation's run function
+ // keeps waiting on it.
+ s.disconnect()
+ return err
+ }
+
+ if s.live {
+ // Unblock the action script operation, then collect the result of
+ // the final dump.
+ restoreSuccess <- *msg.Success
+ err := <-dumpSuccess
+ if err != nil {
+ logger.Errorf("dump failed after successful restore?: %q", err)
+ }
+ }
+
+ if !*msg.Success {
+ return fmt.Errorf(*msg.Message)
+ }
+
+ return nil
+}
+
+func NewMigrationSink(args *MigrationSinkArgs) (*migrationSink, error) {
+ sink := migrationSink{
+ src: migrationFields{container: args.Container, containerOnly: args.ContainerOnly},
+ dest: migrationFields{containerOnly: args.ContainerOnly},
+ url: args.Url,
+ dialer: args.Dialer,
+ push: args.Push,
+ }
+
+ if sink.push {
+ sink.allConnected = make(chan bool, 1)
+ }
+
+ var ok bool
+ var err error
+ if sink.push {
+ sink.dest.controlSecret, err = shared.RandomCryptoString()
+ if err != nil {
+ return nil, err
+ }
+
+ sink.dest.fsSecret, err = shared.RandomCryptoString()
+ if err != nil {
+ return nil, err
+ }
+
+ sink.dest.live = args.Live
+ if sink.dest.live {
+ sink.dest.criuSecret, err = shared.RandomCryptoString()
+ if err != nil {
+ return nil, err
+ }
+ }
+ } else {
+ sink.src.controlSecret, ok = args.Secrets["control"]
+ if !ok {
+ return nil, fmt.Errorf("Missing control secret")
+ }
+
+ sink.src.fsSecret, ok = args.Secrets["fs"]
+ if !ok {
+ return nil, fmt.Errorf("Missing fs secret")
+ }
+
+ sink.src.criuSecret, ok = args.Secrets["criu"]
+ sink.src.live = ok
+ }
+
+ _, err = exec.LookPath("criu")
+ if sink.push && sink.dest.live && err != nil {
+ return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the destination server.")
+ } else if sink.src.live && err != nil {
+ return nil, fmt.Errorf("Unable to perform container live migration. CRIU isn't installed on the destination server.")
+ }
+
+ return &sink, nil
+}
+
+// Do runs the receiving side of a container migration.
+//
+// In push mode it waits for the source to connect; in pull mode it dials the
+// control, filesystem and (for live migrations) CRIU websockets itself. It
+// then reads the source's MigrationHeader, answers with its own header
+// (falling back to rsync when the filesystem types differ, and echoing the
+// source's pre-dump preference), and starts a goroutine that receives the
+// filesystem and, for live migrations, the CRIU pre-dumps and final dump
+// before restoring the container.
+//
+// The function returns when the restore goroutine reports its result, which
+// is also forwarded to the source over the control channel, or when the
+// source reports a failure or its control channel closes.
+//
+// migrateOp is passed through to the storage migration sink.
+func (c *migrationSink) Do(migrateOp *operation) error {
+	var err error
+
+	if c.push {
+		<-c.allConnected
+	}
+
+	disconnector := c.src.disconnect
+	if c.push {
+		disconnector = c.dest.disconnect
+	}
+
+	if c.push {
+		defer disconnector()
+	} else {
+		c.src.controlConn, err = c.connectWithSecret(c.src.controlSecret)
+		if err != nil {
+			return err
+		}
+		defer c.src.disconnect()
+
+		c.src.fsConn, err = c.connectWithSecret(c.src.fsSecret)
+		if err != nil {
+			c.src.sendControl(err)
+			return err
+		}
+
+		if c.src.live {
+			c.src.criuConn, err = c.connectWithSecret(c.src.criuSecret)
+			if err != nil {
+				c.src.sendControl(err)
+				return err
+			}
+		}
+	}
+
+	// Pick the connection set that talks to the source: "dest" when the
+	// source pushed to us, "src" when we pulled from it.
+	receiver := c.src.recv
+	sender := c.src.send
+	controller := c.src.sendControl
+	if c.push {
+		receiver = c.dest.recv
+		sender = c.dest.send
+		controller = c.dest.sendControl
+	}
+
+	header := migration.MigrationHeader{}
+	if err := receiver(&header); err != nil {
+		controller(err)
+		return err
+	}
+
+	live := c.src.live
+	if c.push {
+		live = c.dest.live
+	}
+
+	criuType := migration.CRIUType_CRIU_RSYNC.Enum()
+	if header.Criu != nil && *header.Criu == migration.CRIUType_NONE {
+		criuType = migration.CRIUType_NONE.Enum()
+	} else if !live {
+		criuType = nil
+	}
+
+	mySink := c.src.container.Storage().MigrationSink
+	myType := c.src.container.Storage().MigrationType()
+	resp := migration.MigrationHeader{
+		Fs:   &myType,
+		Criu: criuType,
+	}
+
+	// If the storage type the source has doesn't match what we have (or
+	// the source did not announce one), then we have to use rsync.
+	if header.Fs == nil || *header.Fs != *resp.Fs {
+		mySink = rsyncMigrationSink
+		myType = migration.MigrationFSType_RSYNC
+		resp.Fs = &myType
+	}
+
+	// If the other side wants pre-dump, let's use it.
+	resp.Predump = proto.Bool(header.GetPredump())
+
+	err = sender(&resp)
+	if err != nil {
+		controller(err)
+		return err
+	}
+
+	// Buffered so the restore goroutine can always deliver its single
+	// result and exit, even if this function has already returned because
+	// the source reported a failure.
+	restore := make(chan error, 1)
+	go func(c *migrationSink) {
+		imagesDir := ""
+		srcIdmap := new(idmap.IdmapSet)
+
+		for _, idmapSet := range header.Idmap {
+			e := idmap.IdmapEntry{
+				Isuid:    *idmapSet.Isuid,
+				Isgid:    *idmapSet.Isgid,
+				Nsid:     int64(*idmapSet.Nsid),
+				Hostid:   int64(*idmapSet.Hostid),
+				Maprange: int64(*idmapSet.Maprange)}
+			srcIdmap.Idmap = idmap.Extend(srcIdmap.Idmap, e)
+		}
+
+		/* We do the fs receive in parallel so we don't have to reason
+		 * about when to receive what. The sending side is smart enough
+		 * to send the filesystem bits that it can before it seizes the
+		 * container to start checkpointing, so the total transfer time
+		 * will be minimized even if we're dumb here.
+		 */
+		// Buffered for the same reason as restore: the CRIU receive
+		// below may bail out before anyone reads from this channel.
+		fsTransfer := make(chan error, 1)
+		go func() {
+			snapshots := []*migration.Snapshot{}
+
+			/* Legacy: we only sent the snapshot names, so we just
+			 * copy the container's config over, same as we used to
+			 * do.
+			 */
+			if len(header.SnapshotNames) != len(header.Snapshots) {
+				for _, name := range header.SnapshotNames {
+					// Point at a per-iteration copy so each
+					// snapshot keeps its own name.
+					nameCopy := name
+					base := snapshotToProtobuf(c.src.container)
+					base.Name = &nameCopy
+					snapshots = append(snapshots, base)
+				}
+			} else {
+				snapshots = header.Snapshots
+			}
+
+			var fsConn *websocket.Conn
+			if c.push {
+				fsConn = c.dest.fsConn
+			} else {
+				fsConn = c.src.fsConn
+			}
+
+			sendFinalFsDelta := live
+			if criuType != nil && *criuType == migration.CRIUType_NONE {
+				sendFinalFsDelta = true
+			}
+
+			// Use a goroutine-local error: the function-level err is
+			// written by the select loop below and must not be
+			// shared with this goroutine.
+			err := mySink(sendFinalFsDelta, c.src.container,
+				snapshots, fsConn, srcIdmap, migrateOp,
+				c.src.containerOnly)
+			if err != nil {
+				fsTransfer <- err
+				return
+			}
+
+			err = ShiftIfNecessary(c.src.container, srcIdmap)
+			if err != nil {
+				fsTransfer <- err
+				return
+			}
+
+			fsTransfer <- nil
+		}()
+
+		if live {
+			var err error
+			imagesDir, err = ioutil.TempDir("", "lxd_restore_")
+			if err != nil {
+				restore <- err
+				return
+			}
+
+			defer os.RemoveAll(imagesDir)
+
+			var criuConn *websocket.Conn
+			if c.push {
+				criuConn = c.dest.criuConn
+			} else {
+				criuConn = c.src.criuConn
+			}
+
+			syncMsg := &migration.MigrationSync{
+				FinalPreDump: proto.Bool(false),
+			}
+
+			if resp.GetPredump() {
+				logger.Debugf("Before the receive loop %t", syncMsg.GetFinalPreDump())
+				for !syncMsg.GetFinalPreDump() {
+					logger.Debugf("About to receive rsync")
+					// Transfer a CRIU pre-dump
+					err = RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil)
+					if err != nil {
+						restore <- err
+						return
+					}
+					logger.Debugf("rsync receive done")
+
+					logger.Debugf("About to receive header")
+					// Check if this was the last pre-dump
+					// Only the FinalPreDump element is of interest
+					mtype, data, err := criuConn.ReadMessage()
+					if err != nil {
+						logger.Debugf("err %s", err)
+						restore <- err
+						return
+					}
+					if mtype != websocket.BinaryMessage {
+						// err is nil here, so build a real
+						// error instead of reporting success.
+						restore <- fmt.Errorf("Expected a binary pre-dump sync message but got websocket message type %d", mtype)
+						return
+					}
+					err = proto.Unmarshal(data, syncMsg)
+					if err != nil {
+						logger.Debugf("err %s", err)
+						restore <- err
+						return
+					}
+					logger.Debugf("At the end of the receive loop %t", syncMsg.GetFinalPreDump())
+				}
+			}
+
+			// Final CRIU dump
+			err = RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil)
+			if err != nil {
+				restore <- err
+				return
+			}
+		}
+
+		err := <-fsTransfer
+		if err != nil {
+			restore <- err
+			return
+		}
+
+		if live {
+			criuMigrationArgs := CriuMigrationArgs{
+				cmd:          lxc.MIGRATE_RESTORE,
+				stateDir:     imagesDir,
+				function:     "migration",
+				stop:         false,
+				actionScript: false,
+				dumpDir:      "final",
+				preDumpDir:   "",
+			}
+
+			// The source always writes its last CRIU dump into the
+			// "final" directory, with or without preceding
+			// pre-dumps, so that name can be hardcoded here.
+			err = c.src.container.Migrate(&criuMigrationArgs)
+			if err != nil {
+				restore <- err
+				return
+			}
+
+		}
+
+		restore <- nil
+	}(c)
+
+	var source <-chan migration.MigrationControl
+	if c.push {
+		source = c.dest.controlChannel()
+	} else {
+		source = c.src.controlChannel()
+	}
+
+	for {
+		select {
+		case err = <-restore:
+			// Tell the source whether or not the restore succeeded.
+			controller(err)
+			return err
+		case msg, ok := <-source:
+			if !ok {
+				disconnector()
+				return fmt.Errorf("Got error reading source")
+			}
+			if !*msg.Success {
+				disconnector()
+				// The message comes from the peer; never use it
+				// as a format string.
+				return fmt.Errorf("%s", *msg.Message)
+			}
+
+			// The source can only tell us it failed (e.g. if
+			// checkpointing failed). We have to tell the source
+			// whether or not the restore was successful.
+			logger.Debugf("Unknown message %v from source", msg)
+		}
+	}
+}
+
+func (s *migrationSourceWs) ConnectContainerTarget(target api.ContainerPostTarget) error {
+ return s.ConnectTarget(target.Operation, target.Operation, target.Websockets)
+}
From 610c8dcac6ab38dd27cf274a68605da775ca7f75 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 22 Mar 2018 18:30:57 +0100
Subject: [PATCH 2/3] client: send "copy" type
Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
client/lxd_storage_volumes.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/client/lxd_storage_volumes.go b/client/lxd_storage_volumes.go
index eabeea5a1..79c418ab8 100644
--- a/client/lxd_storage_volumes.go
+++ b/client/lxd_storage_volumes.go
@@ -109,7 +109,7 @@ func (r *ProtocolLXD) CopyStoragePoolVolume(pool string, source ContainerServer,
Type: volume.Type,
Source: api.StorageVolumeSource{
Name: volume.Name,
- Type: volume.Type,
+ Type: "copy",
Pool: sourcePool,
},
}
From c3329f6bfc12ab910949233e2d69c5bdffd4fd0e Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 22 Mar 2018 18:27:36 +0100
Subject: [PATCH 3/3] storage: add helper in API endpoint
Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
lxd/storage_volumes.go | 22 ++++++++++++++++------
1 file changed, 16 insertions(+), 6 deletions(-)
diff --git a/lxd/storage_volumes.go b/lxd/storage_volumes.go
index a85892f79..0efda2d70 100644
--- a/lxd/storage_volumes.go
+++ b/lxd/storage_volumes.go
@@ -162,10 +162,6 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) Response {
return BadRequest(fmt.Errorf("you must provide a storage volume type of the storage volume"))
}
- // Check if the user gave us a valid pool name in which the new storage
- // volume is supposed to be created.
- poolName := mux.Vars(r)["name"]
-
// We currently only allow to create storage volumes of type
// storagePoolVolumeTypeCustom. So check, that nothing else was
// requested.
@@ -174,8 +170,22 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) Response {
`storage volumes of type %s`, req.Type))
}
+ poolName := mux.Vars(r)["name"]
+
+ switch req.Source.Type {
+ case "":
+ return doVolumeCreateOrCopy(d, poolName, &req)
+ case "copy":
+ return doVolumeCreateOrCopy(d, poolName, &req)
+ default:
+ return BadRequest(fmt.Errorf("unknown source type %s", req.Source.Type))
+ }
+}
+
+func doVolumeCreateOrCopy(d *Daemon, poolName string, req *api.StorageVolumesPost) Response {
+
doWork := func() error {
- err = storagePoolVolumeCreateInternal(d.State(), poolName, &req)
+ err := storagePoolVolumeCreateInternal(d.State(), poolName, req)
if err != nil {
return err
}
@@ -183,7 +193,7 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) Response {
}
if req.Source.Name == "" {
- err = doWork()
+ err := doWork()
if err != nil {
return SmartError(err)
}
More information about the lxc-devel
mailing list