[lxc-devel] [lxd/master] Migration progress tracking

stgraber on Github lxc-bot at linuxcontainers.org
Thu Nov 17 05:30:15 UTC 2016


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20161117/074b750d/attachment.bin>
-------------- next part --------------
From 9791a44ae9b254af5c012c52ab4495ad8c79dac4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 15:21:34 -0500
Subject: [PATCH 1/6] On progress updates, keep cursor at end of line
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxc/main.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lxc/main.go b/lxc/main.go
index b50904e..3eacd63 100644
--- a/lxc/main.go
+++ b/lxc/main.go
@@ -323,9 +323,10 @@ func (p *ProgressRenderer) Done(msg string) {
 	if len(msg) > p.maxLength {
 		p.maxLength = len(msg)
 	} else {
-		fmt.Printf("%s\r", strings.Repeat(" ", p.maxLength))
+		fmt.Printf("\r%s", strings.Repeat(" ", p.maxLength))
 	}
 
+	fmt.Print("\r")
 	fmt.Print(msg)
 }
 
@@ -335,13 +336,12 @@ func (p *ProgressRenderer) Update(status string) {
 		msg = p.Format
 	}
 
-	msg = fmt.Sprintf(msg, status)
-	msg += "\r"
+	msg = fmt.Sprintf("\r"+msg, status)
 
 	if len(msg) > p.maxLength {
 		p.maxLength = len(msg)
 	} else {
-		fmt.Printf("%s\r", strings.Repeat(" ", p.maxLength))
+		fmt.Printf("\r%s", strings.Repeat(" ", p.maxLength))
 	}
 
 	fmt.Print(msg)

From 2b057a2f192e33a78678c2775b2d1a87f9195ef9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 15:13:16 -0500
Subject: [PATCH 2/6] Support absolute file transfer tracking
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds an absolute mode that reports the number of bytes transferred, for when we don't know the total length.

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 client.go               |  2 +-
 lxc/image.go            |  4 +--
 lxd/daemon_images.go    |  4 +--
 shared/simplestreams.go |  4 +--
 shared/util.go          | 67 +++++++++++++++++++++++++++++++++++--------------
 5 files changed, 55 insertions(+), 26 deletions(-)

diff --git a/client.go b/client.go
index a32709b..31a47ba 100644
--- a/client.go
+++ b/client.go
@@ -988,7 +988,7 @@ func (c *Client) PostImageURL(imageFile string, properties []string, public bool
 	return fingerprint, nil
 }
 
-func (c *Client) PostImage(imageFile string, rootfsFile string, properties []string, public bool, aliases []string, progressHandler func(int, int)) (string, error) {
+func (c *Client) PostImage(imageFile string, rootfsFile string, properties []string, public bool, aliases []string, progressHandler func(int64, int64)) (string, error) {
 	if c.Remote.Public {
 		return "", fmt.Errorf("This function isn't supported by public remotes.")
 	}
diff --git a/lxc/image.go b/lxc/image.go
index 2241d7f..96db696 100644
--- a/lxc/image.go
+++ b/lxc/image.go
@@ -425,8 +425,8 @@ func (c *imageCmd) run(config *lxd.Config, args []string) error {
 			return fmt.Errorf(i18n.G("Only https:// is supported for remote image import."))
 		} else {
 			progress := ProgressRenderer{Format: i18n.G("Transferring image: %s")}
-			handler := func(percent int, speed int) {
-				progress.Update(fmt.Sprintf("%d%% (%s/s)", percent, shared.GetByteSizeString(int64(speed))))
+			handler := func(percent int64, speed int64) {
+				progress.Update(fmt.Sprintf("%d%% (%s/s)", percent, shared.GetByteSizeString(speed)))
 			}
 
 			fingerprint, err = d.PostImage(imageFile, rootfsFile, properties, c.publicImage, c.addAliases, handler)
diff --git a/lxd/daemon_images.go b/lxd/daemon_images.go
index 5007d13..fe19413 100644
--- a/lxd/daemon_images.go
+++ b/lxd/daemon_images.go
@@ -247,7 +247,7 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
 		d.Storage.ImageDelete(fp)
 	}
 
-	progress := func(progressInt int, speedInt int) {
+	progress := func(progressInt int64, speedInt int64) {
 		if op == nil {
 			return
 		}
@@ -257,7 +257,7 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
 			meta = make(map[string]interface{})
 		}
 
-		progress := fmt.Sprintf("%d%% (%s/s)", progressInt, shared.GetByteSizeString(int64(speedInt)))
+		progress := fmt.Sprintf("%d%% (%s/s)", progressInt, shared.GetByteSizeString(speedInt))
 
 		if meta["download_progress"] != progress {
 			meta["download_progress"] = progress
diff --git a/shared/simplestreams.go b/shared/simplestreams.go
index 8fced30..771e038 100644
--- a/shared/simplestreams.go
+++ b/shared/simplestreams.go
@@ -494,7 +494,7 @@ func (s *SimpleStreams) getPaths(fingerprint string) ([][]string, error) {
 	return nil, fmt.Errorf("Couldn't find the requested image")
 }
 
-func (s *SimpleStreams) downloadFile(path string, hash string, target string, progress func(int, int)) error {
+func (s *SimpleStreams) downloadFile(path string, hash string, target string, progress func(int64, int64)) error {
 	download := func(url string, hash string, target string) error {
 		out, err := os.Create(target)
 		if err != nil {
@@ -623,7 +623,7 @@ func (s *SimpleStreams) ExportImage(image string, target string) (string, error)
 	return target, nil
 }
 
-func (s *SimpleStreams) Download(image string, file string, target string, progress func(int, int)) error {
+func (s *SimpleStreams) Download(image string, file string, target string, progress func(int64, int64)) error {
 	paths, err := s.getPaths(image)
 	if err != nil {
 		return err
diff --git a/shared/util.go b/shared/util.go
index 2b5b70e..6906775 100644
--- a/shared/util.go
+++ b/shared/util.go
@@ -744,47 +744,76 @@ type TransferProgress struct {
 	total      int64
 
 	start *time.Time
+	last  *time.Time
 
 	Length  int64
-	Handler func(int, int)
+	Handler func(int64, int64)
 }
 
 func (pt *TransferProgress) Read(p []byte) (int, error) {
+	// Do normal reader tasks
 	n, err := pt.Reader.Read(p)
+	pt.total += int64(n)
 
+	// Skip the rest if no handler attached
 	if pt.Handler == nil {
 		return n, err
 	}
 
+	// Initialize start time if needed
 	if pt.start == nil {
 		cur := time.Now()
 		pt.start = &cur
+		pt.last = pt.start
 	}
 
-	if n > 0 {
-		pt.total += int64(n)
-		percentage := float64(pt.total) / float64(pt.Length) * float64(100)
+	// Skip if no data to count
+	if n <= 0 {
+		return n, err
+	}
 
-		if percentage-pt.percentage > 0.9 {
-			// Determine percentage
-			pt.percentage = percentage
-			progressInt := 1 - (int(percentage) % 1) + int(percentage)
-			if progressInt > 100 {
-				progressInt = 100
-			}
+	// Update interval handling
+	var percentage float64
+	if pt.Length > 0 {
+		// If running in relative mode, check that we increased by at least 1%
+		percentage = float64(pt.total) / float64(pt.Length) * float64(100)
+		if percentage-pt.percentage < 0.9 {
+			return n, err
+		}
+	} else {
+		// If running in absolute mode, check that at least a second elapsed
+		interval := time.Since(*pt.last).Seconds()
+		if interval < 1 {
+			return n, err
+		}
+	}
 
-			// Determine speed
-			speedInt := int(0)
-			duration := time.Since(*pt.start).Seconds()
-			if duration > 0 {
-				speed := float64(pt.total) / duration
-				speedInt = int(speed)
-			}
+	// Determine speed
+	speedInt := int64(0)
+	duration := time.Since(*pt.start).Seconds()
+	if duration > 0 {
+		speed := float64(pt.total) / duration
+		speedInt = int64(speed)
+	}
 
-			pt.Handler(progressInt, speedInt)
+	// Determine progress
+	progressInt := int64(0)
+	if pt.Length > 0 {
+		pt.percentage = percentage
+		progressInt = int64(1 - (int(percentage) % 1) + int(percentage))
+		if progressInt > 100 {
+			progressInt = 100
 		}
+	} else {
+		progressInt = pt.total
+
+		// Update timestamp
+		cur := time.Now()
+		pt.last = &cur
 	}
 
+	pt.Handler(progressInt, speedInt)
+
 	return n, err
 }
 

From f38d9263f4ca17149d98f676adea22a3d4b5bb25 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 15:22:59 -0500
Subject: [PATCH 3/6] Convert TransferProgress to ReadCloser
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 client.go               | 4 ++--
 lxd/daemon_images.go    | 2 +-
 shared/simplestreams.go | 2 +-
 shared/util.go          | 4 ++--
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/client.go b/client.go
index 31a47ba..7ccaf74 100644
--- a/client.go
+++ b/client.go
@@ -1055,7 +1055,7 @@ func (c *Client) PostImage(imageFile string, rootfsFile string, properties []str
 			return "", err
 		}
 
-		progress := &shared.TransferProgress{Reader: body, Length: size, Handler: progressHandler}
+		progress := &shared.TransferProgress{ReadCloser: body, Length: size, Handler: progressHandler}
 
 		req, err = http.NewRequest("POST", uri, progress)
 		req.Header.Set("Content-Type", w.FormDataContentType())
@@ -1071,7 +1071,7 @@ func (c *Client) PostImage(imageFile string, rootfsFile string, properties []str
 			return "", err
 		}
 
-		progress := &shared.TransferProgress{Reader: fImage, Length: stat.Size(), Handler: progressHandler}
+		progress := &shared.TransferProgress{ReadCloser: fImage, Length: stat.Size(), Handler: progressHandler}
 
 		req, err = http.NewRequest("POST", uri, progress)
 		req.Header.Set("X-LXD-filename", filepath.Base(imageFile))
diff --git a/lxd/daemon_images.go b/lxd/daemon_images.go
index fe19413..7c364b6 100644
--- a/lxd/daemon_images.go
+++ b/lxd/daemon_images.go
@@ -359,7 +359,7 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
 		ctype = "application/octet-stream"
 	}
 
-	body := &shared.TransferProgress{Reader: raw.Body, Length: raw.ContentLength, Handler: progress}
+	body := &shared.TransferProgress{ReadCloser: raw.Body, Length: raw.ContentLength, Handler: progress}
 
 	if ctype == "multipart/form-data" {
 		// Parse the POST data
diff --git a/shared/simplestreams.go b/shared/simplestreams.go
index 771e038..c4e3f3c 100644
--- a/shared/simplestreams.go
+++ b/shared/simplestreams.go
@@ -518,7 +518,7 @@ func (s *SimpleStreams) downloadFile(path string, hash string, target string, pr
 			return fmt.Errorf("invalid simplestreams source: got %d looking for %s", resp.StatusCode, path)
 		}
 
-		body := &TransferProgress{Reader: resp.Body, Length: resp.ContentLength, Handler: progress}
+		body := &TransferProgress{ReadCloser: resp.Body, Length: resp.ContentLength, Handler: progress}
 
 		sha256 := sha256.New()
 		_, err = io.Copy(io.MultiWriter(out, sha256), body)
diff --git a/shared/util.go b/shared/util.go
index 6906775..7ba774b 100644
--- a/shared/util.go
+++ b/shared/util.go
@@ -739,7 +739,7 @@ func RemoveDuplicatesFromString(s string, sep string) string {
 }
 
 type TransferProgress struct {
-	io.Reader
+	io.ReadCloser
 	percentage float64
 	total      int64
 
@@ -752,7 +752,7 @@ type TransferProgress struct {
 
 func (pt *TransferProgress) Read(p []byte) (int, error) {
 	// Do normal reader tasks
-	n, err := pt.Reader.Read(p)
+	n, err := pt.ReadCloser.Read(p)
 	pt.total += int64(n)
 
 	// Skip the rest if no handler attached

From 1c3473ba68f08360b2ca16a7408dc7d5a50d470b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 21:03:11 -0500
Subject: [PATCH 4/6] Implement write tracking
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 client.go               | 16 +++++++++++--
 lxd/daemon_images.go    |  8 ++++++-
 shared/simplestreams.go |  8 ++++++-
 shared/util.go          | 62 +++++++++++++++++++++++++++++++++++--------------
 4 files changed, 73 insertions(+), 21 deletions(-)

diff --git a/client.go b/client.go
index 7ccaf74..6388fdb 100644
--- a/client.go
+++ b/client.go
@@ -1055,7 +1055,13 @@ func (c *Client) PostImage(imageFile string, rootfsFile string, properties []str
 			return "", err
 		}
 
-		progress := &shared.TransferProgress{ReadCloser: body, Length: size, Handler: progressHandler}
+		progress := &shared.ProgressReader{
+			ReadCloser: body,
+			Tracker: &shared.ProgressTracker{
+				Length:  size,
+				Handler: progressHandler,
+			},
+		}
 
 		req, err = http.NewRequest("POST", uri, progress)
 		req.Header.Set("Content-Type", w.FormDataContentType())
@@ -1071,7 +1077,13 @@ func (c *Client) PostImage(imageFile string, rootfsFile string, properties []str
 			return "", err
 		}
 
-		progress := &shared.TransferProgress{ReadCloser: fImage, Length: stat.Size(), Handler: progressHandler}
+		progress := &shared.ProgressReader{
+			ReadCloser: fImage,
+			Tracker: &shared.ProgressTracker{
+				Length:  stat.Size(),
+				Handler: progressHandler,
+			},
+		}
 
 		req, err = http.NewRequest("POST", uri, progress)
 		req.Header.Set("X-LXD-filename", filepath.Base(imageFile))
diff --git a/lxd/daemon_images.go b/lxd/daemon_images.go
index 7c364b6..f841a6d 100644
--- a/lxd/daemon_images.go
+++ b/lxd/daemon_images.go
@@ -359,7 +359,13 @@ func (d *Daemon) ImageDownload(op *operation, server string, protocol string, ce
 		ctype = "application/octet-stream"
 	}
 
-	body := &shared.TransferProgress{ReadCloser: raw.Body, Length: raw.ContentLength, Handler: progress}
+	body := &shared.ProgressReader{
+		ReadCloser: raw.Body,
+		Tracker: &shared.ProgressTracker{
+			Length:  raw.ContentLength,
+			Handler: progress,
+		},
+	}
 
 	if ctype == "multipart/form-data" {
 		// Parse the POST data
diff --git a/shared/simplestreams.go b/shared/simplestreams.go
index c4e3f3c..e81cce2 100644
--- a/shared/simplestreams.go
+++ b/shared/simplestreams.go
@@ -518,7 +518,13 @@ func (s *SimpleStreams) downloadFile(path string, hash string, target string, pr
 			return fmt.Errorf("invalid simplestreams source: got %d looking for %s", resp.StatusCode, path)
 		}
 
-		body := &TransferProgress{ReadCloser: resp.Body, Length: resp.ContentLength, Handler: progress}
+		body := &ProgressReader{
+			ReadCloser: resp.Body,
+			Tracker: &ProgressTracker{
+				Length:  resp.ContentLength,
+				Handler: progress,
+			},
+		}
 
 		sha256 := sha256.New()
 		_, err = io.Copy(io.MultiWriter(out, sha256), body)
diff --git a/shared/util.go b/shared/util.go
index 7ba774b..8651c9d 100644
--- a/shared/util.go
+++ b/shared/util.go
@@ -738,26 +738,20 @@ func RemoveDuplicatesFromString(s string, sep string) string {
 	return s
 }
 
-type TransferProgress struct {
-	io.ReadCloser
-	percentage float64
-	total      int64
-
-	start *time.Time
-	last  *time.Time
-
+type ProgressTracker struct {
 	Length  int64
 	Handler func(int64, int64)
-}
 
-func (pt *TransferProgress) Read(p []byte) (int, error) {
-	// Do normal reader tasks
-	n, err := pt.ReadCloser.Read(p)
-	pt.total += int64(n)
+	percentage float64
+	total      int64
+	start      *time.Time
+	last       *time.Time
+}
 
+func (pt *ProgressTracker) Update(n int) {
 	// Skip the rest if no handler attached
 	if pt.Handler == nil {
-		return n, err
+		return
 	}
 
 	// Initialize start time if needed
@@ -769,7 +763,7 @@ func (pt *TransferProgress) Read(p []byte) (int, error) {
 
 	// Skip if no data to count
 	if n <= 0 {
-		return n, err
+		return
 	}
 
 	// Update interval handling
@@ -778,13 +772,13 @@ func (pt *TransferProgress) Read(p []byte) (int, error) {
 		// If running in relative mode, check that we increased by at least 1%
 		percentage = float64(pt.total) / float64(pt.Length) * float64(100)
 		if percentage-pt.percentage < 0.9 {
-			return n, err
+			return
 		}
 	} else {
 		// If running in absolute mode, check that at least a second elapsed
 		interval := time.Since(*pt.last).Seconds()
 		if interval < 1 {
-			return n, err
+			return
 		}
 	}
 
@@ -813,6 +807,40 @@ func (pt *TransferProgress) Read(p []byte) (int, error) {
 	}
 
 	pt.Handler(progressInt, speedInt)
+}
+
+type ProgressReader struct {
+	io.ReadCloser
+	Tracker *ProgressTracker
+}
+
+func (pt *ProgressReader) Read(p []byte) (int, error) {
+	// Do normal reader tasks
+	n, err := pt.ReadCloser.Read(p)
+
+	// Do the actual progress tracking
+	if pt.Tracker != nil {
+		pt.Tracker.total += int64(n)
+		pt.Tracker.Update(n)
+	}
+
+	return n, err
+}
+
+type ProgressWriter struct {
+	io.WriteCloser
+	Tracker *ProgressTracker
+}
+
+func (pt *ProgressWriter) Write(p []byte) (int, error) {
+	// Do normal writer tasks
+	n, err := pt.WriteCloser.Write(p)
+
+	// Do the actual progress tracking
+	if pt.Tracker != nil {
+		pt.Tracker.total += int64(n)
+		pt.Tracker.Update(n)
+	}
 
 	return n, err
 }

From d11348c452f402930ca640c19406c26afd211de1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 23:35:52 -0500
Subject: [PATCH 5/6] Simplify rsync code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/rsync.go | 86 ++++++++++++++++++++++++++++--------------------------------
 1 file changed, 40 insertions(+), 46 deletions(-)

diff --git a/lxd/rsync.go b/lxd/rsync.go
index 6369d1c..49554b9 100644
--- a/lxd/rsync.go
+++ b/lxd/rsync.go
@@ -14,44 +14,6 @@ import (
 	"github.com/lxc/lxd/shared"
 )
 
-func rsyncWebsocket(path string, cmd *exec.Cmd, conn *websocket.Conn) error {
-	stdin, err := cmd.StdinPipe()
-	if err != nil {
-		return err
-	}
-
-	stdout, err := cmd.StdoutPipe()
-	if err != nil {
-		return err
-	}
-
-	stderr, err := cmd.StderrPipe()
-	if err != nil {
-		return err
-	}
-
-	if err := cmd.Start(); err != nil {
-		return err
-	}
-
-	readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout)
-	data, err2 := ioutil.ReadAll(stderr)
-	if err2 != nil {
-		shared.LogDebugf("error reading rsync stderr: %s", err2)
-		return err2
-	}
-
-	err = cmd.Wait()
-	if err != nil {
-		shared.LogDebugf("rsync recv error for path %s: %s: %s", path, err, string(data))
-	}
-
-	<-readDone
-	<-writeDone
-
-	return err
-}
-
 func rsyncSendSetup(path string) (*exec.Cmd, net.Conn, io.ReadCloser, error) {
 	/*
 	 * It's sort of unfortunate, but there's no library call to get a
@@ -153,8 +115,11 @@ func RsyncSend(path string, conn *websocket.Conn) error {
 	return err
 }
 
-func rsyncRecvCmd(path string) *exec.Cmd {
-	return exec.Command("rsync",
+// RsyncRecv sets up the receiving half of the websocket to rsync (the other
+// half set up by RsyncSend), putting the contents in the directory specified
+// by path.
+func RsyncRecv(path string, conn *websocket.Conn) error {
+	cmd := exec.Command("rsync",
 		"--server",
 		"-vlogDtpre.iLsfx",
 		"--numeric-ids",
@@ -162,13 +127,42 @@ func rsyncRecvCmd(path string) *exec.Cmd {
 		"--partial",
 		".",
 		path)
-}
 
-// RsyncRecv sets up the receiving half of the websocket to rsync (the other
-// half set up by RsyncSend), putting the contents in the directory specified
-// by path.
-func RsyncRecv(path string, conn *websocket.Conn) error {
-	return rsyncWebsocket(path, rsyncRecvCmd(path), conn)
+	stdin, err := cmd.StdinPipe()
+	if err != nil {
+		return err
+	}
+
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return err
+	}
+
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return err
+	}
+
+	if err := cmd.Start(); err != nil {
+		return err
+	}
+
+	readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout)
+	data, err2 := ioutil.ReadAll(stderr)
+	if err2 != nil {
+		shared.LogDebugf("error reading rsync stderr: %s", err2)
+		return err2
+	}
+
+	err = cmd.Wait()
+	if err != nil {
+		shared.LogDebugf("rsync recv error for path %s: %s: %s", path, err, string(data))
+	}
+
+	<-readDone
+	<-writeDone
+
+	return err
 }
 
 // Netcat is called with:

From c3ebf6eb38a0458a40d06670b6e1ea7f06a6b171 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Wed, 16 Nov 2016 17:50:18 -0500
Subject: [PATCH 6/6] Implement migration progress
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes #1891

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 doc/api-extensions.md |  4 +++
 lxd/api_1.0.go        |  1 +
 lxd/migrate.go        |  8 ++---
 lxd/rsync.go          | 18 +++++++---
 lxd/storage.go        | 98 +++++++++++++++++++++++++++++++++++++++++++--------
 lxd/storage_btrfs.go  | 45 +++++++++++++++--------
 lxd/storage_dir.go    |  4 +--
 lxd/storage_lvm.go    |  4 +--
 lxd/storage_test.go   |  2 +-
 lxd/storage_zfs.go    | 43 +++++++++++++++-------
 10 files changed, 172 insertions(+), 55 deletions(-)

diff --git a/doc/api-extensions.md b/doc/api-extensions.md
index fc592c4..d2cfd69 100644
--- a/doc/api-extensions.md
+++ b/doc/api-extensions.md
@@ -156,3 +156,7 @@ Enables adding GPUs to a container.
 
 ## container\_image\_properties
 Introduces a new "image" config key space. Read-only, includes the properties of the parent image.
+
+## migration\_progress
+Transfer progress is now exported as part of the operation, on both sending and receiving ends.
+This shows up as an "fs\_progress" attribute in the operation metadata.
diff --git a/lxd/api_1.0.go b/lxd/api_1.0.go
index c4654e2..a8b001a 100644
--- a/lxd/api_1.0.go
+++ b/lxd/api_1.0.go
@@ -77,6 +77,7 @@ func api10Get(d *Daemon, r *http.Request) Response {
 			"container_exec_signal_handling",
 			"gpu_devices",
 			"container_image_properties",
+			"migration_progress",
 		},
 
 		"api_status":  "stable",
diff --git a/lxd/migrate.go b/lxd/migrate.go
index 061f2a5..6b125cf 100644
--- a/lxd/migrate.go
+++ b/lxd/migrate.go
@@ -375,7 +375,7 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
 		return err
 	}
 
-	if err := driver.SendWhileRunning(s.fsConn); err != nil {
+	if err := driver.SendWhileRunning(s.fsConn, migrateOp); err != nil {
 		return abort(err)
 	}
 
@@ -493,7 +493,7 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
 		 * no reason to do these in parallel. In the future when we're using
 		 * p.haul's protocol, it will make sense to do these in parallel.
 		 */
-		if err := RsyncSend(shared.AddSlash(checkpointDir), s.criuConn); err != nil {
+		if err := RsyncSend(shared.AddSlash(checkpointDir), s.criuConn, nil); err != nil {
 			return abort(err)
 		}
 
@@ -794,7 +794,7 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 			} else {
 				fsConn = c.src.fsConn
 			}
-			if err := mySink(live, c.src.container, header.Snapshots, fsConn, srcIdmap); err != nil {
+			if err := mySink(live, c.src.container, header.Snapshots, fsConn, srcIdmap, migrateOp); err != nil {
 				fsTransfer <- err
 				return
 			}
@@ -823,7 +823,7 @@ func (c *migrationSink) Do(migrateOp *operation) error {
 			} else {
 				criuConn = c.src.criuConn
 			}
-			if err := RsyncRecv(shared.AddSlash(imagesDir), criuConn); err != nil {
+			if err := RsyncRecv(shared.AddSlash(imagesDir), criuConn, nil); err != nil {
 				restore <- err
 				return
 			}
diff --git a/lxd/rsync.go b/lxd/rsync.go
index 49554b9..407fbcf 100644
--- a/lxd/rsync.go
+++ b/lxd/rsync.go
@@ -88,7 +88,7 @@ func rsyncSendSetup(path string) (*exec.Cmd, net.Conn, io.ReadCloser, error) {
 
 // RsyncSend sets up the sending half of an rsync, to recursively send the
 // directory pointed to by path over the websocket.
-func RsyncSend(path string, conn *websocket.Conn) error {
+func RsyncSend(path string, conn *websocket.Conn, readWrapper func(io.ReadCloser) io.ReadCloser) error {
 	cmd, dataSocket, stderr, err := rsyncSendSetup(path)
 	if dataSocket != nil {
 		defer dataSocket.Close()
@@ -97,7 +97,12 @@ func RsyncSend(path string, conn *websocket.Conn) error {
 		return err
 	}
 
-	readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, dataSocket)
+	readPipe := io.ReadCloser(dataSocket)
+	if readWrapper != nil {
+		readPipe = readWrapper(dataSocket)
+	}
+
+	readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, readPipe)
 
 	output, err := ioutil.ReadAll(stderr)
 	if err != nil {
@@ -118,7 +123,7 @@ func RsyncSend(path string, conn *websocket.Conn) error {
 // RsyncRecv sets up the receiving half of the websocket to rsync (the other
 // half set up by RsyncSend), putting the contents in the directory specified
 // by path.
-func RsyncRecv(path string, conn *websocket.Conn) error {
+func RsyncRecv(path string, conn *websocket.Conn, writeWrapper func(io.WriteCloser) io.WriteCloser) error {
 	cmd := exec.Command("rsync",
 		"--server",
 		"-vlogDtpre.iLsfx",
@@ -147,7 +152,12 @@ func RsyncRecv(path string, conn *websocket.Conn) error {
 		return err
 	}
 
-	readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout)
+	writePipe := io.WriteCloser(stdin)
+	if writeWrapper != nil {
+		writePipe = writeWrapper(stdin)
+	}
+
+	readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout)
 	data, err2 := ioutil.ReadAll(stderr)
 	if err2 != nil {
 		shared.LogDebugf("error reading rsync stderr: %s", err2)
diff --git a/lxd/storage.go b/lxd/storage.go
index 5bb94dc..a41ca1b 100644
--- a/lxd/storage.go
+++ b/lxd/storage.go
@@ -3,6 +3,7 @@ package main
 import (
 	"encoding/json"
 	"fmt"
+	"io"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -117,7 +118,7 @@ type MigrationStorageSourceDriver interface {
 	/* send any bits of the container/snapshots that are possible while the
 	 * container is still running.
 	 */
-	SendWhileRunning(conn *websocket.Conn) error
+	SendWhileRunning(conn *websocket.Conn, op *operation) error
 
 	/* send the final bits (e.g. a final delta snapshot for zfs, btrfs, or
 	 * do a final rsync) of the fs after the container has been
@@ -192,7 +193,7 @@ type storage interface {
 	// already present on the target instance as an exercise for the
 	// enterprising developer.
 	MigrationSource(container container) (MigrationStorageSourceDriver, error)
-	MigrationSink(live bool, container container, objects []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error
+	MigrationSink(live bool, container container, objects []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error
 }
 
 func newStorage(d *Daemon, sType storageType) (storage, error) {
@@ -556,7 +557,7 @@ func (lw *storageLogWrapper) MigrationSource(container container) (MigrationStor
 	return lw.w.MigrationSource(container)
 }
 
-func (lw *storageLogWrapper) MigrationSink(live bool, container container, objects []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
+func (lw *storageLogWrapper) MigrationSink(live bool, container container, objects []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
 	objNames := []string{}
 	for _, obj := range objects {
 		objNames = append(objNames, obj.GetName())
@@ -567,9 +568,10 @@ func (lw *storageLogWrapper) MigrationSink(live bool, container container, objec
 		"container": container.Name(),
 		"objects":   objNames,
 		"srcIdmap":  *srcIdmap,
+		"op":        op,
 	})
 
-	return lw.w.MigrationSink(live, container, objects, conn, srcIdmap)
+	return lw.w.MigrationSink(live, container, objects, conn, srcIdmap, op)
 }
 
 func ShiftIfNecessary(container container, srcIdmap *shared.IdmapSet) error {
@@ -608,7 +610,7 @@ func (s rsyncStorageSourceDriver) Snapshots() []container {
 	return s.snapshots
 }
 
-func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation) error {
 	for _, send := range s.snapshots {
 		if err := send.StorageStart(); err != nil {
 			return err
@@ -616,17 +618,19 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
 		defer send.StorageStop()
 
 		path := send.Path()
-		if err := RsyncSend(shared.AddSlash(path), conn); err != nil {
+		wrapper := StorageProgressReader(op, "fs_progress", send.Name())
+		if err := RsyncSend(shared.AddSlash(path), conn, wrapper); err != nil {
 			return err
 		}
 	}
 
-	return RsyncSend(shared.AddSlash(s.container.Path()), conn)
+	wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+	return RsyncSend(shared.AddSlash(s.container.Path()), conn, wrapper)
 }
 
 func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
 	/* resync anything that changed between our first send and the checkpoint */
-	return RsyncSend(shared.AddSlash(s.container.Path()), conn)
+	return RsyncSend(shared.AddSlash(s.container.Path()), conn, nil)
 }
 
 func (s rsyncStorageSourceDriver) Cleanup() {
@@ -672,7 +676,7 @@ func snapshotProtobufToContainerArgs(containerName string, snap *Snapshot) conta
 	}
 }
 
-func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
+func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
 	isDirBackend := container.Storage().GetStorageType() == storageTypeDir
 
 	if isDirBackend {
@@ -689,7 +693,8 @@ func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, c
 				return err
 			}
 
-			if err := RsyncRecv(shared.AddSlash(s.Path()), conn); err != nil {
+			wrapper := StorageProgressWriter(op, "fs_progress", s.Name())
+			if err := RsyncRecv(shared.AddSlash(s.Path()), conn, wrapper); err != nil {
 				return err
 			}
 
@@ -698,7 +703,8 @@ func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, c
 			}
 		}
 
-		if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+		wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+		if err := RsyncRecv(shared.AddSlash(container.Path()), conn, wrapper); err != nil {
 			return err
 		}
 	} else {
@@ -708,7 +714,9 @@ func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, c
 		defer container.StorageStop()
 
 		for _, snap := range snapshots {
-			if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+			args := snapshotProtobufToContainerArgs(container.Name(), snap)
+			wrapper := StorageProgressWriter(op, "fs_progress", snap.GetName())
+			if err := RsyncRecv(shared.AddSlash(container.Path()), conn, wrapper); err != nil {
 				return err
 			}
 
@@ -716,21 +724,22 @@ func rsyncMigrationSink(live bool, container container, snapshots []*Snapshot, c
 				return err
 			}
 
-			args := snapshotProtobufToContainerArgs(container.Name(), snap)
 			_, err := containerCreateAsSnapshot(container.Daemon(), args, container)
 			if err != nil {
 				return err
 			}
 		}
 
-		if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+		wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+		if err := RsyncRecv(shared.AddSlash(container.Path()), conn, wrapper); err != nil {
 			return err
 		}
 	}
 
 	if live {
 		/* now receive the final sync */
-		if err := RsyncRecv(shared.AddSlash(container.Path()), conn); err != nil {
+		wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+		if err := RsyncRecv(shared.AddSlash(container.Path()), conn, wrapper); err != nil {
 			return err
 		}
 	}
@@ -796,3 +805,62 @@ func tryUnmount(path string, flags int) error {
 
 	return nil
 }
+
+func progressWrapperRender(op *operation, key string, description string, progressInt int64, speedInt int64) {
+	meta := op.metadata
+	if meta == nil {
+		meta = make(map[string]interface{})
+	}
+
+	progress := fmt.Sprintf("%s (%s/s)", shared.GetByteSizeString(progressInt), shared.GetByteSizeString(speedInt))
+	if description != "" {
+		progress = fmt.Sprintf("%s: %s (%s/s)", description, shared.GetByteSizeString(progressInt), shared.GetByteSizeString(speedInt))
+	}
+
+	if meta[key] != progress {
+		meta[key] = progress
+		op.UpdateMetadata(meta)
+	}
+}
+
+func StorageProgressReader(op *operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
+	return func(reader io.ReadCloser) io.ReadCloser {
+		if op == nil {
+			return reader
+		}
+
+		progress := func(progressInt int64, speedInt int64) {
+			progressWrapperRender(op, key, description, progressInt, speedInt)
+		}
+
+		readPipe := &shared.ProgressReader{
+			ReadCloser: reader,
+			Tracker: &shared.ProgressTracker{
+				Handler: progress,
+			},
+		}
+
+		return readPipe
+	}
+}
+
+func StorageProgressWriter(op *operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
+	return func(writer io.WriteCloser) io.WriteCloser {
+		if op == nil {
+			return writer
+		}
+
+		progress := func(progressInt int64, speedInt int64) {
+			progressWrapperRender(op, key, description, progressInt, speedInt)
+		}
+
+		writePipe := &shared.ProgressWriter{
+			WriteCloser: writer,
+			Tracker: &shared.ProgressTracker{
+				Handler: progress,
+			},
+		}
+
+		return writePipe
+	}
+}
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index 639c85f..4a8d63f 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"os/exec"
@@ -828,7 +829,7 @@ func (s *btrfsMigrationSourceDriver) Snapshots() []container {
 	return s.snapshots
 }
 
-func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string, btrfsParent string) error {
+func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string, btrfsParent string, readWrapper func(io.ReadCloser) io.ReadCloser) error {
 	args := []string{"send", btrfsPath}
 	if btrfsParent != "" {
 		args = append(args, "-p", btrfsParent)
@@ -841,6 +842,11 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
 		return err
 	}
 
+	readPipe := io.ReadCloser(stdout)
+	if readWrapper != nil {
+		readPipe = readWrapper(stdout)
+	}
+
 	stderr, err := cmd.StderrPipe()
 	if err != nil {
 		return err
@@ -850,7 +856,7 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
 		return err
 	}
 
-	<-shared.WebsocketSendStream(conn, stdout, 4*1024*1024)
+	<-shared.WebsocketSendStream(conn, readPipe, 4*1024*1024)
 
 	output, err := ioutil.ReadAll(stderr)
 	if err != nil {
@@ -864,7 +870,7 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
 	return err
 }
 
-func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation) error {
 	if s.container.IsSnapshot() {
 		tmpPath := containerPath(fmt.Sprintf("%s/.migration-send-%s", s.container.Name(), uuid.NewRandom().String()), true)
 		err := os.MkdirAll(tmpPath, 0700)
@@ -879,7 +885,8 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) erro
 
 		defer s.btrfs.subvolDelete(btrfsPath)
 
-		return s.send(conn, btrfsPath, "")
+		wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+		return s.send(conn, btrfsPath, "", wrapper)
 	}
 
 	for i, snap := range s.snapshots {
@@ -888,7 +895,8 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) erro
 			prev = s.snapshots[i-1].Path()
 		}
 
-		if err := s.send(conn, snap.Path(), prev); err != nil {
+		wrapper := StorageProgressReader(op, "fs_progress", snap.Name())
+		if err := s.send(conn, snap.Path(), prev, wrapper); err != nil {
 			return err
 		}
 	}
@@ -912,7 +920,8 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) erro
 		btrfsParent = s.btrfsSnapshotNames[len(s.btrfsSnapshotNames)-1]
 	}
 
-	return s.send(conn, s.runningSnapName, btrfsParent)
+	wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+	return s.send(conn, s.runningSnapName, btrfsParent, wrapper)
 }
 
 func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
@@ -927,7 +936,7 @@ func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) e
 		return err
 	}
 
-	return s.send(conn, s.stoppedSnapName, s.runningSnapName)
+	return s.send(conn, s.stoppedSnapName, s.runningSnapName, nil)
 }
 
 func (s *btrfsMigrationSourceDriver) Cleanup() {
@@ -985,9 +994,9 @@ func (s *storageBtrfs) MigrationSource(c container) (MigrationStorageSourceDrive
 	return driver, nil
 }
 
-func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
+func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
 	if runningInUserns {
-		return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap)
+		return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap, op)
 	}
 
 	cName := container.Name()
@@ -1000,7 +1009,7 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
 		}
 	}
 
-	btrfsRecv := func(btrfsPath string, targetPath string, isSnapshot bool) error {
+	btrfsRecv := func(btrfsPath string, targetPath string, isSnapshot bool, writeWrapper func(io.WriteCloser) io.WriteCloser) error {
 		args := []string{"receive", "-e", btrfsPath}
 		cmd := exec.Command("btrfs", args...)
 
@@ -1024,7 +1033,12 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
 			return err
 		}
 
-		<-shared.WebsocketRecvStream(stdin, conn)
+		writePipe := io.WriteCloser(stdin)
+		if writeWrapper != nil {
+			writePipe = writeWrapper(stdin)
+		}
+
+		<-shared.WebsocketRecvStream(writePipe, conn)
 
 		output, err := ioutil.ReadAll(stderr)
 		if err != nil {
@@ -1063,18 +1077,21 @@ func (s *storageBtrfs) MigrationSink(live bool, container container, snapshots [
 			return err
 		}
 
-		if err := btrfsRecv(containerPath(cName, true), s.Path(), true); err != nil {
+		wrapper := StorageProgressWriter(op, "fs_progress", snap.GetName())
+		if err := btrfsRecv(containerPath(cName, true), s.Path(), true, wrapper); err != nil {
 			return err
 		}
 	}
 
 	/* finally, do the real container */
-	if err := btrfsRecv(containerPath(cName, true), container.Path(), false); err != nil {
+	wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+	if err := btrfsRecv(containerPath(cName, true), container.Path(), false, wrapper); err != nil {
 		return err
 	}
 
 	if live {
-		if err := btrfsRecv(containerPath(cName, true), container.Path(), false); err != nil {
+		wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+		if err := btrfsRecv(containerPath(cName, true), container.Path(), false, wrapper); err != nil {
 			return err
 		}
 	}
diff --git a/lxd/storage_dir.go b/lxd/storage_dir.go
index 3c3a2b0..1fdd85f 100644
--- a/lxd/storage_dir.go
+++ b/lxd/storage_dir.go
@@ -282,6 +282,6 @@ func (s *storageDir) MigrationSource(container container) (MigrationStorageSourc
 	return rsyncMigrationSource(container)
 }
 
-func (s *storageDir) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
-	return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap)
+func (s *storageDir) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
+	return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap, op)
 }
diff --git a/lxd/storage_lvm.go b/lxd/storage_lvm.go
index 7b1e241..3e9404d 100644
--- a/lxd/storage_lvm.go
+++ b/lxd/storage_lvm.go
@@ -976,6 +976,6 @@ func (s *storageLvm) MigrationSource(container container) (MigrationStorageSourc
 	return rsyncMigrationSource(container)
 }
 
-func (s *storageLvm) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
-	return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap)
+func (s *storageLvm) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
+	return rsyncMigrationSink(live, container, snapshots, conn, srcIdmap, op)
 }
diff --git a/lxd/storage_test.go b/lxd/storage_test.go
index c31db24..bd723d4 100644
--- a/lxd/storage_test.go
+++ b/lxd/storage_test.go
@@ -140,6 +140,6 @@ func (s *storageMock) PreservesInodes() bool {
 func (s *storageMock) MigrationSource(container container) (MigrationStorageSourceDriver, error) {
 	return nil, fmt.Errorf("not implemented")
 }
-func (s *storageMock) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
+func (s *storageMock) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
 	return nil
 }
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index 8d59b57..85d57b4 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"os/exec"
@@ -1246,7 +1247,7 @@ func (s *zfsMigrationSourceDriver) Snapshots() []container {
 	return s.snapshots
 }
 
-func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zfsParent string) error {
+func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zfsParent string, readWrapper func(io.ReadCloser) io.ReadCloser) error {
 	fields := strings.SplitN(s.container.Name(), shared.SnapshotDelimiter, 2)
 	args := []string{"send", fmt.Sprintf("%s/containers/%s@%s", s.zfs.zfsPool, fields[0], zfsName)}
 	if zfsParent != "" {
@@ -1260,6 +1261,11 @@ func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zf
 		return err
 	}
 
+	readPipe := io.ReadCloser(stdout)
+	if readWrapper != nil {
+		readPipe = readWrapper(stdout)
+	}
+
 	stderr, err := cmd.StderrPipe()
 	if err != nil {
 		return err
@@ -1269,7 +1275,7 @@ func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zf
 		return err
 	}
 
-	<-shared.WebsocketSendStream(conn, stdout, 4*1024*1024)
+	<-shared.WebsocketSendStream(conn, readPipe, 4*1024*1024)
 
 	output, err := ioutil.ReadAll(stderr)
 	if err != nil {
@@ -1284,11 +1290,12 @@ func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zf
 	return err
 }
 
-func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error {
+func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation) error {
 	if s.container.IsSnapshot() {
 		fields := strings.SplitN(s.container.Name(), shared.SnapshotDelimiter, 2)
 		snapshotName := fmt.Sprintf("snapshot-%s", fields[1])
-		return s.send(conn, snapshotName, "")
+		wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+		return s.send(conn, snapshotName, "", wrapper)
 	}
 
 	lastSnap := ""
@@ -1301,7 +1308,8 @@ func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error
 
 		lastSnap = snap
 
-		if err := s.send(conn, snap, prev); err != nil {
+		wrapper := StorageProgressReader(op, "fs_progress", snap)
+		if err := s.send(conn, snap, prev, wrapper); err != nil {
 			return err
 		}
 	}
@@ -1311,7 +1319,8 @@ func (s *zfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn) error
 		return err
 	}
 
-	if err := s.send(conn, s.runningSnapName, lastSnap); err != nil {
+	wrapper := StorageProgressReader(op, "fs_progress", s.container.Name())
+	if err := s.send(conn, s.runningSnapName, lastSnap, wrapper); err != nil {
 		return err
 	}
 
@@ -1324,7 +1333,7 @@ func (s *zfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) err
 		return err
 	}
 
-	if err := s.send(conn, s.stoppedSnapName, s.runningSnapName); err != nil {
+	if err := s.send(conn, s.stoppedSnapName, s.runningSnapName, nil); err != nil {
 		return err
 	}
 
@@ -1396,8 +1405,8 @@ func (s *storageZfs) MigrationSource(ct container) (MigrationStorageSourceDriver
 	return &driver, nil
 }
 
-func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet) error {
-	zfsRecv := func(zfsName string) error {
+func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*Snapshot, conn *websocket.Conn, srcIdmap *shared.IdmapSet, op *operation) error {
+	zfsRecv := func(zfsName string, writeWrapper func(io.WriteCloser) io.WriteCloser) error {
 		zfsFsName := fmt.Sprintf("%s/%s", s.zfsPool, zfsName)
 		args := []string{"receive", "-F", "-u", zfsFsName}
 		cmd := exec.Command("zfs", args...)
@@ -1416,7 +1425,12 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
 			return err
 		}
 
-		<-shared.WebsocketRecvStream(stdin, conn)
+		writePipe := io.WriteCloser(stdin)
+		if writeWrapper != nil {
+			writePipe = writeWrapper(stdin)
+		}
+
+		<-shared.WebsocketRecvStream(writePipe, conn)
 
 		output, err := ioutil.ReadAll(stderr)
 		if err != nil {
@@ -1449,8 +1463,9 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
 			return err
 		}
 
+		wrapper := StorageProgressWriter(op, "fs_progress", snap.GetName())
 		name := fmt.Sprintf("containers/%s@snapshot-%s", container.Name(), snap.GetName())
-		if err := zfsRecv(name); err != nil {
+		if err := zfsRecv(name, wrapper); err != nil {
 			return err
 		}
 
@@ -1484,13 +1499,15 @@ func (s *storageZfs) MigrationSink(live bool, container container, snapshots []*
 	}()
 
 	/* finally, do the real container */
-	if err := zfsRecv(zfsName); err != nil {
+	wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+	if err := zfsRecv(zfsName, wrapper); err != nil {
 		return err
 	}
 
 	if live {
 		/* and again for the post-running snapshot if this was a live migration */
-		if err := zfsRecv(zfsName); err != nil {
+		wrapper := StorageProgressWriter(op, "fs_progress", container.Name())
+		if err := zfsRecv(zfsName, wrapper); err != nil {
 			return err
 		}
 	}


More information about the lxc-devel mailing list