[lxc-devel] [lxd/master] Migration using ReadWriteCloser

tomponline on Github lxc-bot at linuxcontainers.org
Mon Oct 28 14:26:11 UTC 2019


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20191028/30bfa191/attachment.bin>
-------------- next part --------------
From 5f631ad4eff8345ba3835c426f1e8c71a91bf0be Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 25 Oct 2019 15:12:35 -0400
Subject: [PATCH 1/7] lxd/rsync: Switch to using io.ReadWriteCloser
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/rsync/rsync.go | 68 ++++++++++++++++++----------------------------
 1 file changed, 27 insertions(+), 41 deletions(-)

diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go
index 0fed6d5f44..17cf13f8f6 100644
--- a/lxd/rsync/rsync.go
+++ b/lxd/rsync/rsync.go
@@ -10,11 +10,11 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/gorilla/websocket"
 	"github.com/pborman/uuid"
 
 	"github.com/lxc/lxd/lxd/daemon"
 	"github.com/lxc/lxd/shared"
+	"github.com/lxc/lxd/shared/ioprogress"
 	"github.com/lxc/lxd/shared/logger"
 )
 
@@ -180,30 +180,35 @@ func sendSetup(name string, path string, bwlimit string, execPath string, featur
 
 // Send sets up the sending half of an rsync, to recursively send the
 // directory pointed to by path over the websocket.
-func Send(name string, path string, conn *websocket.Conn, readWrapper func(io.ReadCloser) io.ReadCloser, features []string, bwlimit string, execPath string) error {
-	cmd, dataSocket, stderr, err := sendSetup(name, path, bwlimit, execPath, features)
+func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTracker, features []string, bwlimit string, execPath string) error {
+	cmd, netcatConn, stderr, err := sendSetup(name, path, bwlimit, execPath, features)
 	if err != nil {
 		return err
 	}
+	defer netcatConn.Close()
 
-	if dataSocket != nil {
-		defer dataSocket.Close()
+	// Setup progress tracker
+	readPipe := io.ReadCloser(netcatConn)
+	if tracker != nil {
+		readPipe = &ioprogress.ProgressReader{netcatConn, tracker}
 	}
 
-	readPipe := io.ReadCloser(dataSocket)
-	if readWrapper != nil {
-		readPipe = readWrapper(dataSocket)
-	}
-
-	readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, readPipe, nil, nil)
+	// Forward from netcat to target
+	chCopy := make(chan error, 1)
+	go func() {
+		_, err := io.Copy(conn, readPipe)
+		chCopy <- err
+	}()
 
+	// Wait for rsync to complete
 	chError := make(chan error, 1)
 	go func() {
 		err = cmd.Wait()
 		if err != nil {
-			dataSocket.Close()
+			netcatConn.Close()
 			readPipe.Close()
 		}
+
 		chError <- err
 	}()
 
@@ -217,8 +222,7 @@ func Send(name string, path string, conn *websocket.Conn, readWrapper func(io.Re
 		logger.Errorf("Rsync send failed: %s: %s: %s", path, err, string(output))
 	}
 
-	<-readDone
-	<-writeDone
+	<-chCopy
 
 	return err
 }
@@ -226,7 +230,7 @@ func Send(name string, path string, conn *websocket.Conn, readWrapper func(io.Re
 // Recv sets up the receiving half of the websocket to rsync (the other
 // half set up by rsync.Send), putting the contents in the directory specified
 // by path.
-func Recv(path string, conn *websocket.Conn, writeWrapper func(io.WriteCloser) io.WriteCloser, features []string) error {
+func Recv(path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTracker, features []string) error {
 	args := []string{
 		"--server",
 		"-vlogDtpre.iLsfx",
@@ -243,48 +247,30 @@ func Recv(path string, conn *websocket.Conn, writeWrapper func(io.WriteCloser) i
 	args = append(args, []string{".", path}...)
 
 	cmd := exec.Command("rsync", args...)
-
-	stdin, err := cmd.StdinPipe()
-	if err != nil {
-		return err
-	}
-
-	stdout, err := cmd.StdoutPipe()
-	if err != nil {
-		return err
+	if tracker != nil {
+		cmd.Stdin = &ioprogress.ProgressReader{conn, tracker}
 	}
+	cmd.Stdin = conn
+	cmd.Stdout = conn
 
 	stderr, err := cmd.StderrPipe()
 	if err != nil {
 		return err
 	}
 
-	if err := cmd.Start(); err != nil {
-		return err
-	}
-
-	writePipe := io.WriteCloser(stdin)
-	if writeWrapper != nil {
-		writePipe = writeWrapper(stdin)
-	}
-
-	readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout, nil, nil)
-	output, err := ioutil.ReadAll(stderr)
+	err = cmd.Start()
 	if err != nil {
-		cmd.Process.Kill()
-		cmd.Wait()
 		return err
 	}
 
 	err = cmd.Wait()
+	output, err := ioutil.ReadAll(stderr)
 	if err != nil {
 		logger.Errorf("Rsync receive failed: %s: %s: %s", path, err, string(output))
+		return err
 	}
 
-	<-readDone
-	<-writeDone
-
-	return err
+	return nil
 }
 
 func rsyncFeatureArgs(features []string) []string {

From 4ea34114bc033e39c5a8d2ad0d4a7bef4f7c7b91 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 25 Oct 2019 15:46:34 -0400
Subject: [PATCH 2/7] shared: Implement a WebsocketIO ReadWriteCloser
 abstraction
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 shared/network.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 66 insertions(+)

diff --git a/shared/network.go b/shared/network.go
index 00e200001e..c73677df93 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -363,6 +363,72 @@ func DefaultWriter(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- bool
 	w.Close()
 }
 
+// WebsocketIO is a wrapper implementing io.ReadWriteCloser on top of a websocket connection.
+type WebsocketIO struct {
+	Conn   *websocket.Conn
+	reader io.Reader
+}
+
+func (w *WebsocketIO) Read(p []byte) (n int, err error) {
+	for {
+		// First read from this message
+		if w.reader == nil {
+			var mt int
+
+			mt, w.reader, err = w.Conn.NextReader()
+			if err != nil {
+				return -1, err
+			}
+
+			if mt == websocket.CloseMessage {
+				return 0, io.EOF
+			}
+
+			if mt == websocket.TextMessage {
+				return 0, io.EOF
+			}
+		}
+
+		// Perform the read itself
+		n, err := w.reader.Read(p)
+		if err == io.EOF {
+
+			// At the end of the message, reset reader
+			w.reader = nil
+
+			return n, nil
+		}
+
+		if err != nil {
+			return -1, err
+		}
+
+		return n, nil
+	}
+}
+
+func (w *WebsocketIO) Write(p []byte) (n int, err error) {
+
+	wr, err := w.Conn.NextWriter(websocket.BinaryMessage)
+	if err != nil {
+		return -1, err
+	}
+
+	n, err = wr.Write(p)
+	if err != nil {
+		return -1, err
+	}
+	wr.Close()
+
+	return n, nil
+}
+
+func (w *WebsocketIO) Close() error {
+	closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+	w.Conn.WriteMessage(websocket.CloseMessage, closeMsg)
+	return w.Conn.Close()
+}
+
 // WebsocketMirror allows mirroring a reader to a websocket and taking the
 // result and writing it to a writer. This function allows for multiple
 // mirrorings and correctly negotiates stream endings. However, it means any

From f6653d302022dbb06846e9366d249dacb2527dc9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 25 Oct 2019 15:46:53 -0400
Subject: [PATCH 3/7] lxd/migration: Introduce ProgressTracker
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/migration/migration_volumes.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/lxd/migration/migration_volumes.go b/lxd/migration/migration_volumes.go
index b7ecb3aebc..1fd63a502b 100644
--- a/lxd/migration/migration_volumes.go
+++ b/lxd/migration/migration_volumes.go
@@ -188,3 +188,16 @@ func ProgressWriter(op *operations.Operation, key string, description string) fu
 		return writePipe
 	}
 }
+
+// ProgressTracker returns a migration I/O tracker
+func ProgressTracker(op *operations.Operation, key string, description string) *ioprogress.ProgressTracker {
+	progress := func(progressInt int64, speedInt int64) {
+		progressWrapperRender(op, key, description, progressInt, speedInt)
+	}
+
+	tracker := &ioprogress.ProgressTracker{
+		Handler: progress,
+	}
+
+	return tracker
+}

From ea252b4616c02e38ae4f8f8f168caf4b61287232 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 25 Oct 2019 15:47:16 -0400
Subject: [PATCH 4/7] lxd/migration: Switch over to ReadWriteCloser for rsync
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/migrate_container.go |  8 +++----
 lxd/storage_migration.go | 46 ++++++++++++++++++++--------------------
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
index 69b5d362ff..fda13de567 100644
--- a/lxd/migrate_container.go
+++ b/lxd/migrate_container.go
@@ -267,7 +267,7 @@ func (s *migrationSourceWs) preDumpLoop(args *preDumpLoopArgs) (bool, error) {
 	// Send the pre-dump.
 	ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.instance.Name())
 	state := s.instance.DaemonState()
-	err = rsync.Send(ctName, shared.AddSlash(args.checkpointDir), s.criuConn, nil, args.rsyncFeatures, args.bwlimit, state.OS.ExecPath)
+	err = rsync.Send(ctName, shared.AddSlash(args.checkpointDir), &shared.WebsocketIO{Conn: s.criuConn}, nil, args.rsyncFeatures, args.bwlimit, state.OS.ExecPath)
 	if err != nil {
 		return final, err
 	}
@@ -682,7 +682,7 @@ func (s *migrationSourceWs) Do(migrateOp *operations.Operation) error {
 		 */
 		ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.instance.Name())
 		state := s.instance.DaemonState()
-		err = rsync.Send(ctName, shared.AddSlash(checkpointDir), s.criuConn, nil, rsyncFeatures, bwlimit, state.OS.ExecPath)
+		err = rsync.Send(ctName, shared.AddSlash(checkpointDir), &shared.WebsocketIO{Conn: s.criuConn}, nil, rsyncFeatures, bwlimit, state.OS.ExecPath)
 		if err != nil {
 			return abort(err)
 		}
@@ -1062,7 +1062,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 				for !sync.GetFinalPreDump() {
 					logger.Debugf("About to receive rsync")
 					// Transfer a CRIU pre-dump
-					err = rsync.Recv(shared.AddSlash(imagesDir), criuConn, nil, rsyncFeatures)
+					err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
 					if err != nil {
 						restore <- err
 						return
@@ -1090,7 +1090,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
 			}
 
 			// Final CRIU dump
-			err = rsync.Recv(shared.AddSlash(imagesDir), criuConn, nil, rsyncFeatures)
+			err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
 			if err != nil {
 				restore <- err
 				return
diff --git a/lxd/storage_migration.go b/lxd/storage_migration.go
index 5dae246681..6011f304ce 100644
--- a/lxd/storage_migration.go
+++ b/lxd/storage_migration.go
@@ -68,23 +68,23 @@ func (s rsyncStorageSourceDriver) SendStorageVolume(conn *websocket.Conn, op *op
 		}
 
 		for _, snap := range snapshots {
-			wrapper := migration.ProgressReader(op, "fs_progress", snap.Name)
+			wrapper := migration.ProgressTracker(op, "fs_progress", snap.Name)
 			path := driver.GetStoragePoolVolumeSnapshotMountPoint(pool.Name, snap.Name)
 			path = shared.AddSlash(path)
 			logger.Debugf("Starting to send storage volume snapshot %s on storage pool %s from %s", snap.Name, pool.Name, path)
 
-			err = rsync.Send(volume.Name, path, conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+			err = rsync.Send(volume.Name, path, &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
 			if err != nil {
 				return err
 			}
 		}
 	}
 
-	wrapper := migration.ProgressReader(op, "fs_progress", volume.Name)
+	wrapper := migration.ProgressTracker(op, "fs_progress", volume.Name)
 	path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
 	path = shared.AddSlash(path)
 	logger.Debugf("Starting to send storage volume %s on storage pool %s from %s", volume.Name, pool.Name, path)
-	err = rsync.Send(volume.Name, path, conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+	err = rsync.Send(volume.Name, path, &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
 	if err != nil {
 		return err
 	}
@@ -106,16 +106,16 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *ope
 			}
 
 			path := send.Path()
-			wrapper := migration.ProgressReader(op, "fs_progress", send.Name())
+			wrapper := migration.ProgressTracker(op, "fs_progress", send.Name())
 			state := s.container.DaemonState()
-			err = rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(path), conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+			err = rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(path), &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
 			if err != nil {
 				return err
 			}
 		}
 	}
 
-	wrapper := migration.ProgressReader(op, "fs_progress", s.container.Name())
+	wrapper := migration.ProgressTracker(op, "fs_progress", s.container.Name())
 	state := s.container.DaemonState()
 
 	// Attempt to freeze the container to avoid changing files during transfer
@@ -128,14 +128,14 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *ope
 		}
 	}
 
-	return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+	return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
 }
 
 func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn, bwlimit string) error {
 	ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.container.Name())
 	// resync anything that changed between our first send and the checkpoint
 	state := s.container.DaemonState()
-	return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), conn, nil, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+	return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), &shared.WebsocketIO{Conn: conn}, nil, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
 }
 
 func (s rsyncStorageSourceDriver) Cleanup() {
@@ -259,12 +259,12 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op *operations.Operation, a
 				return err
 			}
 
-			wrapper := migration.ProgressWriter(op, "fs_progress", target.Name)
+			wrapper := migration.ProgressTracker(op, "fs_progress", target.Name)
 			path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
 			path = shared.AddSlash(path)
 			logger.Debugf("Starting to receive storage volume snapshot %s on storage pool %s into %s", target.Name, pool.Name, path)
 
-			err = rsync.Recv(path, conn, wrapper, args.RsyncFeatures)
+			err = rsync.Recv(path, &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
 			if err != nil {
 				return err
 			}
@@ -276,11 +276,11 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op *operations.Operation, a
 		}
 	}
 
-	wrapper := migration.ProgressWriter(op, "fs_progress", volume.Name)
+	wrapper := migration.ProgressTracker(op, "fs_progress", volume.Name)
 	path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
 	path = shared.AddSlash(path)
 	logger.Debugf("Starting to receive storage volume %s on storage pool %s into %s", volume.Name, pool.Name, path)
-	return rsync.Recv(path, conn, wrapper, args.RsyncFeatures)
+	return rsync.Recv(path, &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
 }
 
 func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
@@ -356,8 +356,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 					}
 				}
 
-				wrapper := migration.ProgressWriter(op, "fs_progress", s.Name())
-				if err := rsync.Recv(shared.AddSlash(s.Path()), conn, wrapper, args.RsyncFeatures); err != nil {
+				wrapper := migration.ProgressTracker(op, "fs_progress", s.Name())
+				if err := rsync.Recv(shared.AddSlash(s.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures); err != nil {
 					return err
 				}
 
@@ -371,8 +371,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 			}
 		}
 
-		wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
-		err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
+		wrapper := migration.ProgressTracker(op, "fs_progress", args.Instance.Name())
+		err = rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
 		if err != nil {
 			return err
 		}
@@ -409,8 +409,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 					}
 				}
 
-				wrapper := migration.ProgressWriter(op, "fs_progress", snap.GetName())
-				err := rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
+				wrapper := migration.ProgressTracker(op, "fs_progress", snap.GetName())
+				err := rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
 				if err != nil {
 					return err
 				}
@@ -434,8 +434,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 			}
 		}
 
-		wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
-		err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
+		wrapper := migration.ProgressTracker(op, "fs_progress", args.Instance.Name())
+		err = rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
 		if err != nil {
 			return err
 		}
@@ -443,8 +443,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
 
 	if args.Live {
 		/* now receive the final sync */
-		wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
-		err := rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
+		wrapper := migration.ProgressTracker(op, "fs_progress", args.Instance.Name())
+		err := rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
 		if err != nil {
 			return err
 		}

From 489b82dc0d6ebaf8aea4cacf7af7de4cdc4ce863 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 28 Oct 2019 10:39:46 +0100
Subject: [PATCH 5/7] lxd/rsync: DEBUG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 lxd/rsync/rsync.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go
index 17cf13f8f6..241915f7ee 100644
--- a/lxd/rsync/rsync.go
+++ b/lxd/rsync/rsync.go
@@ -231,6 +231,7 @@ func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress
 // half set up by rsync.Send), putting the contents in the directory specified
 // by path.
 func Recv(path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTracker, features []string) error {
+	logger.Errorf("stgraber: in rsync recv")
 	args := []string{
 		"--server",
 		"-vlogDtpre.iLsfx",
@@ -258,18 +259,23 @@ func Recv(path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTrac
 		return err
 	}
 
+	logger.Errorf("stgraber: starting")
 	err = cmd.Start()
 	if err != nil {
 		return err
 	}
+	logger.Errorf("stgraber: started, waiting")
 
 	err = cmd.Wait()
+	logger.Errorf("stgraber: done waiting, reading stderr")
 	output, err := ioutil.ReadAll(stderr)
+	logger.Errorf("stgraber: done reading stderr")
 	if err != nil {
 		logger.Errorf("Rsync receive failed: %s: %s: %s", path, err, string(output))
 		return err
 	}
 
+	logger.Errorf("stgraber: out")
 	return nil
 }
 

From 9ae17d4816cb97817a4ce2e9e2a666d5f1d9e274 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 28 Oct 2019 10:41:10 +0100
Subject: [PATCH 6/7] shared: DEBUG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
 shared/network.go | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/shared/network.go b/shared/network.go
index c73677df93..2eb885e3c8 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -370,6 +370,7 @@ type WebsocketIO struct {
 }
 
 func (w *WebsocketIO) Read(p []byte) (n int, err error) {
+	logger.Errorf("stgraber: read: entering: %v", w.Conn.LocalAddr())
 	for {
 		// First read from this message
 		if w.reader == nil {
@@ -377,14 +378,18 @@ func (w *WebsocketIO) Read(p []byte) (n int, err error) {
 
 			mt, w.reader, err = w.Conn.NextReader()
 			if err != nil {
+				logger.Errorf("stgraber: read: got bad reader")
 				return -1, err
 			}
+			logger.Errorf("stgraber: read: new reader: %v", mt)
 
 			if mt == websocket.CloseMessage {
+				logger.Errorf("stgraber: read: got close message")
 				return 0, io.EOF
 			}
 
 			if mt == websocket.TextMessage {
+				logger.Errorf("stgraber: read: got barrier")
 				return 0, io.EOF
 			}
 		}
@@ -392,38 +397,47 @@ func (w *WebsocketIO) Read(p []byte) (n int, err error) {
 		// Perform the read itself
 		n, err := w.reader.Read(p)
 		if err == io.EOF {
+			logger.Errorf("stgraber: read: got message EOF")
 
 			// At the end of the message, reset reader
 			w.reader = nil
 
+			logger.Errorf("stgraber: read: EOF forwarded: %d", n)
 			return n, nil
 		}
 
 		if err != nil {
+			logger.Errorf("stgraber: read: failed to read")
 			return -1, err
 		}
 
+		logger.Errorf("stgraber: read: forwarded: %d", n)
 		return n, nil
 	}
 }
 
 func (w *WebsocketIO) Write(p []byte) (n int, err error) {
+	logger.Errorf("stgraber: write: entering: %v", w.Conn.LocalAddr())
 
 	wr, err := w.Conn.NextWriter(websocket.BinaryMessage)
 	if err != nil {
+		logger.Errorf("stgraber: write: failed to setup writer: %v", err)
 		return -1, err
 	}
 
 	n, err = wr.Write(p)
 	if err != nil {
+		logger.Errorf("stgraber: write: failed to write: %v", err)
 		return -1, err
 	}
 	wr.Close()
 
+	logger.Errorf("stgraber: write: sent: %d", n)
 	return n, nil
 }
 
 func (w *WebsocketIO) Close() error {
+	logger.Errorf("stgraber: close")
 	closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
 	w.Conn.WriteMessage(websocket.CloseMessage, closeMsg)
 	return w.Conn.Close()

From 1d7e3075bf65bdefc929e2fdf033eb35ced355fe Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Mon, 28 Oct 2019 14:25:06 +0000
Subject: [PATCH 7/7] lxd/rsync/rsync: Updates Send() to work with
 ReadWriteCloser

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/rsync/rsync.go | 36 ++++++++++++++++++++++--------------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go
index 241915f7ee..86e1248fc8 100644
--- a/lxd/rsync/rsync.go
+++ b/lxd/rsync/rsync.go
@@ -186,30 +186,37 @@ func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress
 		return err
 	}
 	defer netcatConn.Close()
+	defer conn.Close()
 
-	// Setup progress tracker
-	readPipe := io.ReadCloser(netcatConn)
+	// Setup progress tracker.
+	readNetcatPipe := io.ReadCloser(netcatConn)
 	if tracker != nil {
-		readPipe = &ioprogress.ProgressReader{netcatConn, tracker}
+		readNetcatPipe = &ioprogress.ProgressReader{netcatConn, tracker}
 	}
 
-	// Forward from netcat to target
-	chCopy := make(chan error, 1)
+	// Forward from netcat to target.
+	chCopyNetcat := make(chan error, 1)
 	go func() {
-		_, err := io.Copy(conn, readPipe)
-		chCopy <- err
+		_, err := io.Copy(conn, readNetcatPipe)
+		chCopyNetcat <- err
+		readNetcatPipe.Close()
 	}()
 
-	// Wait for rsync to complete
+	// Forward from target to netcat.
+	writeNetcatPipe := io.WriteCloser(netcatConn)
+	chCopyTarget := make(chan error, 1)
+	go func() {
+		_, err := io.Copy(writeNetcatPipe, conn)
+		chCopyTarget <- err
+		writeNetcatPipe.Close()
+	}()
+
+	// Wait for rsync to complete.
 	chError := make(chan error, 1)
 	go func() {
 		err = cmd.Wait()
-		if err != nil {
-			netcatConn.Close()
-			readPipe.Close()
-		}
-
 		chError <- err
+		netcatConn.Close()
 	}()
 
 	output, err := ioutil.ReadAll(stderr)
@@ -222,7 +229,8 @@ func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress
 		logger.Errorf("Rsync send failed: %s: %s: %s", path, err, string(output))
 	}
 
-	<-chCopy
+	<-chCopyNetcat
+	<-chCopyTarget
 
 	return err
 }


More information about the lxc-devel mailing list