[lxc-devel] [lxd/master] Migration using ReadWriteCloser
tomponline on Github
lxc-bot at linuxcontainers.org
Mon Oct 28 14:26:11 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20191028/30bfa191/attachment.bin>
-------------- next part --------------
From 5f631ad4eff8345ba3835c426f1e8c71a91bf0be Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 25 Oct 2019 15:12:35 -0400
Subject: [PATCH 1/7] lxd/rsync: Switch to using io.ReadWriteCloser
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/rsync/rsync.go | 68 ++++++++++++++++++----------------------------
1 file changed, 27 insertions(+), 41 deletions(-)
diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go
index 0fed6d5f44..17cf13f8f6 100644
--- a/lxd/rsync/rsync.go
+++ b/lxd/rsync/rsync.go
@@ -10,11 +10,11 @@ import (
"syscall"
"time"
- "github.com/gorilla/websocket"
"github.com/pborman/uuid"
"github.com/lxc/lxd/lxd/daemon"
"github.com/lxc/lxd/shared"
+ "github.com/lxc/lxd/shared/ioprogress"
"github.com/lxc/lxd/shared/logger"
)
@@ -180,30 +180,35 @@ func sendSetup(name string, path string, bwlimit string, execPath string, featur
// Send sets up the sending half of an rsync, to recursively send the
// directory pointed to by path over the websocket.
-func Send(name string, path string, conn *websocket.Conn, readWrapper func(io.ReadCloser) io.ReadCloser, features []string, bwlimit string, execPath string) error {
- cmd, dataSocket, stderr, err := sendSetup(name, path, bwlimit, execPath, features)
+func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTracker, features []string, bwlimit string, execPath string) error {
+ cmd, netcatConn, stderr, err := sendSetup(name, path, bwlimit, execPath, features)
if err != nil {
return err
}
+ defer netcatConn.Close()
- if dataSocket != nil {
- defer dataSocket.Close()
+ // Setup progress tracker
+ readPipe := io.ReadCloser(netcatConn)
+ if tracker != nil {
+ readPipe = &ioprogress.ProgressReader{netcatConn, tracker}
}
- readPipe := io.ReadCloser(dataSocket)
- if readWrapper != nil {
- readPipe = readWrapper(dataSocket)
- }
-
- readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, readPipe, nil, nil)
+ // Forward from netcat to target
+ chCopy := make(chan error, 1)
+ go func() {
+ _, err := io.Copy(conn, readPipe)
+ chCopy <- err
+ }()
+ // Wait for rsync to complete
chError := make(chan error, 1)
go func() {
err = cmd.Wait()
if err != nil {
- dataSocket.Close()
+ netcatConn.Close()
readPipe.Close()
}
+
chError <- err
}()
@@ -217,8 +222,7 @@ func Send(name string, path string, conn *websocket.Conn, readWrapper func(io.Re
logger.Errorf("Rsync send failed: %s: %s: %s", path, err, string(output))
}
- <-readDone
- <-writeDone
+ <-chCopy
return err
}
@@ -226,7 +230,7 @@ func Send(name string, path string, conn *websocket.Conn, readWrapper func(io.Re
// Recv sets up the receiving half of the websocket to rsync (the other
// half set up by rsync.Send), putting the contents in the directory specified
// by path.
-func Recv(path string, conn *websocket.Conn, writeWrapper func(io.WriteCloser) io.WriteCloser, features []string) error {
+func Recv(path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTracker, features []string) error {
args := []string{
"--server",
"-vlogDtpre.iLsfx",
@@ -243,48 +247,30 @@ func Recv(path string, conn *websocket.Conn, writeWrapper func(io.WriteCloser) i
args = append(args, []string{".", path}...)
cmd := exec.Command("rsync", args...)
-
- stdin, err := cmd.StdinPipe()
- if err != nil {
- return err
- }
-
- stdout, err := cmd.StdoutPipe()
- if err != nil {
- return err
+ if tracker != nil {
+ cmd.Stdin = &ioprogress.ProgressReader{conn, tracker}
}
+ cmd.Stdin = conn
+ cmd.Stdout = conn
stderr, err := cmd.StderrPipe()
if err != nil {
return err
}
- if err := cmd.Start(); err != nil {
- return err
- }
-
- writePipe := io.WriteCloser(stdin)
- if writeWrapper != nil {
- writePipe = writeWrapper(stdin)
- }
-
- readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout, nil, nil)
- output, err := ioutil.ReadAll(stderr)
+ err = cmd.Start()
if err != nil {
- cmd.Process.Kill()
- cmd.Wait()
return err
}
err = cmd.Wait()
+ output, err := ioutil.ReadAll(stderr)
if err != nil {
logger.Errorf("Rsync receive failed: %s: %s: %s", path, err, string(output))
+ return err
}
- <-readDone
- <-writeDone
-
- return err
+ return nil
}
func rsyncFeatureArgs(features []string) []string {
From 4ea34114bc033e39c5a8d2ad0d4a7bef4f7c7b91 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 25 Oct 2019 15:46:34 -0400
Subject: [PATCH 2/7] shared: Implement a WebsocketIO ReadWriteCloser
abstraction
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
shared/network.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 66 insertions(+)
diff --git a/shared/network.go b/shared/network.go
index 00e200001e..c73677df93 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -363,6 +363,72 @@ func DefaultWriter(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- bool
w.Close()
}
+// WebsocketIO is a wrapper implementing ReadWriteCloser on top of websocket
+type WebsocketIO struct {
+ Conn *websocket.Conn
+ reader io.Reader
+}
+
+func (w *WebsocketIO) Read(p []byte) (n int, err error) {
+ for {
+ // First read from this message
+ if w.reader == nil {
+ var mt int
+
+ mt, w.reader, err = w.Conn.NextReader()
+ if err != nil {
+ return -1, err
+ }
+
+ if mt == websocket.CloseMessage {
+ return 0, io.EOF
+ }
+
+ if mt == websocket.TextMessage {
+ return 0, io.EOF
+ }
+ }
+
+ // Perform the read itself
+ n, err := w.reader.Read(p)
+ if err == io.EOF {
+
+ // At the end of the message, reset reader
+ w.reader = nil
+
+ return n, nil
+ }
+
+ if err != nil {
+ return -1, err
+ }
+
+ return n, nil
+ }
+}
+
+func (w *WebsocketIO) Write(p []byte) (n int, err error) {
+
+ wr, err := w.Conn.NextWriter(websocket.BinaryMessage)
+ if err != nil {
+ return -1, err
+ }
+
+ n, err = wr.Write(p)
+ if err != nil {
+ return -1, err
+ }
+ wr.Close()
+
+ return n, nil
+}
+
+func (w *WebsocketIO) Close() error {
+ closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+ w.Conn.WriteMessage(websocket.CloseMessage, closeMsg)
+ return w.Conn.Close()
+}
+
// WebsocketMirror allows mirroring a reader to a websocket and taking the
// result and writing it to a writer. This function allows for multiple
// mirrorings and correctly negotiates stream endings. However, it means any
From f6653d302022dbb06846e9366d249dacb2527dc9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 25 Oct 2019 15:46:53 -0400
Subject: [PATCH 3/7] lxd/migration: Introduce ProgressTracker
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/migration/migration_volumes.go | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git a/lxd/migration/migration_volumes.go b/lxd/migration/migration_volumes.go
index b7ecb3aebc..1fd63a502b 100644
--- a/lxd/migration/migration_volumes.go
+++ b/lxd/migration/migration_volumes.go
@@ -188,3 +188,16 @@ func ProgressWriter(op *operations.Operation, key string, description string) fu
return writePipe
}
}
+
+// ProgressTracker returns a migration I/O tracker
+func ProgressTracker(op *operations.Operation, key string, description string) *ioprogress.ProgressTracker {
+ progress := func(progressInt int64, speedInt int64) {
+ progressWrapperRender(op, key, description, progressInt, speedInt)
+ }
+
+ tracker := &ioprogress.ProgressTracker{
+ Handler: progress,
+ }
+
+ return tracker
+}
From ea252b4616c02e38ae4f8f8f168caf4b61287232 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Fri, 25 Oct 2019 15:47:16 -0400
Subject: [PATCH 4/7] lxd/migration: Switch over to ReadWriteCloser for rsync
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/migrate_container.go | 8 +++----
lxd/storage_migration.go | 46 ++++++++++++++++++++--------------------
2 files changed, 27 insertions(+), 27 deletions(-)
diff --git a/lxd/migrate_container.go b/lxd/migrate_container.go
index 69b5d362ff..fda13de567 100644
--- a/lxd/migrate_container.go
+++ b/lxd/migrate_container.go
@@ -267,7 +267,7 @@ func (s *migrationSourceWs) preDumpLoop(args *preDumpLoopArgs) (bool, error) {
// Send the pre-dump.
ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.instance.Name())
state := s.instance.DaemonState()
- err = rsync.Send(ctName, shared.AddSlash(args.checkpointDir), s.criuConn, nil, args.rsyncFeatures, args.bwlimit, state.OS.ExecPath)
+ err = rsync.Send(ctName, shared.AddSlash(args.checkpointDir), &shared.WebsocketIO{Conn: s.criuConn}, nil, args.rsyncFeatures, args.bwlimit, state.OS.ExecPath)
if err != nil {
return final, err
}
@@ -682,7 +682,7 @@ func (s *migrationSourceWs) Do(migrateOp *operations.Operation) error {
*/
ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.instance.Name())
state := s.instance.DaemonState()
- err = rsync.Send(ctName, shared.AddSlash(checkpointDir), s.criuConn, nil, rsyncFeatures, bwlimit, state.OS.ExecPath)
+ err = rsync.Send(ctName, shared.AddSlash(checkpointDir), &shared.WebsocketIO{Conn: s.criuConn}, nil, rsyncFeatures, bwlimit, state.OS.ExecPath)
if err != nil {
return abort(err)
}
@@ -1062,7 +1062,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
for !sync.GetFinalPreDump() {
logger.Debugf("About to receive rsync")
// Transfer a CRIU pre-dump
- err = rsync.Recv(shared.AddSlash(imagesDir), criuConn, nil, rsyncFeatures)
+ err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
if err != nil {
restore <- err
return
@@ -1090,7 +1090,7 @@ func (c *migrationSink) Do(migrateOp *operations.Operation) error {
}
// Final CRIU dump
- err = rsync.Recv(shared.AddSlash(imagesDir), criuConn, nil, rsyncFeatures)
+ err = rsync.Recv(shared.AddSlash(imagesDir), &shared.WebsocketIO{Conn: criuConn}, nil, rsyncFeatures)
if err != nil {
restore <- err
return
diff --git a/lxd/storage_migration.go b/lxd/storage_migration.go
index 5dae246681..6011f304ce 100644
--- a/lxd/storage_migration.go
+++ b/lxd/storage_migration.go
@@ -68,23 +68,23 @@ func (s rsyncStorageSourceDriver) SendStorageVolume(conn *websocket.Conn, op *op
}
for _, snap := range snapshots {
- wrapper := migration.ProgressReader(op, "fs_progress", snap.Name)
+ wrapper := migration.ProgressTracker(op, "fs_progress", snap.Name)
path := driver.GetStoragePoolVolumeSnapshotMountPoint(pool.Name, snap.Name)
path = shared.AddSlash(path)
logger.Debugf("Starting to send storage volume snapshot %s on storage pool %s from %s", snap.Name, pool.Name, path)
- err = rsync.Send(volume.Name, path, conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+ err = rsync.Send(volume.Name, path, &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
if err != nil {
return err
}
}
}
- wrapper := migration.ProgressReader(op, "fs_progress", volume.Name)
+ wrapper := migration.ProgressTracker(op, "fs_progress", volume.Name)
path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
path = shared.AddSlash(path)
logger.Debugf("Starting to send storage volume %s on storage pool %s from %s", volume.Name, pool.Name, path)
- err = rsync.Send(volume.Name, path, conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+ err = rsync.Send(volume.Name, path, &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
if err != nil {
return err
}
@@ -106,16 +106,16 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *ope
}
path := send.Path()
- wrapper := migration.ProgressReader(op, "fs_progress", send.Name())
+ wrapper := migration.ProgressTracker(op, "fs_progress", send.Name())
state := s.container.DaemonState()
- err = rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(path), conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+ err = rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(path), &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
if err != nil {
return err
}
}
}
- wrapper := migration.ProgressReader(op, "fs_progress", s.container.Name())
+ wrapper := migration.ProgressTracker(op, "fs_progress", s.container.Name())
state := s.container.DaemonState()
// Attempt to freeze the container to avoid changing files during transfer
@@ -128,14 +128,14 @@ func (s rsyncStorageSourceDriver) SendWhileRunning(conn *websocket.Conn, op *ope
}
}
- return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), conn, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+ return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
}
func (s rsyncStorageSourceDriver) SendAfterCheckpoint(conn *websocket.Conn, bwlimit string) error {
ctName, _, _ := shared.ContainerGetParentAndSnapshotName(s.container.Name())
// resync anything that changed between our first send and the checkpoint
state := s.container.DaemonState()
- return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), conn, nil, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
+ return rsync.Send(project.Prefix(s.container.Project(), ctName), shared.AddSlash(s.container.Path()), &shared.WebsocketIO{Conn: conn}, nil, s.rsyncFeatures, bwlimit, state.OS.ExecPath)
}
func (s rsyncStorageSourceDriver) Cleanup() {
@@ -259,12 +259,12 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op *operations.Operation, a
return err
}
- wrapper := migration.ProgressWriter(op, "fs_progress", target.Name)
+ wrapper := migration.ProgressTracker(op, "fs_progress", target.Name)
path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
path = shared.AddSlash(path)
logger.Debugf("Starting to receive storage volume snapshot %s on storage pool %s into %s", target.Name, pool.Name, path)
- err = rsync.Recv(path, conn, wrapper, args.RsyncFeatures)
+ err = rsync.Recv(path, &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
if err != nil {
return err
}
@@ -276,11 +276,11 @@ func rsyncStorageMigrationSink(conn *websocket.Conn, op *operations.Operation, a
}
}
- wrapper := migration.ProgressWriter(op, "fs_progress", volume.Name)
+ wrapper := migration.ProgressTracker(op, "fs_progress", volume.Name)
path := driver.GetStoragePoolVolumeMountPoint(pool.Name, volume.Name)
path = shared.AddSlash(path)
logger.Debugf("Starting to receive storage volume %s on storage pool %s into %s", volume.Name, pool.Name, path)
- return rsync.Recv(path, conn, wrapper, args.RsyncFeatures)
+ return rsync.Recv(path, &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
}
func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args MigrationSinkArgs) error {
@@ -356,8 +356,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
}
}
- wrapper := migration.ProgressWriter(op, "fs_progress", s.Name())
- if err := rsync.Recv(shared.AddSlash(s.Path()), conn, wrapper, args.RsyncFeatures); err != nil {
+ wrapper := migration.ProgressTracker(op, "fs_progress", s.Name())
+ if err := rsync.Recv(shared.AddSlash(s.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures); err != nil {
return err
}
@@ -371,8 +371,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
}
}
- wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
- err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
+ wrapper := migration.ProgressTracker(op, "fs_progress", args.Instance.Name())
+ err = rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
if err != nil {
return err
}
@@ -409,8 +409,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
}
}
- wrapper := migration.ProgressWriter(op, "fs_progress", snap.GetName())
- err := rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
+ wrapper := migration.ProgressTracker(op, "fs_progress", snap.GetName())
+ err := rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
if err != nil {
return err
}
@@ -434,8 +434,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
}
}
- wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
- err = rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
+ wrapper := migration.ProgressTracker(op, "fs_progress", args.Instance.Name())
+ err = rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
if err != nil {
return err
}
@@ -443,8 +443,8 @@ func rsyncMigrationSink(conn *websocket.Conn, op *operations.Operation, args Mig
if args.Live {
/* now receive the final sync */
- wrapper := migration.ProgressWriter(op, "fs_progress", args.Instance.Name())
- err := rsync.Recv(shared.AddSlash(args.Instance.Path()), conn, wrapper, args.RsyncFeatures)
+ wrapper := migration.ProgressTracker(op, "fs_progress", args.Instance.Name())
+ err := rsync.Recv(shared.AddSlash(args.Instance.Path()), &shared.WebsocketIO{Conn: conn}, wrapper, args.RsyncFeatures)
if err != nil {
return err
}
From 489b82dc0d6ebaf8aea4cacf7af7de4cdc4ce863 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 28 Oct 2019 10:39:46 +0100
Subject: [PATCH 5/7] lxd/rsync: DEBUG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
lxd/rsync/rsync.go | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go
index 17cf13f8f6..241915f7ee 100644
--- a/lxd/rsync/rsync.go
+++ b/lxd/rsync/rsync.go
@@ -231,6 +231,7 @@ func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress
// half set up by rsync.Send), putting the contents in the directory specified
// by path.
func Recv(path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTracker, features []string) error {
+ logger.Errorf("stgraber: in rsync recv")
args := []string{
"--server",
"-vlogDtpre.iLsfx",
@@ -258,18 +259,23 @@ func Recv(path string, conn io.ReadWriteCloser, tracker *ioprogress.ProgressTrac
return err
}
+ logger.Errorf("stgraber: starting")
err = cmd.Start()
if err != nil {
return err
}
+ logger.Errorf("stgraber: started, waiting")
err = cmd.Wait()
+ logger.Errorf("stgraber: done waiting, reading stderr")
output, err := ioutil.ReadAll(stderr)
+ logger.Errorf("stgraber: done reading stderr")
if err != nil {
logger.Errorf("Rsync receive failed: %s: %s: %s", path, err, string(output))
return err
}
+ logger.Errorf("stgraber: out")
return nil
}
From 9ae17d4816cb97817a4ce2e9e2a666d5f1d9e274 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgraber at ubuntu.com>
Date: Mon, 28 Oct 2019 10:41:10 +0100
Subject: [PATCH 6/7] shared: DEBUG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Stéphane Graber <stgraber at ubuntu.com>
---
shared/network.go | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/shared/network.go b/shared/network.go
index c73677df93..2eb885e3c8 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -370,6 +370,7 @@ type WebsocketIO struct {
}
func (w *WebsocketIO) Read(p []byte) (n int, err error) {
+ logger.Errorf("stgraber: read: entering: %v", w.Conn.LocalAddr())
for {
// First read from this message
if w.reader == nil {
@@ -377,14 +378,18 @@ func (w *WebsocketIO) Read(p []byte) (n int, err error) {
mt, w.reader, err = w.Conn.NextReader()
if err != nil {
+ logger.Errorf("stgraber: read: got bad reader")
return -1, err
}
+ logger.Errorf("stgraber: read: new reader: %v", mt)
if mt == websocket.CloseMessage {
+ logger.Errorf("stgraber: read: got close message")
return 0, io.EOF
}
if mt == websocket.TextMessage {
+ logger.Errorf("stgraber: read: got barrier")
return 0, io.EOF
}
}
@@ -392,38 +397,47 @@ func (w *WebsocketIO) Read(p []byte) (n int, err error) {
// Perform the read itself
n, err := w.reader.Read(p)
if err == io.EOF {
+ logger.Errorf("stgraber: read: got message EOF")
// At the end of the message, reset reader
w.reader = nil
+ logger.Errorf("stgraber: read: EOF forwarded: %d", n)
return n, nil
}
if err != nil {
+ logger.Errorf("stgraber: read: failed to read")
return -1, err
}
+ logger.Errorf("stgraber: read: forwarded: %d", n)
return n, nil
}
}
func (w *WebsocketIO) Write(p []byte) (n int, err error) {
+ logger.Errorf("stgraber: write: entering: %v", w.Conn.LocalAddr())
wr, err := w.Conn.NextWriter(websocket.BinaryMessage)
if err != nil {
+ logger.Errorf("stgraber: write: failed to setup writer: %v", err)
return -1, err
}
n, err = wr.Write(p)
if err != nil {
+ logger.Errorf("stgraber: write: failed to write: %v", err)
return -1, err
}
wr.Close()
+ logger.Errorf("stgraber: write: sent: %d", n)
return n, nil
}
func (w *WebsocketIO) Close() error {
+ logger.Errorf("stgraber: close")
closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
w.Conn.WriteMessage(websocket.CloseMessage, closeMsg)
return w.Conn.Close()
From 1d7e3075bf65bdefc929e2fdf033eb35ced355fe Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Mon, 28 Oct 2019 14:25:06 +0000
Subject: [PATCH 7/7] lxd/rsync/rsync: Updates Send() to work with
ReadWriteClose
Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
lxd/rsync/rsync.go | 36 ++++++++++++++++++++++--------------
1 file changed, 22 insertions(+), 14 deletions(-)
diff --git a/lxd/rsync/rsync.go b/lxd/rsync/rsync.go
index 241915f7ee..86e1248fc8 100644
--- a/lxd/rsync/rsync.go
+++ b/lxd/rsync/rsync.go
@@ -186,30 +186,37 @@ func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress
return err
}
defer netcatConn.Close()
+ defer conn.Close()
- // Setup progress tracker
- readPipe := io.ReadCloser(netcatConn)
+ // Setup progress tracker.
+ readNetcatPipe := io.ReadCloser(netcatConn)
if tracker != nil {
- readPipe = &ioprogress.ProgressReader{netcatConn, tracker}
+ readNetcatPipe = &ioprogress.ProgressReader{netcatConn, tracker}
}
- // Forward from netcat to target
- chCopy := make(chan error, 1)
+ // Forward from netcat to target.
+ chCopyNetcat := make(chan error, 1)
go func() {
- _, err := io.Copy(conn, readPipe)
- chCopy <- err
+ _, err := io.Copy(conn, readNetcatPipe)
+ chCopyNetcat <- err
+ readNetcatPipe.Close()
}()
- // Wait for rsync to complete
+ // Forward from target to netcat.
+ writeNetcatPipe := io.WriteCloser(netcatConn)
+ chCopyTarget := make(chan error, 1)
+ go func() {
+ _, err := io.Copy(writeNetcatPipe, conn)
+ chCopyTarget <- err
+ writeNetcatPipe.Close()
+ }()
+
+ // Wait for rsync to complete.
chError := make(chan error, 1)
go func() {
err = cmd.Wait()
- if err != nil {
- netcatConn.Close()
- readPipe.Close()
- }
-
chError <- err
+ netcatConn.Close()
}()
output, err := ioutil.ReadAll(stderr)
@@ -222,7 +229,8 @@ func Send(name string, path string, conn io.ReadWriteCloser, tracker *ioprogress
logger.Errorf("Rsync send failed: %s: %s: %s", path, err, string(output))
}
- <-chCopy
+ <-chCopyNetcat
+ <-chCopyTarget
return err
}
More information about the lxc-devel
mailing list