[lxc-devel] [lxd/master] Migration progress tracking
stgraber on Github
lxc-bot at linuxcontainers.org
Thu Nov 17 05:30:15 UTC 2016
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20161117/074b750d/attachment.bin>
-------------- next part --------------
From 9791a44ae9b254af5c012c52ab4495ad8c79dac4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 15:21:34 -0500
Subject: [PATCH 1/6] On progress updates, keep cursor at end of line
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxc/main.go | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/lxc/main.go b/lxc/main.go
index b50904e..3eacd63 100644
--- a/lxc/main.go
+++ b/lxc/main.go
@@ -323,9 +323,10 @@ func (p *ProgressRenderer) Done(msg string) {
if len(msg) > p.maxLength {
p.maxLength = len(msg)
} else {
- fmt.Printf("%s\r", strings.Repeat(" ", p.maxLength))
+ fmt.Printf("\r%s", strings.Repeat(" ", p.maxLength))
}
+ fmt.Print("\r")
fmt.Print(msg)
}
@@ -335,13 +336,12 @@ func (p *ProgressRenderer) Update(status string) {
msg = p.Format
}
- msg = fmt.Sprintf(msg, status)
- msg += "\r"
+ msg = fmt.Sprintf("\r"+msg, status)
if len(msg) > p.maxLength {
p.maxLength = len(msg)
} else {
- fmt.Printf("%s\r", strings.Repeat(" ", p.maxLength))
+ fmt.Printf("\r%s", strings.Repeat(" ", p.maxLength))
}
fmt.Print(msg)
From 2b057a2f192e33a78678c2775b2d1a87f9195ef9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 15:13:16 -0500
Subject: [PATCH 2/6] Support absolute file transfer tracking
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Adds an absolute mode that reports the number of bytes transferred, for when we don't know the total length.
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
client.go | 2 +-
lxc/image.go | 4 +--
lxd/daemon_images.go | 4 +--
shared/simplestreams.go | 4 +--
shared/util.go | 67 +++++++++++++++++++++++++++++++++++--------------
5 files changed, 55 insertions(+), 26 deletions(-)
diff --git a/client.go b/client.go
index a32709b..31a47ba 100644
--- a/client.go
+++ b/client.go
@@ -988,7 +988,7 @@ func (c *Client) PostImageURL(imageFile string, properties []string, public bool
return fingerprint, nil
}
-func (c *Client) PostImage(imageFile string, rootfsFile string, properties []string, public bool, aliases []string, progressHandler func(int, int)) (string, error) {
+func (c *Client) PostImage(imageFile string, rootfsFile string, properties []string, public bool, aliases []string, progressHandler func(int64, int64)) (string, error) {
if c.Remote.Public {
return "", fmt.Errorf("This function isn't supported by public remotes.")
}
diff --git a/lxc/image.go b/lxc/image.go
index 2241d7f..96db696 100644
--- a/lxc/image.go
+++ b/lxc/image.go
@@ -425,8 +425,8 @@ func (c *imageCmd) run(config *lxd.Config, args []string) error {
return fmt.Errorf(i18n.G("Only https:// is supported for remote image import."))
} else {
progress := ProgressRenderer{Format: i18n.G("Transferring image: %s")}
- handler := func(percent int, speed int) {
- progress.Update(fmt.Sprintf("%d%% (%s/s)", percent, shared.GetByteSizeString(int64(speed))))
+ handler := func(percent int64, speed int64) {
+ progress.Update(fmt.Sprintf("%d%% (%s/s)", percent, shared.GetByteSizeString(speed)))
}
fingerprint, err = d.PostImage(imageFile, rootfsFile, properties, c.publicImage, c.addAliases, handler)
diff --git a/lxd/daemon_images.go b/lxd/daemon_images.go
index 5007d13..fe19413 100644
--- a/lxd/daemon_images.go
+++ b/lxd/daemon_images.go
@@ -247,7 +247,7 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
d.Storage.ImageDelete(fp)
}
- progress := func(progressInt int, speedInt int) {
+ progress := func(progressInt int64, speedInt int64) {
if op == nil {
return
}
@@ -257,7 +257,7 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
meta = make(map[string]interface{})
}
- progress := fmt.Sprintf("%d%% (%s/s)", progressInt, shared.GetByteSizeString(int64(speedInt)))
+ progress := fmt.Sprintf("%d%% (%s/s)", progressInt, shared.GetByteSizeString(speedInt))
if meta["download_progress"] != progress {
meta["download_progress"] = progress
diff --git a/shared/simplestreams.go b/shared/simplestreams.go
index 8fced30..771e038 100644
--- a/shared/simplestreams.go
+++ b/shared/simplestreams.go
@@ -494,7 +494,7 @@ func (s *SimpleStreams) getPaths(fingerprint string) ([][]string, error) {
return nil, fmt.Errorf("Couldn't find the requested image")
}
-func (s *SimpleStreams) downloadFile(path string, hash string, target string, progress func(int, int)) error {
+func (s *SimpleStreams) downloadFile(path string, hash string, target string, progress func(int64, int64)) error {
download := func(url string, hash string, target string) error {
out, err := os.Create(target)
if err != nil {
@@ -623,7 +623,7 @@ func (s *SimpleStreams) ExportImage(image string, target string) (string, error)
return target, nil
}
-func (s *SimpleStreams) Download(image string, file string, target string, progress func(int, int)) error {
+func (s *SimpleStreams) Download(image string, file string, target string, progress func(int64, int64)) error {
paths, err := s.getPaths(image)
if err != nil {
return err
diff --git a/shared/util.go b/shared/util.go
index 2b5b70e..6906775 100644
--- a/shared/util.go
+++ b/shared/util.go
@@ -744,47 +744,76 @@ type TransferProgress struct {
total int64
start *time.Time
+ last *time.Time
Length int64
- Handler func(int, int)
+ Handler func(int64, int64)
}
func (pt *TransferProgress) Read(p []byte) (int, error) {
+ // Do normal reader tasks
n, err := pt.Reader.Read(p)
+ pt.total += int64(n)
+ // Skip the rest if no handler attached
if pt.Handler == nil {
return n, err
}
+ // Initialize start time if needed
if pt.start == nil {
cur := time.Now()
pt.start = &cur
+ pt.last = pt.start
}
- if n > 0 {
- pt.total += int64(n)
- percentage := float64(pt.total) / float64(pt.Length) * float64(100)
+ // Skip if no data to count
+ if n <= 0 {
+ return n, err
+ }
- if percentage-pt.percentage > 0.9 {
- // Determine percentage
- pt.percentage = percentage
- progressInt := 1 - (int(percentage) % 1) + int(percentage)
- if progressInt > 100 {
- progressInt = 100
- }
+ // Update interval handling
+ var percentage float64
+ if pt.Length > 0 {
+ // If running in relative mode, check that we increased by at least 1%
+ percentage = float64(pt.total) / float64(pt.Length) * float64(100)
+ if percentage-pt.percentage < 0.9 {
+ return n, err
+ }
+ } else {
+ // If running in absolute mode, check that at least a second elapsed
+ interval := time.Since(*pt.last).Seconds()
+ if interval < 1 {
+ return n, err
+ }
+ }
- // Determine speed
- speedInt := int(0)
- duration := time.Since(*pt.start).Seconds()
- if duration > 0 {
- speed := float64(pt.total) / duration
- speedInt = int(speed)
- }
+ // Determine speed
+ speedInt := int64(0)
+ duration := time.Since(*pt.start).Seconds()
+ if duration > 0 {
+ speed := float64(pt.total) / duration
+ speedInt = int64(speed)
+ }
- pt.Handler(progressInt, speedInt)
+ // Determine progress
+ progressInt := int64(0)
+ if pt.Length > 0 {
+ pt.percentage = percentage
+ progressInt = int64(1 - (int(percentage) % 1) + int(percentage))
+ if progressInt > 100 {
+ progressInt = 100
}
+ } else {
+ progressInt = pt.total
+
+ // Update timestamp
+ cur := time.Now()
+ pt.last = &cur
}
+ pt.Handler(progressInt, speedInt)
+
return n, err
}
From f38d9263f4ca17149d98f676adea22a3d4b5bb25 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 15:22:59 -0500
Subject: [PATCH 3/6] Convert TransferProgress to ReadCloser
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
client.go | 4 ++--
lxd/daemon_images.go | 2 +-
shared/simplestreams.go | 2 +-
shared/util.go | 4 ++--
4 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/client.go b/client.go
index 31a47ba..7ccaf74 100644
--- a/client.go
+++ b/client.go
@@ -1055,7 +1055,7 @@ func (c *Client) PostImage(imageFile string, rootfsFile string, properties []str
return "", err
}
- progress := &shared.TransferProgress{Reader: body, Length: size, Handler: progressHandler}
+ progress := &shared.TransferProgress{ReadCloser: body, Length: size, Handler: progressHandler}
req, err = http.NewRequest("POST", uri, progress)
req.Header.Set("Content-Type", w.FormDataContentType())
@@ -1071,7 +1071,7 @@ func (c *Client) PostImage(imageFile string, rootfsFile string, properties []str
return "", err
}
- progress := &shared.TransferProgress{Reader: fImage, Length: stat.Size(), Handler: progressHandler}
+ progress := &shared.TransferProgress{ReadCloser: fImage, Length: stat.Size(), Handler: progressHandler}
req, err = http.NewRequest("POST", uri, progress)
req.Header.Set("X-LXD-filename", filepath.Base(imageFile))
diff --git a/lxd/daemon_images.go b/lxd/daemon_images.go
index fe19413..7c364b6 100644
--- a/lxd/daemon_images.go
+++ b/lxd/daemon_images.go
@@ -359,7 +359,7 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
ctype = "application/octet-stream"
}
- body := &shared.TransferProgress{Reader: raw.Body, Length: raw.ContentLength, Handler: progress}
+ body := &shared.TransferProgress{ReadCloser: raw.Body, Length: raw.ContentLength, Handler: progress}
if ctype == "multipart/form-data" {
// Parse the POST data
diff --git a/shared/simplestreams.go b/shared/simplestreams.go
index 771e038..c4e3f3c 100644
--- a/shared/simplestreams.go
+++ b/shared/simplestreams.go
@@ -518,7 +518,7 @@ func (s *SimpleStreams) downloadFile(path string, hash string, target string, pr
return fmt.Errorf("invalid simplestreams source: got %d looking for %s", resp.StatusCode, path)
}
- body := &TransferProgress{Reader: resp.Body, Length: resp.ContentLength, Handler: progress}
+ body := &TransferProgress{ReadCloser: resp.Body, Length: resp.ContentLength, Handler: progress}
sha256 := sha256.New()
_, err = io.Copy(io.MultiWriter(out, sha256), body)
diff --git a/shared/util.go b/shared/util.go
index 6906775..7ba774b 100644
--- a/shared/util.go
+++ b/shared/util.go
@@ -739,7 +739,7 @@ func RemoveDuplicatesFromString(s string, sep string) string {
}
type TransferProgress struct {
- io.Reader
+ io.ReadCloser
percentage float64
total int64
@@ -752,7 +752,7 @@ type TransferProgress struct {
func (pt *TransferProgress) Read(p []byte) (int, error) {
// Do normal reader tasks
- n, err := pt.Reader.Read(p)
+ n, err := pt.ReadCloser.Read(p)
pt.total += int64(n)
// Skip the rest if no handler attached
From 1c3473ba68f08360b2ca16a7408dc7d5a50d470b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 21:03:11 -0500
Subject: [PATCH 4/6] Implement write tracking
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
client.go | 16 +++++++++++--
lxd/daemon_images.go | 8 ++++++-
shared/simplestreams.go | 8 ++++++-
shared/util.go | 62 +++++++++++++++++++++++++++++++++++--------------
4 files changed, 73 insertions(+), 21 deletions(-)
diff --git a/client.go b/client.go
index 7ccaf74..6388fdb 100644
--- a/client.go
+++ b/client.go
@@ -1055,7 +1055,13 @@ func (c *Client) PostImage(imageFile string, rootfsFile string, properties []str
return "", err
}
- progress := &shared.TransferProgress{ReadCloser: body, Length: size, Handler: progressHandler}
+ progress := &shared.ProgressReader{
+ ReadCloser: body,
+ Tracker: &shared.ProgressTracker{
+ Length: size,
+ Handler: progressHandler,
+ },
+ }
req, err = http.NewRequest("POST", uri, progress)
req.Header.Set("Content-Type", w.FormDataContentType())
@@ -1071,7 +1077,13 @@ func (c *Client) PostImage(imageFile string, rootfsFile string, properties []str
return "", err
}
- progress := &shared.TransferProgress{ReadCloser: fImage, Length: stat.Size(), Handler: progressHandler}
+ progress := &shared.ProgressReader{
+ ReadCloser: fImage,
+ Tracker: &shared.ProgressTracker{
+ Length: stat.Size(),
+ Handler: progressHandler,
+ },
+ }
req, err = http.NewRequest("POST", uri, progress)
req.Header.Set("X-LXD-filename", filepath.Base(imageFile))
diff --git a/lxd/daemon_images.go b/lxd/daemon_images.go
index 7c364b6..f841a6d 100644
--- a/lxd/daemon_images.go
+++ b/lxd/daemon_images.go
@@ -359,7 +359,13 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
ctype = "application/octet-stream"
}
- body := &shared.TransferProgress{ReadCloser: raw.Body, Length: raw.ContentLength, Handler: progress}
+ body := &shared.ProgressReader{
+ ReadCloser: raw.Body,
+ Tracker: &shared.ProgressTracker{
+ Length: raw.ContentLength,
+ Handler: progress,
+ },
+ }
if ctype == "multipart/form-data" {
// Parse the POST data
diff --git a/shared/simplestreams.go b/shared/simplestreams.go
index c4e3f3c..e81cce2 100644
--- a/shared/simplestreams.go
+++ b/shared/simplestreams.go
@@ -518,7 +518,13 @@ func (s *SimpleStreams) downloadFile(path string, hash string, target string, pr
return fmt.Errorf("invalid simplestreams source: got %d looking for %s", resp.StatusCode, path)
}
- body := &TransferProgress{ReadCloser: resp.Body, Length: resp.ContentLength, Handler: progress}
+ body := &ProgressReader{
+ ReadCloser: resp.Body,
+ Tracker: &ProgressTracker{
+ Length: resp.ContentLength,
+ Handler: progress,
+ },
+ }
sha256 := sha256.New()
_, err = io.Copy(io.MultiWriter(out, sha256), body)
diff --git a/shared/util.go b/shared/util.go
index 7ba774b..8651c9d 100644
--- a/shared/util.go
+++ b/shared/util.go
@@ -738,26 +738,20 @@ func RemoveDuplicatesFromString(s string, sep string) string {
return s
}
-type TransferProgress struct {
- io.ReadCloser
- percentage float64
- total int64
-
- start *time.Time
- last *time.Time
-
+type ProgressTracker struct {
Length int64
Handler func(int64, int64)
-}
-func (pt *TransferProgress) Read(p []byte) (int, error) {
- // Do normal reader tasks
- n, err := pt.ReadCloser.Read(p)
- pt.total += int64(n)
+ percentage float64
+ total int64
+ start *time.Time
+ last *time.Time
+}
+func (pt *ProgressTracker) Update(n int) {
// Skip the rest if no handler attached
if pt.Handler == nil {
- return n, err
+ return
}
// Initialize start time if needed
@@ -769,7 +763,7 @@ func (pt *TransferProgress) Read(p []byte) (int, error) {
// Skip if no data to count
if n <= 0 {
- return n, err
+ return
}
// Update interval handling
@@ -778,13 +772,13 @@ func (pt *TransferProgress) Read(p []byte) (int, error) {
// If running in relative mode, check that we increased by at least 1%
percentage = float64(pt.total) / float64(pt.Length) * float64(100)
if percentage-pt.percentage < 0.9 {
- return n, err
+ return
}
} else {
// If running in absolute mode, check that at least a second elapsed
interval := time.Since(*pt.last).Seconds()
if interval < 1 {
- return n, err
+ return
}
}
@@ -813,6 +807,40 @@ func (pt *TransferProgress) Read(p []byte) (int, error) {
}
pt.Handler(progressInt, speedInt)
+}
+
+type ProgressReader struct {
+ io.ReadCloser
+ Tracker *ProgressTracker
+}
+
+func (pt *ProgressReader) Read(p []byte) (int, error) {
+ // Do normal reader tasks
+ n, err := pt.ReadCloser.Read(p)
+
+ // Do the actual progress tracking
+ if pt.Tracker != nil {
+ pt.Tracker.total += int64(n)
+ pt.Tracker.Update(n)
+ }
+
+ return n, err
+}
+
+type ProgressWriter struct {
+ io.WriteCloser
+ Tracker *ProgressTracker
+}
+
+func (pt *ProgressWriter) Write(p []byte) (int, error) {
+ // Do normal writer tasks
+ n, err := pt.WriteCloser.Write(p)
+
+ // Do the actual progress tracking
+ if pt.Tracker != nil {
+ pt.Tracker.total += int64(n)
+ pt.Tracker.Update(n)
+ }
return n, err
}
From d11348c452f402930ca640c19406c26afd211de1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 23:35:52 -0500
Subject: [PATCH 5/6] Simplify rsync code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/rsync.go | 86 ++++++++++++++++++++++++++++--------------------------------
1 file changed, 40 insertions(+), 46 deletions(-)
diff --git a/lxd/rsync.go b/lxd/rsync.go
index 6369d1c..49554b9 100644
--- a/lxd/rsync.go
+++ b/lxd/rsync.go
@@ -14,44 +14,6 @@ import (
"github.com/lxc/lxd/shared"
)
-func rsyncWebsocket(path string, cmd *exec.Cmd, conn *websocket.Conn) error {
- stdin, err := cmd.StdinPipe()
- if err != nil {
- return err
- }
-
- stdout, err := cmd.StdoutPipe()
- if err != nil {
- return err
- }
-
- stderr, err := cmd.StderrPipe()
- if err != nil {
- return err
- }
-
- if err := cmd.Start(); err != nil {
- return err
- }
-
- readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout)
- data, err2 := ioutil.ReadAll(stderr)
- if err2 != nil {
- shared.LogDebugf("error reading rsync stderr: %s", err2)
- return err2
- }
-
- err = cmd.Wait()
- if err != nil {
- shared.LogDebugf("rsync recv error for path %s: %s: %s", path, err, string(data))
- }
-
- <-readDone
- <-writeDone
-
- return err
-}
-
func rsyncSendSetup(path string) (*exec.Cmd, net.Conn, io.ReadCloser, error) {
/*
* It's sort of unfortunate, but there's no library call to get a
@@ -153,8 +115,11 @@ func RsyncSend(path string, conn *websocket.Conn) error {
return err
}
-func rsyncRecvCmd(path string) *exec.Cmd {
- return exec.Command("rsync",
+// RsyncRecv sets up the receiving half of the websocket to rsync (the other
+// half set up by RsyncSend), putting the contents in the directory specified
+// by path.
+func RsyncRecv(path string, conn *websocket.Conn) error {
+ cmd := exec.Command("rsync",
"--server",
"-vlogDtpre.iLsfx",
"--numeric-ids",
@@ -162,13 +127,42 @@ func rsyncRecvCmd(path string) *exec.Cmd {
"--partial",
".",
path)
-}
-// RsyncRecv sets up the receiving half of the websocket to rsync (the other
-// half set up by RsyncSend), putting the contents in the directory specified
-// by path.
-func RsyncRecv(path string, conn *websocket.Conn) error {
- return rsyncWebsocket(path, rsyncRecvCmd(path), conn)
+ stdin, err := cmd.StdinPipe()
+ if err != nil {
+ return err
+ }
+
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return err
+ }
+
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ return err
+ }
+
+ if err := cmd.Start(); err != nil {
+ return err
+ }
+
+ readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout)
+ data, err2 := ioutil.ReadAll(stderr)
+ if err2 != nil {
+ shared.LogDebugf("error reading rsync stderr: %s", err2)
+ return err2
+ }
+
+ err = cmd.Wait()
+ if err != nil {
+ shared.LogDebugf("rsync recv error for path %s: %s: %s", path, err, string(data))
+ }
+
+ <-readDone
+ <-writeDone
+
+ return err
}
// Netcat is called with:
From c3ebf6eb38a0458a40d06670b6e1ea7f06a6b171 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 17:50:18 -0500
Subject: [PATCH 6/6] Implement migration progress
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Closes #1891
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
doc/api-extensions.md | 4 +++
lxd/api_1.0.go | 1 +
lxd/migrate.go | 8 ++---
lxd/rsync.go | 18 +++++++---
lxd/storage.go | 98 +++++++++++++++++++++++++++++++++++++++++++--------
lxd/storage_btrfs.go | 45 +++++++++++++++--------
lxd/storage_dir.go | 4 +--
lxd/storage_lvm.go | 4 +--
lxd/storage_test.go | 2 +-
lxd/storage_zfs.go | 43 +++++++++++++++-------
10 files changed, 172 insertions(+), 55 deletions(-)
diff --git a/doc/api-extensions.md b/doc/api-extensions.md
index fc592c4..d2cfd69 100644
--- a/doc/api-extensions.md
+++ b/doc/api-extensions.md
@@ -156,3 +156,7 @@ Enables adding GPUs to a container.
## container\_image\_properties
Introduces a new "image" config key space. Read-only, includes the properties of the parent image.
+
+## migration\_progress
+Transfer progress is now exported as part of the operation, on both the sending and receiving ends.
+This shows up as an "fs\_progress" attribute in the operation metadata.
diff --git a/lxd/api_1.0.go b/lxd/api_1.0.go
index c4654e2..a8b001a 100644
--- a/lxd/api_1.0.go
+++ b/lxd/api_1.0.go
@@ -77,6 +77,7 @@ func api10Get(d *Daemon, r *http.Request) Response {
"container_exec_signal_handling",
"gpu_devices",
"container_image_properties",
+ "migration_progress",
},
"api_status": "stable",
diff --git a/lxd/migrate.go b/lxd/migrate.go
index 061f2a5..6b125cf 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -375,7 +375,7 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
return err
}
- if err := driver.SendWhileRunning(s.fsConn); err != nil {
+ if err := driver.SendWhileRunning(s.fsConn, migrateOp); err != nil {
return abort(err)
}
@@ -493,7 +493,7 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
* no reason to do these in parallel. In the future when we're using
* p.haul's protocol, it will make sense to do these in parallel.
*/
- if err := RsyncSend(shared.AddSlash(checkpointDir), s.criuConn); err != nil {
+ if err := RsyncSend(shared.AddSlash(checkpointDir), s.criuConn, nil); err != nil {
return abort(err)
}
@@ -794,7 +794,7 @@ func (c *migrationSink) Do(migrateOp *operation) error {
} else {
fsConn = c.src.fsConn
}
- if err := mySink(live, c.src.container, header.Snapshots, fsConn, srcIdmap); err != nil {
+ if err := mySink(live, c.src.container, header.Snapshots, fsConn, srcIdmap, migrateOp); err != nil {
fsTransfer <- err
return
}
@@ -823,7 +823,7 @@ func (c *migrationSink) Do(migrateOp *operation) error {
} else {
criuConn = c.src.criuConn
}
- if err := RsyncRecv(shared.AddSlash(imagesDir), criuConn); err != nil {
+ if err := RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil); err != nil {
restore <- err
return
}
diff --git a/lxd/rsync.go b/lxd/rsync.go
index 49554b9..407fbcf 100644
--- a/lxd/rsync.go
+++ b/lxd/rsync.go
@@ -88,7 +88,7 @@ func rsyncSendSetup(path string) (*exec.Cmd, net.Conn, io.ReadCloser, error) {
// RsyncSend sets up the sending half of an rsync, to recursively send the
// directory pointed to by path over the websocket.
-func RsyncSend(path string, conn *websocket.Conn) error {
+func RsyncSend(path string, conn *websocket.Conn, readWrapper func(io.ReadCloser) io.ReadCloser) error {
cmd, dataSocket, stderr, err := rsyncSendSetup(path)
if dataSocket != nil {
defer dataSocket.Close()
@@ -97,7 +97,12 @@ func RsyncSend(path string, conn *websocket.Conn) error {
return err
}
- readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, dataSocket)
+ readPipe := io.ReadCloser(dataSocket)
+ if readWrapper != nil {
+ readPipe = readWrapper(dataSocket)
+ }
+
+ readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, readPipe)
output, err := ioutil.ReadAll(stderr)
if err != nil {
@@ -118,7 +123,7 @@ func RsyncSend(path string, conn *websocket.Conn) error {
// RsyncRecv sets up the receiving half of the websocket to rsync (the other
// half set up by RsyncSend), putting the contents in the directory specified
// by path.
-func RsyncRecv(path string, conn *websocket.Conn) error {
+func RsyncRecv(path string, conn *websocket.Conn, writeWrapper func(io.WriteCloser) io.WriteCloser) error {
cmd := exec.Command("rsync",
"--server",
"-vlogDtpre.iLsfx",
@@ -147,7 +152,12 @@ func RsyncRecv(path string, conn *websocket.Conn) error {
return err
}
- readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout)
+ writePipe := io.WriteCloser(stdin)
+ if writeWrapper != nil {
+ writePipe = writeWrapper(stdin)
+ }
+
+ readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout)
data, err2 := ioutil.ReadAll(stderr)
if err2 != nil {
shared.LogDebugf("error reading rsync stderr: %s", err2)
diff --git a/lxd/storage.go b/lxd/storage.go
index 5bb94dc..a41ca1b 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -3,6 +3,7 @@ package main
import (
"encoding/json"
"fmt"
+ "io"
"os"
"os/exec"
"path/filepath"
@@ -117,7 +118,7 @@ type MigrationStorageSourceDriver interface {
/* send any bits of the container/snapshots that are possible while the
* container is still running.
*/
- SendWhileRunning(conn *websocket.Conn) error
+ SendWhileRunning(conn *websocket.Conn, op *operation) error
/* send the final bits (e.g. a final delta snapshot for zfs, btrfs, or
* do a final rsync) of the fs after the container has been
@@ -192,7 +193,7 @@ type storage interface {
// already present on the target instance as an exercise for the
// enterprising developer.
MigrationSource(container container) (MigrationStorageSourceDriver, error)
- MigrationSink(live bool, container container, objects []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error
+ MigrationSink(live bool, container container, objects []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error
}
func newStorage(d *Daemon, sType storageType) (storage, error) {
@@ -556,7 +557,7 @@ func (lw *storageLogWrapper) MigrationSource(container container) (MigrationStor
return lw.w.MigrationSource(container)
}
-func (lw *storageLogWrapper) MigrationSink(live bool, container container, objects []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
+func (lw *storageLogWrapper) MigrationSink(live bool, container container, objects []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
objNames := []string{}
for _, obj := range objects {
objNames = append(objNames, obj.GetName())
@@ -567,9 +568,10 @@ func (lw *storageLogWrapper) MigrationSink(live bool, container container, objec
"container": container.Name(),
"objects": objNames,
"srcIdmap": *srcIdmap,
+ "op": op,
})
- return lw.w.MigrationSink(live, container, objects, conn, srcIdmap)
+ return lw.w.MigrationSink(live, container, objects, conn, srcIdmap, op)
}
func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error {
@@ -608,7 +610,7 @@ func (s rsyncStorageSourceDriver) Snapshots() []container {
return s.snapshots
}
-func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation) error {
for _, send := range s.snapshots {
if err := send.StorageStart(); err != nil {
return err
@@ -616,17 +618,19 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
defer send.StorageStop()
path := send.Path()
- if err := RsyncSend(shared.AddSlash(path), conn); err != nil {
+ wrapper := StorageProgressReader(op, "fs_progress", send.Name())
+ if err := RsyncSend(shared.AddSlash(path), conn, wrapper); err != nil {
return err
}
}
- return RsyncSend(shared.AddSlash(s.container.Path()), conn)
+ wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+ return RsyncSend(shared.AddSlash(s.container.Path()), conn, wrapper)
}
func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
/* resync anything that changed between our first send and the checkpoint */
- return RsyncSend(shared.AddSlash(s.container.Path()), conn)
+ return RsyncSend(shared.AddSlash(s.container.Path()), conn, nil)
}
func (s rsyncStorageSourceDriver) Cleanup() {
@@ -672,7 +676,7 @@ func snapshotProtobufToContainerArgs(containerName string, snap *Snapshot) conta
}
}
-func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
+func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
isDirBackend := container.Storage().GetStorageType() == storageTypeDir
if isDirBackend {
@@ -689,7 +693,8 @@ func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, c
return err
}
- if err := RsyncRecv(shared.AddSlash(s.Path()), conn); err != nil {
+ wrapper := StorageProgressWriter(op, "fs_progress", s.Name())
+ if err := RsyncRecv(shared.AddSlash(s.Path()), conn, wrapper); err != nil {
return err
}
@@ -698,7 +703,8 @@ func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, c
}
}
- if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+ wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+ if err := RsyncRecv(shared.AddSlash(container.Path()), conn, wrapper); err != nil {
return err
}
} else {
@@ -708,7 +714,9 @@ func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, c
defer container.StorageStop()
for _, snap := range snapshots {
- if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+ args := snapshotProtobufToContainerArgs(container.Name(), snap)
+ wrapper := StorageProgressWriter(op, "fs_progress", snap.GetName())
+ if err := RsyncRecv(shared.AddSlash(container.Path()), conn, wrapper); err != nil {
return err
}
@@ -716,21 +724,22 @@ func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, c
return err
}
- args := snapshotProtobufToContainerArgs(container.Name(), snap)
_, err := containerCreateAsSnapshot(container.Daemon(), args, container)
if err != nil {
return err
}
}
- if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+ wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+ if err := RsyncRecv(shared.AddSlash(container.Path()), conn, wrapper); err != nil {
return err
}
}
if live {
/* now receive the final sync */
- if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+ wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+ if err := RsyncRecv(shared.AddSlash(container.Path()), conn, wrapper); err != nil {
return err
}
}
@@ -796,3 +805,62 @@ func tryUnmount(path string, flags int) error {
return nil
}
+
+func progressWrapperRender(op *operation, key string, description string, progressInt int64, speedInt int64) {
+ meta := op.metadata
+ if meta == nil {
+ meta = make(map[string]interface{})
+ }
+
+ progress := fmt.Sprintf("%s (%s/s)", shared.GetByteSizeString(progressInt), shared.GetByteSizeString(speedInt))
+ if description != "" {
+ progress = fmt.Sprintf("%s: %s (%s/s)", description, shared.GetByteSizeString(progressInt), shared.GetByteSizeString(speedInt))
+ }
+
+ if meta[key] != progress {
+ meta[key] = progress
+ op.UpdateMetadata(meta)
+ }
+}
+
+func StorageProgressReader(op *operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
+ return func(reader io.ReadCloser) io.ReadCloser {
+ if op == nil {
+ return reader
+ }
+
+ progress := func(progressInt int64, speedInt int64) {
+ progressWrapperRender(op, key, description, progressInt, speedInt)
+ }
+
+ readPipe := &shared.ProgressReader{
+ ReadCloser: reader,
+ Tracker: &shared.ProgressTracker{
+ Handler: progress,
+ },
+ }
+
+ return readPipe
+ }
+}
+
+func StorageProgressWriter(op *operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
+ return func(writer io.WriteCloser) io.WriteCloser {
+ if op == nil {
+ return writer
+ }
+
+ progress := func(progressInt int64, speedInt int64) {
+ progressWrapperRender(op, key, description, progressInt, speedInt)
+ }
+
+ writePipe := &shared.ProgressWriter{
+ WriteCloser: writer,
+ Tracker: &shared.ProgressTracker{
+ Handler: progress,
+ },
+ }
+
+ return writePipe
+ }
+}
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index 639c85f..4a8d63f 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -2,6 +2,7 @@ package main
import (
"fmt"
+ "io"
"io/ioutil"
"os"
"os/exec"
@@ -828,7 +829,7 @@ func (s *btrfsMigrationSourceDriver) Snapshots() []container {
return s.snapshots
}
-func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string, btrfsParent string) error {
+func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string, btrfsParent string, readWrapper func(io.ReadCloser) io.ReadCloser) error {
args := []string{"send", btrfsPath}
if btrfsParent != "" {
args = append(args, "-p", btrfsParent)
@@ -841,6 +842,11 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
return err
}
+ readPipe := io.ReadCloser(stdout)
+ if readWrapper != nil {
+ readPipe = readWrapper(stdout)
+ }
+
stderr, err := cmd.StderrPipe()
if err != nil {
return err
@@ -850,7 +856,7 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
return err
}
- <-shared.WebsocketSendStream(conn, stdout, 4*1024*1024)
+ <-shared.WebsocketSendStream(conn, readPipe, 4*1024*1024)
output, err := ioutil.ReadAll(stderr)
if err != nil {
@@ -864,7 +870,7 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
return err
}
-func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation) error {
if s.container.IsSnapshot() {
tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
err := os.MkdirAll(tmpPath, 0700)
@@ -879,7 +885,8 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) erro
defer s.btrfs.subvolDelete(btrfsPath)
- return s.send(conn, btrfsPath, "")
+ wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+ return s.send(conn, btrfsPath, "", wrapper)
}
for i, snap := range s.snapshots {
@@ -888,7 +895,8 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) erro
prev = s.snapshots[i-1].Path()
}
- if err := s.send(conn, snap.Path(), prev); err != nil {
+ wrapper := StorageProgressReader(op, "fs_progress", snap.Name())
+ if err := s.send(conn, snap.Path(), prev, wrapper); err != nil {
return err
}
}
@@ -912,7 +920,8 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) erro
btrfsParent = s.btrfsSnapshotNames[len(s.btrfsSnapshotNames)-1]
}
- return s.send(conn, s.runningSnapName, btrfsParent)
+ wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+ return s.send(conn, s.runningSnapName, btrfsParent, wrapper)
}
func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
@@ -927,7 +936,7 @@ func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) e
return err
}
- return s.send(conn, s.stoppedSnapName, s.runningSnapName)
+ return s.send(conn, s.stoppedSnapName, s.runningSnapName, nil)
}
func (s *btrfsMigrationSourceDriver) Cleanup() {
@@ -985,9 +994,9 @@ func (s *storageBtrfs) MigrationSource(c container) (MigrationStorageSourceDrive
return driver, nil
}
-func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
+func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
if runningInUserns {
- return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap)
+ return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap, op)
}
cName := container.Name()
@@ -1000,7 +1009,7 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
}
}
- btrfsRecv := func(btrfsPath string, targetPath string, isSnapshot bool) error {
+ btrfsRecv := func(btrfsPath string, targetPath string, isSnapshot bool, writeWrapper func(io.WriteCloser) io.WriteCloser) error {
args := []string{"receive", "-e", btrfsPath}
cmd := exec.Command("btrfs", args...)
@@ -1024,7 +1033,12 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
return err
}
- <-shared.WebsocketRecvStream(stdin, conn)
+ writePipe := io.WriteCloser(stdin)
+ if writeWrapper != nil {
+ writePipe = writeWrapper(stdin)
+ }
+
+ <-shared.WebsocketRecvStream(writePipe, conn)
output, err := ioutil.ReadAll(stderr)
if err != nil {
@@ -1063,18 +1077,21 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
return err
}
- if err := btrfsRecv(containerPath(cName, true), s.Path(), true); err != nil {
+ wrapper := StorageProgressWriter(op, "fs_progress", snap.GetName())
+ if err := btrfsRecv(containerPath(cName, true), s.Path(), true, wrapper); err != nil {
return err
}
}
/* finally, do the real container */
- if err := btrfsRecv(containerPath(cName, true), container.Path(), false); err != nil {
+ wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+ if err := btrfsRecv(containerPath(cName, true), container.Path(), false, wrapper); err != nil {
return err
}
if live {
- if err := btrfsRecv(containerPath(cName, true), container.Path(), false); err != nil {
+ wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+ if err := btrfsRecv(containerPath(cName, true), container.Path(), false, wrapper); err != nil {
return err
}
}
diff --git a/lxd/storage_dir.go b/lxd/storage_dir.go
index 3c3a2b0..1fdd85f 100644
--- a/lxd/storage_dir.go
+++ b/lxd/storage_dir.go
@@ -282,6 +282,6 @@ func (s *storageDir) MigrationSource(container container) (MigrationStorageSourc
return rsyncMigrationSource(container)
}
-func (s *storageDir) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
- return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap)
+func (s *storageDir) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
+ return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap, op)
}
diff --git a/lxd/storage_lvm.go b/lxd/storage_lvm.go
index 7b1e241..3e9404d 100644
--- a/lxd/storage_lvm.go
+++ b/lxd/storage_lvm.go
@@ -976,6 +976,6 @@ func (s *storageLvm) MigrationSource(container container) (MigrationStorageSourc
return rsyncMigrationSource(container)
}
-func (s *storageLvm) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
- return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap)
+func (s *storageLvm) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
+ return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap, op)
}
diff --git a/lxd/storage_test.go b/lxd/storage_test.go
index c31db24..bd723d4 100644
--- a/lxd/storage_test.go
+++ b/lxd/storage_test.go
@@ -140,6 +140,6 @@ func (s *storageMock) PreservesInodes() bool {
func (s *storageMock) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
return nil, fmt.Errorf("not implemented")
}
-func (s *storageMock) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
+func (s *storageMock) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
return nil
}
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index 8d59b57..85d57b4 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -2,6 +2,7 @@ package main
import (
"fmt"
+ "io"
"io/ioutil"
"os"
"os/exec"
@@ -1246,7 +1247,7 @@ func (s *zfsMigrationSourceDriver) Snapshots() []container {
return s.snapshots
}
-func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zfsParent string) error {
+func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zfsParent string, readWrapper func(io.ReadCloser) io.ReadCloser) error {
fields := strings.SplitN(s.container.Name(), shared.SnapshotDelimiter, 2)
args := []string{"send", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, fields[0], zfsName)}
if zfsParent != "" {
@@ -1260,6 +1261,11 @@ func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zf
return err
}
+ readPipe := io.ReadCloser(stdout)
+ if readWrapper != nil {
+ readPipe = readWrapper(stdout)
+ }
+
stderr, err := cmd.StderrPipe()
if err != nil {
return err
@@ -1269,7 +1275,7 @@ func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zf
return err
}
- <-shared.WebsocketSendStream(conn, stdout, 4*1024*1024)
+ <-shared.WebsocketSendStream(conn, readPipe, 4*1024*1024)
output, err := ioutil.ReadAll(stderr)
if err != nil {
@@ -1284,11 +1290,12 @@ func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zf
return err
}
-func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation) error {
if s.container.IsSnapshot() {
fields := strings.SplitN(s.container.Name(), shared.SnapshotDelimiter, 2)
snapshotName := fmt.Sprintf("snapshot-%s", fields[1])
- return s.send(conn, snapshotName, "")
+ wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+ return s.send(conn, snapshotName, "", wrapper)
}
lastSnap := ""
@@ -1301,7 +1308,8 @@ func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error
lastSnap = snap
- if err := s.send(conn, snap, prev); err != nil {
+ wrapper := StorageProgressReader(op, "fs_progress", snap)
+ if err := s.send(conn, snap, prev, wrapper); err != nil {
return err
}
}
@@ -1311,7 +1319,8 @@ func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error
return err
}
- if err := s.send(conn, s.runningSnapName, lastSnap); err != nil {
+ wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+ if err := s.send(conn, s.runningSnapName, lastSnap, wrapper); err != nil {
return err
}
@@ -1324,7 +1333,7 @@ func (s *zfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) err
return err
}
- if err := s.send(conn, s.stoppedSnapName, s.runningSnapName); err != nil {
+ if err := s.send(conn, s.stoppedSnapName, s.runningSnapName, nil); err != nil {
return err
}
@@ -1396,8 +1405,8 @@ func (s *storageZfs) MigrationSource(ct container) (MigrationStorageSourceDriver
return &driver, nil
}
-func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
- zfsRecv := func(zfsName string) error {
+func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
+ zfsRecv := func(zfsName string, writeWrapper func(io.WriteCloser) io.WriteCloser) error {
zfsFsName := fmt.Sprintf("%s/%s", s.zfsPool, zfsName)
args := []string{"receive", "-F", "-u", zfsFsName}
cmd := exec.Command("zfs", args...)
@@ -1416,7 +1425,12 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
return err
}
- <-shared.WebsocketRecvStream(stdin, conn)
+ writePipe := io.WriteCloser(stdin)
+ if writeWrapper != nil {
+ writePipe = writeWrapper(stdin)
+ }
+
+ <-shared.WebsocketRecvStream(writePipe, conn)
output, err := ioutil.ReadAll(stderr)
if err != nil {
@@ -1449,8 +1463,9 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
return err
}
+ wrapper := StorageProgressWriter(op, "fs_progress", snap.GetName())
name := fmt.Sprintf("containers/%s@snapshot-%s", container.Name(), snap.GetName())
- if err := zfsRecv(name); err != nil {
+ if err := zfsRecv(name, wrapper); err != nil {
return err
}
@@ -1484,13 +1499,15 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
}()
/* finally, do the real container */
- if err := zfsRecv(zfsName); err != nil {
+ wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+ if err := zfsRecv(zfsName, wrapper); err != nil {
return err
}
if live {
/* and again for the post-running snapshot if this was a live migration */
- if err := zfsRecv(zfsName); err != nil {
+ wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+ if err := zfsRecv(zfsName, wrapper); err != nil {
return err
}
}
More information about the lxc-devel
mailing list