[lxc-devel] [lxd/master] Use buffers for zfs btrfs send

tych0 on Github lxc-bot at linuxcontainers.org
Tue May 31 20:38:18 UTC 2016


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 1130 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20160531/668427b9/attachment.bin>
-------------- next part --------------
From a72bce375117aa73895b61b504600c4019fb5a82 Mon Sep 17 00:00:00 2001
From: Tycho Andersen <tycho.andersen at canonical.com>
Date: Mon, 23 May 2016 15:21:23 -0600
Subject: [PATCH 1/2] use some buffering for zfs/btrfs send

For now, let's use a 4MB buffer for the zfs/btrfs send streams; all other
callers pass -1 and keep the default 128KB buffer.

Signed-off-by: Tycho Andersen <tycho.andersen at canonical.com>
---
 client.go             |  4 ++--
 lxd/container_exec.go |  2 +-
 lxd/storage_btrfs.go  |  2 +-
 lxd/storage_zfs.go    |  2 +-
 shared/network.go     | 10 +++++++---
 shared/util.go        | 22 ++++++++++++++++------
 6 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/client.go b/client.go
index d1e08cb..213b9cb 100644
--- a/client.go
+++ b/client.go
@@ -1450,7 +1450,7 @@ func (c *Client) Exec(name string, cmd []string, env map[string]string,
 			return -1, err
 		}
 
-		shared.WebsocketSendStream(conn, stdin)
+		shared.WebsocketSendStream(conn, stdin, -1)
 		<-shared.WebsocketRecvStream(stdout, conn)
 		conn.Close()
 
@@ -1464,7 +1464,7 @@ func (c *Client) Exec(name string, cmd []string, env map[string]string,
 		}
 		defer conns[0].Close()
 
-		dones[0] = shared.WebsocketSendStream(conns[0], stdin)
+		dones[0] = shared.WebsocketSendStream(conns[0], stdin, -1)
 
 		outputs := []io.WriteCloser{stdout, stderr}
 		for i := 1; i < 3; i++ {
diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index 2e70605..71aee79 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -204,7 +204,7 @@ func (s *execWs) Do(op *operation) error {
 					<-shared.WebsocketRecvStream(ttys[i], s.conns[i])
 					ttys[i].Close()
 				} else {
-					<-shared.WebsocketSendStream(s.conns[i], ptys[i])
+					<-shared.WebsocketSendStream(s.conns[i], ptys[i], -1)
 					ptys[i].Close()
 					wgEOF.Done()
 				}
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index e9903c0..298864d 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -850,7 +850,7 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
 		return err
 	}
 
-	<-shared.WebsocketSendStream(conn, stdout)
+	<-shared.WebsocketSendStream(conn, stdout, 4 * 1024 * 1024)
 
 	output, err := ioutil.ReadAll(stderr)
 	if err != nil {
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index d138918..3604447 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -1227,7 +1227,7 @@ func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zf
 		return err
 	}
 
-	<-shared.WebsocketSendStream(conn, stdout)
+	<-shared.WebsocketSendStream(conn, stdout, 4 * 1024 * 1024)
 
 	output, err := ioutil.ReadAll(stderr)
 	if err != nil {
diff --git a/shared/network.go b/shared/network.go
index f64e5fe..08e7019 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -115,7 +115,7 @@ func IsLoopback(iface *net.Interface) bool {
 	return int(iface.Flags&net.FlagLoopback) > 0
 }
 
-func WebsocketSendStream(conn *websocket.Conn, r io.Reader) chan bool {
+func WebsocketSendStream(conn *websocket.Conn, r io.Reader, bufferSize int) chan bool {
 	ch := make(chan bool)
 
 	if r == nil {
@@ -124,7 +124,7 @@ func WebsocketSendStream(conn *websocket.Conn, r io.Reader) chan bool {
 	}
 
 	go func(conn *websocket.Conn, r io.Reader) {
-		in := ReaderToChannel(r)
+		in := ReaderToChannel(r, bufferSize)
 		for {
 			buf, ok := <-in
 			if !ok {
@@ -244,7 +244,11 @@ func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser) (c
 	}(conn, w)
 
 	go func(conn *websocket.Conn, r io.ReadCloser) {
-		in := ReaderToChannel(r)
+		/* For now, we don't need to adjust buffer sizes in
+		 * WebsocketMirror, since it's used for interactive things like
+		 * exec.
+		 */
+		in := ReaderToChannel(r, -1)
 		for {
 			buf, ok := <-in
 			if !ok {
diff --git a/shared/util.go b/shared/util.go
index a71cb91..9e5d5ab 100644
--- a/shared/util.go
+++ b/shared/util.go
@@ -129,16 +129,26 @@ func ReadToJSON(r io.Reader, req interface{}) error {
 	return json.Unmarshal(buf, req)
 }
 
-func ReaderToChannel(r io.Reader) <-chan []byte {
+func ReaderToChannel(r io.Reader, bufferSize int) <-chan []byte {
+	if bufferSize <= 128 * 1024 {
+		bufferSize = 128 * 1024
+	}
+
 	ch := make(chan ([]byte))
 
 	go func() {
+		readSize := 128 * 1024
+		offset := 0
+		buf := make([]byte, bufferSize)
+
 		for {
-			/* io.Copy uses a 32KB buffer, so we might as well too. */
-			buf := make([]byte, 32*1024)
-			nr, err := r.Read(buf)
-			if nr > 0 {
-				ch <- buf[0:nr]
+			read := buf[offset:offset+readSize]
+			nr, err := r.Read(read)
+			offset += nr
+			if offset + readSize >= bufferSize || err != nil {
+				ch <- buf[0:offset]
+				offset = 0
+				buf = make([]byte, bufferSize)
 			}
 
 			if err != nil {

From f4ea4b29559561f246eac991312c82dda8bfc350 Mon Sep 17 00:00:00 2001
From: Tycho Andersen <tycho.andersen at canonical.com>
Date: Tue, 24 May 2016 16:06:13 +0000
Subject: [PATCH 2/2] add a test for ReaderToChannel

Signed-off-by: Tycho Andersen <tycho.andersen at canonical.com>
---
 shared/util_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/shared/util_test.go b/shared/util_test.go
index 33e12b7..d5927ea 100644
--- a/shared/util_test.go
+++ b/shared/util_test.go
@@ -1,8 +1,10 @@
 package shared
 
 import (
+	"bytes"
 	"fmt"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"strings"
 	"testing"
@@ -100,3 +102,43 @@ func TestReadLastNLines(t *testing.T) {
 		}
 	}
 }
+
+func TestReaderToChannel(t *testing.T) {
+	buf := make([]byte, 64 * 1024 * 1024)
+	rand.Read(buf)
+
+	offset := 0
+	finished := false
+
+	ch := ReaderToChannel(bytes.NewBuffer(buf), -1)
+	for {
+		data, ok := <-ch
+		if len(data) > 0 {
+			for i := 0; i < len(data); i++ {
+				if buf[offset+i] != data[i] {
+					t.Error(fmt.Sprintf("byte %d didn't match", offset+i))
+					return
+				}
+			}
+
+			offset += len(data)
+			if offset > len(buf) {
+				t.Error("read too much data")
+				return
+			}
+
+			if offset == len(buf) {
+				finished = true
+			}
+		}
+
+		if !ok {
+			if !finished {
+				t.Error("connection closed too early")
+				return
+			} else {
+				break
+			}
+		}
+	}
+}


More information about the lxc-devel mailing list