[lxc-devel] [lxd/master] Use buffers for zfs btrfs send
tych0 on Github
lxc-bot at linuxcontainers.org
Tue May 31 20:38:18 UTC 2016
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 1130 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20160531/668427b9/attachment.bin>
-------------- next part --------------
From a72bce375117aa73895b61b504600c4019fb5a82 Mon Sep 17 00:00:00 2001
From: Tycho Andersen <tycho.andersen at canonical.com>
Date: Mon, 23 May 2016 15:21:23 -0600
Subject: [PATCH 1/2] use some buffering for zfs/btrfs send
For now, let's use a 4MB buffer.
Signed-off-by: Tycho Andersen <tycho.andersen at canonical.com>
---
client.go | 4 ++--
lxd/container_exec.go | 2 +-
lxd/storage_btrfs.go | 2 +-
lxd/storage_zfs.go | 2 +-
shared/network.go | 10 +++++++---
shared/util.go | 22 ++++++++++++++++------
6 files changed, 28 insertions(+), 14 deletions(-)
diff --git a/client.go b/client.go
index d1e08cb..213b9cb 100644
--- a/client.go
+++ b/client.go
@@ -1450,7 +1450,7 @@ func (c *Client) Exec(name string, cmd []string, env map[string]string,
return -1, err
}
- shared.WebsocketSendStream(conn, stdin)
+ shared.WebsocketSendStream(conn, stdin, -1)
<-shared.WebsocketRecvStream(stdout, conn)
conn.Close()
@@ -1464,7 +1464,7 @@ func (c *Client) Exec(name string, cmd []string, env map[string]string,
}
defer conns[0].Close()
- dones[0] = shared.WebsocketSendStream(conns[0], stdin)
+ dones[0] = shared.WebsocketSendStream(conns[0], stdin, -1)
outputs := []io.WriteCloser{stdout, stderr}
for i := 1; i < 3; i++ {
diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index 2e70605..71aee79 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -204,7 +204,7 @@ func (s *execWs) Do(op *operation) error {
<-shared.WebsocketRecvStream(ttys[i], s.conns[i])
ttys[i].Close()
} else {
- <-shared.WebsocketSendStream(s.conns[i], ptys[i])
+ <-shared.WebsocketSendStream(s.conns[i], ptys[i], -1)
ptys[i].Close()
wgEOF.Done()
}
diff --git a/lxd/storage_btrfs.go b/lxd/storage_btrfs.go
index e9903c0..298864d 100644
--- a/lxd/storage_btrfs.go
+++ b/lxd/storage_btrfs.go
@@ -850,7 +850,7 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
return err
}
- <-shared.WebsocketSendStream(conn, stdout)
+ <-shared.WebsocketSendStream(conn, stdout, 4 * 1024 * 1024)
output, err := ioutil.ReadAll(stderr)
if err != nil {
diff --git a/lxd/storage_zfs.go b/lxd/storage_zfs.go
index d138918..3604447 100644
--- a/lxd/storage_zfs.go
+++ b/lxd/storage_zfs.go
@@ -1227,7 +1227,7 @@ func (s *zfsMigrationSourceDriver) send(conn *websocket.Conn, zfsName string, zf
return err
}
- <-shared.WebsocketSendStream(conn, stdout)
+ <-shared.WebsocketSendStream(conn, stdout, 4 * 1024 * 1024)
output, err := ioutil.ReadAll(stderr)
if err != nil {
diff --git a/shared/network.go b/shared/network.go
index f64e5fe..08e7019 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -115,7 +115,7 @@ func IsLoopback(iface *net.Interface) bool {
return int(iface.Flags&net.FlagLoopback) > 0
}
-func WebsocketSendStream(conn *websocket.Conn, r io.Reader) chan bool {
+func WebsocketSendStream(conn *websocket.Conn, r io.Reader, bufferSize int) chan bool {
ch := make(chan bool)
if r == nil {
@@ -124,7 +124,7 @@ func WebsocketSendStream(conn *websocket.Conn, r io.Reader) chan bool {
}
go func(conn *websocket.Conn, r io.Reader) {
- in := ReaderToChannel(r)
+ in := ReaderToChannel(r, bufferSize)
for {
buf, ok := <-in
if !ok {
@@ -244,7 +244,11 @@ func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser) (c
}(conn, w)
go func(conn *websocket.Conn, r io.ReadCloser) {
- in := ReaderToChannel(r)
+ /* For now, we don't need to adjust buffer sizes in
+ * WebsocketMirror, since it's used for interactive things like
+ * exec.
+ */
+ in := ReaderToChannel(r, -1)
for {
buf, ok := <-in
if !ok {
diff --git a/shared/util.go b/shared/util.go
index a71cb91..9e5d5ab 100644
--- a/shared/util.go
+++ b/shared/util.go
@@ -129,16 +129,26 @@ func ReadToJSON(r io.Reader, req interface{}) error {
return json.Unmarshal(buf, req)
}
-func ReaderToChannel(r io.Reader) <-chan []byte {
+func ReaderToChannel(r io.Reader, bufferSize int) <-chan []byte {
+ if bufferSize <= 128 * 1024 {
+ bufferSize = 128 * 1024
+ }
+
ch := make(chan ([]byte))
go func() {
+ readSize := 128 * 1024
+ offset := 0
+ buf := make([]byte, bufferSize)
+
for {
- /* io.Copy uses a 32KB buffer, so we might as well too. */
- buf := make([]byte, 32*1024)
- nr, err := r.Read(buf)
- if nr > 0 {
- ch <- buf[0:nr]
+ read := buf[offset:offset+readSize]
+ nr, err := r.Read(read)
+ offset += nr
+ if offset + readSize >= bufferSize || err != nil {
+ ch <- buf[0:offset]
+ offset = 0
+ buf = make([]byte, bufferSize)
}
if err != nil {
From f4ea4b29559561f246eac991312c82dda8bfc350 Mon Sep 17 00:00:00 2001
From: Tycho Andersen <tycho.andersen at canonical.com>
Date: Tue, 24 May 2016 16:06:13 +0000
Subject: [PATCH 2/2] add a test for ReaderToChannel
Signed-off-by: Tycho Andersen <tycho.andersen at canonical.com>
---
shared/util_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++
1 file changed, 42 insertions(+)
diff --git a/shared/util_test.go b/shared/util_test.go
index 33e12b7..d5927ea 100644
--- a/shared/util_test.go
+++ b/shared/util_test.go
@@ -1,8 +1,10 @@
package shared
import (
+ "bytes"
"fmt"
"io/ioutil"
+ "math/rand"
"os"
"strings"
"testing"
@@ -100,3 +102,43 @@ func TestReadLastNLines(t *testing.T) {
}
}
}
+
+func TestReaderToChannel(t *testing.T) {
+ buf := make([]byte, 64 * 1024 * 1024)
+ rand.Read(buf)
+
+ offset := 0
+ finished := false
+
+ ch := ReaderToChannel(bytes.NewBuffer(buf), -1)
+ for {
+ data, ok := <-ch
+ if len(data) > 0 {
+ for i := 0; i < len(data); i++ {
+ if buf[offset+i] != data[i] {
+ t.Error(fmt.Sprintf("byte %d didn't match", offset+i))
+ return
+ }
+ }
+
+ offset += len(data)
+ if offset > len(buf) {
+ t.Error("read too much data")
+ return
+ }
+
+ if offset == len(buf) {
+ finished = true
+ }
+ }
+
+ if !ok {
+ if !finished {
+ t.Error("connection closed too early")
+ return
+ } else {
+ break
+ }
+ }
+ }
+}
More information about the lxc-devel
mailing list