[lxc-devel] [lxd/master] exec: detect background tasks to allow clean exit

brauner on Github lxc-bot at linuxcontainers.org
Sat Dec 10 17:14:00 UTC 2016


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 969 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20161210/92cbf147/attachment.bin>
-------------- next part --------------
From ecb6b48b4ea0b50bd2db8ade8b6fcc32c59e9a9b Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 8 Dec 2016 14:34:50 +0100
Subject: [PATCH 1/4] shared/util_linux: add GetPollRevents()

It's a wrapper around a C function that polls on a single file descriptor and
returns the status and the detected revents.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 shared/util_linux.go | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/shared/util_linux.go b/shared/util_linux.go
index 51e0e43..0a22091 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -25,6 +25,7 @@ import (
 #include <errno.h>
 #include <fcntl.h>
 #include <limits.h>
+#include <poll.h>
 #include <string.h>
 #include <stdio.h>
 
@@ -181,9 +182,56 @@ int shiftowner(char *basepath, char *path, int uid, int gid) {
 	close(fd);
 	return 0;
 }
+
+// get_poll_revents() polls the single file descriptor lfd for the events in
+// flags, waiting at most timeout milliseconds (a negative timeout blocks
+// indefinitely, 0 returns immediately) and stores the observed events in
+// *revents.
+//
+// Returns the result of poll(): > 0 if an event was reported, 0 on timeout
+// and -1 on failure, in which case *revents is left untouched.
+int get_poll_revents(int lfd, int timeout, int flags, int *revents)
+{
+	int ret;
+	struct pollfd pfd = {lfd, flags, 0};
+
+	// poll() is never restarted after a signal handler runs, regardless
+	// of SA_RESTART, so retry on EINTR instead of reporting a spurious
+	// failure. Note that each retry restarts the full timeout.
+	do {
+		ret = poll(&pfd, 1, timeout);
+	} while (ret < 0 && errno == EINTR);
+	if (ret < 0) {
+		fprintf(stderr, "Failed to poll() on file descriptor.\n");
+		return -1;
+	}
+
+	*revents = pfd.revents;
+
+	return ret;
+}
 */
 import "C"
 
+// Poll event flags from <poll.h>, for use with GetPollRevents().
+const (
+	POLLIN   int = C.POLLIN
+	POLLPRI  int = C.POLLPRI
+	POLLNVAL int = C.POLLNVAL
+	POLLERR  int = C.POLLERR
+	POLLHUP  int = C.POLLHUP
+)
+
+// GetPollRevents polls fd for the poll(2) events in flags, waiting at most
+// timeout milliseconds (a negative timeout blocks indefinitely, 0 returns
+// immediately). It returns the result of poll() (> 0 if an event was
+// reported, 0 on timeout, -1 on failure) together with the observed events.
+func GetPollRevents(fd int, timeout int, flags int) (int, int) {
+	revents := C.int(0)
+	ret := C.get_poll_revents(C.int(fd), C.int(timeout), C.int(flags), &revents)
+	return int(ret), int(revents)
+}
+
 func ShiftOwner(basepath string, path string, uid int, gid int) error {
 	cbasepath := C.CString(basepath)
 	defer C.free(unsafe.Pointer(cbasepath))

From 538b5606dfcafe73ac9f741259a67f1b5adb4f0d Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Sat, 10 Dec 2016 12:41:13 +0100
Subject: [PATCH 2/4] exec: detect background tasks to allow clean exit

This adds the function ExecReaderToChannel() to our Linux-specific utility
functions. It is the workhorse that reads from the master side of a pty file
descriptor and it correctly detects background tasks. That means you can run:

	chb at conventiont|~/source/go/bin
	> ./lxc exec xen1 -- bash
	root at xen1:~# sleep infinity &

or

	chb at conventiont|~/source/go/bin
	> ./lxc exec xen1 -- bash
	root at xen1:~# yes &
	[1] 290
	root at xen1:~# y
	root at xen1:~# y
	root at xen1:~# y
	.
	.
	.

and still cleanly exit via "Ctrl+D" or "exit". The function is explained in
detail directly in the code.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 shared/util_linux.go | 159 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 159 insertions(+)

diff --git a/shared/util_linux.go b/shared/util_linux.go
index 0a22091..e052a66 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -6,9 +6,11 @@ package shared
 import (
 	"errors"
 	"fmt"
+	"io"
 	"os"
 	"os/exec"
 	"strings"
+	"sync"
 	"syscall"
 	"unsafe"
 )
@@ -511,3 +513,199 @@ func GetAllXattr(path string) (xattrs map[string]string, err error) {
 
 	return xattrs, nil
 }
+
+// ExecReaderToChannel reads from r, the master side of the pty referred to by
+// fd, and forwards the data over the returned channel. The channel is closed
+// once the attached child has exited and no more output is pending, even if
+// background processes still hold the pty open.
+//
+// Output is accumulated in buffers of bufferSize bytes; values smaller than
+// 128 KiB (e.g. -1) are raised to 128 KiB. Both goroutines started here
+// receive from exited, so the caller must deliver two values on it (e.g. via
+// a channel with a buffer of 2) once the attached child has exited.
+//
+// Extensively commented directly in the code. Please leave the comments!
+// Looking at this in a couple of months no one will know why and how this
+// works anymore.
+func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan bool, fd int) <-chan []byte {
+	if bufferSize <= (128 * 1024) {
+		bufferSize = (128 * 1024)
+	}
+
+	mutex := &sync.Mutex{}
+	chanClosed := false
+	ch := make(chan ([]byte))
+
+	// The mutex guards chanClosed and ch: the exit watcher below may close
+	// ch while the reader goroutine is still running. Sending on a closed
+	// channel panics, so closing and sending both happen under the mutex
+	// and data is dropped once the channel has been closed.
+	onReturn := func() {
+		mutex.Lock()
+		defer mutex.Unlock()
+		if !chanClosed {
+			chanClosed = true
+			close(ch)
+		}
+	}
+
+	// send hands data to the consumer unless ch has already been closed and
+	// reports whether it did so.
+	send := func(data []byte) bool {
+		mutex.Lock()
+		defer mutex.Unlock()
+		if chanClosed {
+			return false
+		}
+		ch <- data
+		return true
+	}
+
+	// COMMENT(brauner):
+	// This function has just one job: Dealing with the case where we are
+	// running an interactive shell session where we put a process in the
+	// background that does hold stdin/stdout open, but does not generate
+	// any output at all. This case cannot be dealt with in the following
+	// function call. Here's why: Assume the above case, now the attached
+	// child (the shell in this example) exits. This will not generate any
+	// poll() event: We won't get POLLHUP because the background process is
+	// holding stdin/stdout open and no one is writing to it. So we
+	// effectively block on GetPollRevents() in the reader goroutine below.
+	// This means, we will never reach the <-exited check where we could
+	// fire-off another zero timeout GetPollRevents() to detect that no one
+	// has written to stdin/stdout and simply exit. Hence, we use another
+	// goroutine here whose only job is to handle that case: When we detect
+	// that the child has exited we check whether a POLLIN or POLLHUP event
+	// has been generated. If not, we know that there's nothing buffered on
+	// stdout and exit.
+	go func() {
+		<-exited
+		// Zero timeout: only check whether anything is pending right now.
+		ret, _ := GetPollRevents(fd, 0, (POLLIN | POLLHUP))
+		switch {
+		case ret < 0:
+			LogErrorf("Failed to poll(POLLIN | POLLHUP) on file descriptor: exiting.")
+		case ret == 0:
+			LogDebugf("No more data in stdout: exiting.")
+			onReturn()
+		}
+	}()
+
+	go func() {
+		readSize := (128 * 1024)
+		offset := 0
+		buf := make([]byte, bufferSize)
+		attachedChildIsDead := false
+
+		defer onReturn()
+		for {
+			select {
+			case <-exited:
+				attachedChildIsDead = true
+				continue
+			default:
+			}
+
+			nr := 0
+			var err error
+
+			ret, revents := GetPollRevents(fd, -1, (POLLIN | POLLPRI | POLLERR | POLLHUP))
+			if ret < 0 {
+				LogErrorf("Failed to poll(POLLIN | POLLPRI | POLLERR | POLLHUP) on file descriptor: exiting.")
+				return
+			}
+
+			// poll() always reports POLLNVAL when fd is not an open file
+			// descriptor (e.g. once it has been closed). None of the checks
+			// below handle it, so without this the loop would spin forever.
+			if (revents & POLLNVAL) > 0 {
+				LogWarnf("Detected poll(POLLNVAL) event: exiting.")
+				return
+			}
+
+			// If the process exits before all its data has been read by us and no other
+			// process holds stdin or stdout open, then we will observe a
+			// (POLLHUP | POLLIN) event. This means, we need to keep on reading from the
+			// pty file descriptor until we get a simple POLLHUP back.
+			both := ((revents & (POLLHUP | POLLIN)) == (POLLHUP | POLLIN))
+			if both {
+				LogDebugf("Detected poll(POLLHUP | POLLIN) event.")
+				read := buf[offset : offset+readSize]
+				nr, err = r.Read(read)
+			}
+
+			if (revents & POLLERR) > 0 {
+				LogWarnf("Detected poll(POLLERR) event: exiting.")
+				return
+			}
+
+			if ((revents & POLLIN) > 0) && !both {
+				// COMMENT(brauner):
+				// This might appear unintuitive at first but is actually a
+				// nice trick: Assume we are running a shell session in a
+				// container and put a process in the background that is
+				// writing to stdout. Now assume the attached process (aka the
+				// shell in this example) exits because we used Ctrl+D to
+				// send EOF or something. If no other process would be
+				// holding stdout open we would expect to observe either a
+				// (POLLHUP | POLLIN) event if there is still data buffered
+				// from the previous process or a simple POLLHUP if no data
+				// is buffered. The fact that we only observe a POLLIN event
+				// means that another process is holding stdout open and is
+				// writing to it.
+				// One counter argument that can be leveraged is
+				// (brauner looks at tycho :)) "Hey, you need to write at
+				// least one additional tty buffer to make sure that
+				// everything that the attached child has written is actually
+				// shown." The answer to that is:
+				// "I'd argue that this codepath is only hit when we are
+				// running an interactive session and the shell will
+				// run commands consecutively since only one command can be
+				// actively in the foreground.  So Ctrl+D can't cause a race
+				// since it will only be interpreted after the other
+				// command has stopped. This of course is different, if the
+				// attached child process exits before all output has been
+				// read by us but this codepath would trigger a
+				// (POLLHUP | POLLIN) event (See argument above) and is hence dealt
+				// with in another code path. So if I'm right this logic is
+				// not affected by either delay or frequency of writing to
+				// stdout.
+				if attachedChildIsDead {
+					LogDebugf("Exiting but background processes are still running.")
+					return
+				}
+				read := buf[offset : offset+readSize]
+				nr, err = r.Read(read)
+			}
+
+			// COMMENT(brauner):
+			// The attached process has exited and we have read all data that may have
+			// been buffered.
+			if ((revents & POLLHUP) > 0) && !both {
+				LogDebugf("Detected poll(POLLHUP) event: exiting.")
+				return
+			}
+
+			offset += nr
+			if offset > 0 && (offset+readSize >= bufferSize || err != nil) {
+				if !send(buf[0:offset]) {
+					LogDebugf("Output channel already closed: exiting.")
+					return
+				}
+				offset = 0
+				buf = make([]byte, bufferSize)
+			}
+
+			// A failed read on the pty master is not recoverable (presumably
+			// EIO once the other side has gone away). Everything read so far
+			// has been flushed above, so stop instead of polling the
+			// descriptor again, which could keep reporting unconsumed events.
+			if err != nil {
+				LogDebugf("Failed to read from file descriptor: %s: exiting.", err)
+				return
+			}
+		}
+	}()
+
+	return ch
+}

From 6586dcee808eb0afd9e4aa116cef02cecd37881c Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 8 Dec 2016 14:46:10 +0100
Subject: [PATCH 3/4] shared/network: add WebsocketExecMirror()

This function specifically deals with running commands attached to a pty in the
container.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 lxd/container_exec.go |   7 ++-
 lxd/rsync.go          |   4 +-
 shared/network.go     | 130 ++++++++++++++++++++++++++++++++++++--------------
 3 files changed, 103 insertions(+), 38 deletions(-)

diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index a070d3a..be30f51 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -141,6 +141,7 @@ func (s *execWs) Do(op *operation) error {
 
 	controlExit := make(chan bool)
 	receivePid := make(chan int)
+	attachedChildIsDead := make(chan bool, 1)
 	var wgEOF sync.WaitGroup
 
 	if s.interactive {
@@ -206,13 +207,15 @@ func (s *execWs) Do(op *operation) error {
 				}
 			}
 		}()
+
 		go func() {
-			readDone, writeDone := shared.WebsocketMirror(s.conns[0], ptys[0], ptys[0])
+			readDone, writeDone := shared.WebsocketExecMirror(s.conns[0], ptys[0], ptys[0], attachedChildIsDead, int(ptys[0].Fd()))
 			<-readDone
 			<-writeDone
 			s.conns[0].Close()
 			wgEOF.Done()
 		}()
+
 	} else {
 		wgEOF.Add(len(ttys) - 1)
 		for i := 0; i < len(ttys); i++ {
@@ -242,6 +245,8 @@ func (s *execWs) Do(op *operation) error {
 			s.conns[-1].Close()
 		}
 
+		attachedChildIsDead <- true
+
 		wgEOF.Wait()
 
 		for _, pty := range ptys {
diff --git a/lxd/rsync.go b/lxd/rsync.go
index 407fbcf..5be1354 100644
--- a/lxd/rsync.go
+++ b/lxd/rsync.go
@@ -102,7 +102,7 @@ func RsyncSend(path string, conn *websocket.Conn, readWrapper func(io.ReadCloser
 		readPipe = readWrapper(dataSocket)
 	}
 
-	readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, readPipe)
+	readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, readPipe, nil, nil)
 
 	output, err := ioutil.ReadAll(stderr)
 	if err != nil {
@@ -157,7 +157,7 @@ func RsyncRecv(path string, conn *websocket.Conn, writeWrapper func(io.WriteClos
 		writePipe = writeWrapper(stdin)
 	}
 
-	readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout)
+	readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout, nil, nil)
 	data, err2 := ioutil.ReadAll(stderr)
 	if err2 != nil {
 		shared.LogDebugf("error reading rsync stderr: %s", err2)
diff --git a/shared/network.go b/shared/network.go
index ebc79c9..705e88f 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -227,57 +227,137 @@ func WebsocketRecvStream(w io.Writer, conn *websocket.Conn) chan bool {
 	return ch
 }
 
+// defaultReader is the default WebSocketMirrorReader. It forwards the data
+// read from r to conn as binary messages. When the channel returned by
+// ReaderToChannel(r, -1) is closed it closes r, sends an empty text message
+// as a write barrier and signals readDone; if getting a writer for or writing
+// to conn fails it sends a close message instead, signals readDone, closes r.
+func defaultReader(conn *websocket.Conn, r io.ReadCloser, readDone chan<- bool) {
+	// For now, we don't need to adjust buffer sizes in WebsocketMirror, since
+	// it's used for interactive things like exec.
+	in := ReaderToChannel(r, -1)
+	for {
+		buf, ok := <-in
+		if !ok {
+			r.Close()
+			LogDebugf("sending write barrier")
+			conn.WriteMessage(websocket.TextMessage, []byte{})
+			readDone <- true
+			return
+		}
+		w, err := conn.NextWriter(websocket.BinaryMessage)
+		if err != nil {
+			LogDebugf("Got error getting next writer %s", err)
+			break
+		}
+
+		_, err = w.Write(buf)
+		w.Close()
+		if err != nil {
+			LogDebugf("Got err writing %s", err)
+			break
+		}
+	}
+	closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+	conn.WriteMessage(websocket.CloseMessage, closeMsg)
+	readDone <- true
+	r.Close()
+}
+
+// defaultWriter is the default WebSocketMirrorWriter. It writes the payload
+// of each message received on conn to w until reading from conn fails, a
+// close or text message (the stream barrier) arrives, or writing to w
+// fails. It then signals writeDone and closes w.
+func defaultWriter(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- bool) {
+	for {
+		mt, r, err := conn.NextReader()
+		if err != nil {
+			LogDebugf("Got error getting next reader %s, %s", err, w)
+			break
+		}
+
+		if mt == websocket.CloseMessage {
+			LogDebugf("Got close message for reader")
+			break
+		}
+
+		if mt == websocket.TextMessage {
+			LogDebugf("Got message barrier, resetting stream")
+			break
+		}
+
+		buf, err := ioutil.ReadAll(r)
+		if err != nil {
+			LogDebugf("Got error reading from websocket %s", err)
+			break
+		}
+
+		// Check err before the length: io.Writer requires an error for a
+		// short write, and that error is more useful to log.
+		i, err := w.Write(buf)
+		if err != nil {
+			LogDebugf("Error writing buf %s", err)
+			break
+		}
+		if i != len(buf) {
+			LogDebugf("Didn't write all of buf")
+			break
+		}
+	}
+	writeDone <- true
+	w.Close()
+}
+
+// WebSocketMirrorReader is run by WebsocketMirror to mirror r onto conn. It
+// must signal readDone once it is finished.
+type WebSocketMirrorReader func(conn *websocket.Conn, r io.ReadCloser, readDone chan<- bool)
+
+// WebSocketMirrorWriter is run by WebsocketMirror to mirror conn onto w. It
+// must signal writeDone once it is finished.
+type WebSocketMirrorWriter func(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- bool)
+
 // WebsocketMirror allows mirroring a reader to a websocket and taking the
 // result and writing it to a writer. This function allows for multiple
 // mirrorings and correctly negotiates stream endings. However, it means any
 // websocket.Conns passed to it are live when it returns, and must be closed
 // explicitly.
+//
+// A nil reader or writer selects defaultReader or defaultWriter respectively.
-func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser) (chan bool, chan bool) {
+func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, reader WebSocketMirrorReader, writer WebSocketMirrorWriter) (chan bool, chan bool) {
 	readDone := make(chan bool, 1)
 	writeDone := make(chan bool, 1)
-	go func(conn *websocket.Conn, w io.WriteCloser) {
-		for {
-			mt, r, err := conn.NextReader()
-			if err != nil {
-				LogDebugf("Got error getting next reader %s, %s", err, w)
-				break
-			}
 
-			if mt == websocket.CloseMessage {
-				LogDebugf("Got close message for reader")
-				break
-			}
+	if reader == nil {
+		reader = defaultReader
+	}
 
-			if mt == websocket.TextMessage {
-				LogDebugf("Got message barrier, resetting stream")
-				break
-			}
+	if writer == nil {
+		writer = defaultWriter
+	}
 
-			buf, err := ioutil.ReadAll(r)
-			if err != nil {
-				LogDebugf("Got error writing to writer %s", err)
-				break
-			}
-			i, err := w.Write(buf)
-			if i != len(buf) {
-				LogDebugf("Didn't write all of buf")
-				break
-			}
-			if err != nil {
-				LogDebugf("Error writing buf %s", err)
-				break
-			}
-		}
-		writeDone <- true
-		w.Close()
-	}(conn, w)
+	go reader(conn, r, readDone)
+	go writer(conn, w, writeDone)
+
+	return readDone, writeDone
+}
+
+// WebsocketExecMirror is a variant of WebsocketMirror for commands attached
+// to a pty: it writes with defaultWriter and reads r through
+// ExecReaderToChannel(), so the stream ends once the attached child has
+// exited (signalled on exited) even if background processes keep the pty
+// referred to by fd open.
+func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, exited chan bool, fd int) (chan bool, chan bool) {
+	readDone := make(chan bool, 1)
+	writeDone := make(chan bool, 1)
+
+	go defaultWriter(conn, w, writeDone)
 
 	go func(conn *websocket.Conn, r io.ReadCloser) {
 		/* For now, we don't need to adjust buffer sizes in
 		 * WebsocketMirror, since it's used for interactive things like
 		 * exec.
 		 */
-		in := ReaderToChannel(r, -1)
+		in := ExecReaderToChannel(r, -1, exited, fd)
 		for {
 			buf, ok := <-in
 			if !ok {

From 6b2b02592f2054c64b99f5efbccdcd615c5987d9 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Sat, 10 Dec 2016 12:36:20 +0100
Subject: [PATCH 4/4] lxd/container_exec: improve variable naming

I know, the names are long and ugly and make us look like terrible baby-hating
humans but they should make what is happening clearer.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 lxd/container_exec.go | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index be30f51..66f4b9f 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -140,14 +140,14 @@ func (s *execWs) Do(op *operation) error {
 	}
 
 	controlExit := make(chan bool)
-	receivePid := make(chan int)
-	attachedChildIsDead := make(chan bool, 1)
+	attachedChildIsBorn := make(chan int)
+	attachedChildIsDead := make(chan bool, 2)
 	var wgEOF sync.WaitGroup
 
 	if s.interactive {
 		wgEOF.Add(1)
 		go func() {
-			receivedPid := <-receivePid
+			receivedPid := <-attachedChildIsBorn
 			select {
 			case <-s.controlConnected:
 				break
@@ -245,6 +245,10 @@ func (s *execWs) Do(op *operation) error {
 			s.conns[-1].Close()
 		}
 
+		// COMMENT(brauner):
+		// Sending this value two times is intentional and needed in
+		// ExecReaderToChannel(). Do not remove!
+		attachedChildIsDead <- true
 		attachedChildIsDead <- true
 
 		wgEOF.Wait()
@@ -268,7 +272,7 @@ func (s *execWs) Do(op *operation) error {
 	}
 
 	if s.interactive {
-		receivePid <- attachedPid
+		attachedChildIsBorn <- attachedPid
 	}
 
 	proc, err := os.FindProcess(pid)


More information about the lxc-devel mailing list