[lxc-devel] [lxd/master] shared: add WebsocketExecMirror()

brauner on Github lxc-bot at linuxcontainers.org
Thu Dec 8 13:53:12 UTC 2016


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 813 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20161208/1beff061/attachment.bin>
-------------- next part --------------
From 1516320b999ef63673c6845968b1cf27935b0799 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 8 Dec 2016 14:34:50 +0100
Subject: [PATCH 1/2] shared: add function to poll() on file descriptor

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 shared/util_linux.go | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/shared/util_linux.go b/shared/util_linux.go
index 51e0e43..29465d8 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -25,6 +25,7 @@ import (
 #include <errno.h>
 #include <fcntl.h>
 #include <limits.h>
+#include <poll.h>
 #include <string.h>
 #include <stdio.h>
 
@@ -181,9 +182,25 @@ int shiftowner(char *basepath, char *path, int uid, int gid) {
 	close(fd);
 	return 0;
 }
+
+int fd_has_data(int lfd)
+{
+	struct pollfd pfd = {lfd, POLLIN, 0};
+	int ret;
+
+	ret = poll(&pfd, 1, 0);
+	if (ret < 0)
+		fprintf(stderr, "Failed to poll() on file descriptor.\n");
+
+	return ret;
+}
 */
 import "C"
 
+func FdHasData(fd int) int {
+	return int(C.fd_has_data(C.int(fd)))
+}
+
 func ShiftOwner(basepath string, path string, uid int, gid int) error {
 	cbasepath := C.CString(basepath)
 	defer C.free(unsafe.Pointer(cbasepath))

From a6b0dc3d82d27f176e93859add016354f5ddc0fd Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 8 Dec 2016 14:46:10 +0100
Subject: [PATCH 2/2] shared: add WebsocketExecMirror()

This function specifically deals with shoveling data from an interactive lxc
exec session. It allows to detect background processes. The idea is to detect
when the attached child has exited. We can assume it has only exited after it
was able to write everything it wanted to write into the tty buffer. That means
at maximum output the size of one tty buffer still belongs to the attached
child. Everything afterwards belongs to another (background) process. So we dump
65536 bytes at maximum and then exit.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 lxd/container_exec.go |   7 ++-
 shared/network.go     | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 129 insertions(+), 1 deletion(-)

diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index a070d3a..be30f51 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -141,6 +141,7 @@ func (s *execWs) Do(op *operation) error {
 
 	controlExit := make(chan bool)
 	receivePid := make(chan int)
+	attachedChildIsDead := make(chan bool, 1)
 	var wgEOF sync.WaitGroup
 
 	if s.interactive {
@@ -206,13 +207,15 @@ func (s *execWs) Do(op *operation) error {
 				}
 			}
 		}()
+
 		go func() {
-			readDone, writeDone := shared.WebsocketMirror(s.conns[0], ptys[0], ptys[0])
+			readDone, writeDone := shared.WebsocketExecMirror(s.conns[0], ptys[0], ptys[0], attachedChildIsDead, int(ptys[0].Fd()))
 			<-readDone
 			<-writeDone
 			s.conns[0].Close()
 			wgEOF.Done()
 		}()
+
 	} else {
 		wgEOF.Add(len(ttys) - 1)
 		for i := 0; i < len(ttys); i++ {
@@ -242,6 +245,8 @@ func (s *execWs) Do(op *operation) error {
 			s.conns[-1].Close()
 		}
 
+		attachedChildIsDead <- true
+
 		wgEOF.Wait()
 
 		for _, pty := range ptys {
diff --git a/shared/network.go b/shared/network.go
index ebc79c9..aa5859e 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -308,3 +308,126 @@ func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser) (c
 
 	return readDone, writeDone
 }
+
+func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, exited chan bool, fd int) (chan bool, chan bool) {
+	readDone := make(chan bool, 1)
+	writeDone := make(chan bool, 1)
+
+	go func(conn *websocket.Conn, w io.WriteCloser) {
+		for {
+
+			mt, r, err := conn.NextReader()
+			if err != nil {
+				LogDebugf("Got error getting next reader %s, %s", err, w)
+				break
+			}
+
+			if mt == websocket.CloseMessage {
+				LogDebugf("Got close message for reader")
+				break
+			}
+
+			if mt == websocket.TextMessage {
+				LogDebugf("Got message barrier, resetting stream")
+				break
+			}
+
+			buf, err := ioutil.ReadAll(r)
+			if err != nil {
+				LogDebugf("Got error writing to writer %s", err)
+				break
+			}
+			i, err := w.Write(buf)
+			if i != len(buf) {
+				LogDebugf("Didn't write all of buf")
+				break
+			}
+			if err != nil {
+				LogDebugf("Error writing buf %s", err)
+				break
+			}
+		}
+		writeDone <- true
+		w.Close()
+	}(conn, w)
+
+	go func(conn *websocket.Conn, r io.ReadCloser) {
+		/* For now, we don't need to adjust buffer sizes in
+		 * WebsocketMirror, since it's used for interactive things like
+		 * exec.
+		 */
+		in := ReaderToChannel(r, -1)
+		written := 0
+		out := false
+		for {
+			var buf []byte
+			var ok bool
+
+			select {
+			case buf, ok = <-in:
+				if !ok {
+					r.Close()
+					LogDebugf("sending write barrier")
+					conn.WriteMessage(websocket.TextMessage, []byte{})
+					readDone <- true
+					return
+				}
+				if out {
+					/* If the attached child exited and a
+					* background process is still holding
+					* stdout open, we can assume that one
+					* full tty output buffer at maximum
+					* still holds output from the attached
+					* child so we spew that out. Everything
+					* after this is from the background
+					* process. The default buffer size seems
+					* to be 65536. Maybe we'll come up with
+					* a smarter way of handling this later.
+					*/
+					written += len(buf)
+					if written > 65536 {
+						r.Close()
+						LogDebugf("sending write barrier")
+						conn.WriteMessage(websocket.TextMessage, []byte{})
+						readDone <- true
+						return
+					}
+				}
+				break
+			case <-exited:
+				out = true
+				/* In case the attached child has exited before
+				* all data from the tty input or output buffer
+				* has been read, FdHasData() will always return
+				* 1.
+				*/
+				if FdHasData(fd) == 0 {
+					closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+					conn.WriteMessage(websocket.CloseMessage, closeMsg)
+					readDone <- true
+					r.Close()
+					return
+				}
+			}
+
+			w, err := conn.NextWriter(websocket.BinaryMessage)
+			if err != nil {
+				LogDebugf("Got error getting next writer %s", err)
+				break
+			}
+
+			_, err = w.Write(buf)
+			w.Close()
+			if err != nil {
+				LogDebugf("Got err writing %s", err)
+				break
+			}
+		}
+		closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+		conn.WriteMessage(websocket.CloseMessage, closeMsg)
+		readDone <- true
+		r.Close()
+	}(conn, r)
+
+	return readDone, writeDone
+}


More information about the lxc-devel mailing list