[lxc-devel] [lxd/master] exec: detect background tasks to allow clean exit
brauner on Github
lxc-bot at linuxcontainers.org
Sat Dec 10 17:14:00 UTC 2016
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 969 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20161210/92cbf147/attachment.bin>
-------------- next part --------------
From ecb6b48b4ea0b50bd2db8ade8b6fcc32c59e9a9b Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 8 Dec 2016 14:34:50 +0100
Subject: [PATCH 1/4] shared/util_linux: add GetPollRevents()
It's a wrapper around a C function that polls on a single file descriptor and
returns the status and the detected revents.
Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
shared/util_linux.go | 29 +++++++++++++++++++++++++++++
1 file changed, 29 insertions(+)
diff --git a/shared/util_linux.go b/shared/util_linux.go
index 51e0e43..0a22091 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -25,6 +25,7 @@ import (
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
+#include <poll.h>
#include <string.h>
#include <stdio.h>
@@ -181,9 +182,37 @@ int shiftowner(char *basepath, char *path, int uid, int gid) {
close(fd);
return 0;
}
+
+// get_poll_revents() polls the single file descriptor lfd for the events in
+// flags, passing timeout (in milliseconds) straight to poll(). On success the
+// events reported for lfd are stored in *revents and poll()'s return value is
+// returned; on failure -1 is returned and *revents is left untouched.
+// Note: only line comments may be used in here, since this code lives inside
+// the Go comment that forms the cgo preamble.
+int get_poll_revents(int lfd, int timeout, int flags, int *revents)
+{
+	int ret;
+	struct pollfd pfd = {lfd, flags, 0};
+
+	// poll() fails with EINTR when a signal handler runs while it waits and
+	// is not restarted automatically. That is not a real failure, so retry.
+	// For a positive timeout the wait starts over, so the total wait may
+	// exceed the requested timeout.
+	do {
+		ret = poll(&pfd, 1, timeout);
+	} while (ret < 0 && errno == EINTR);
+
+	if (ret < 0) {
+		fprintf(stderr, "Failed to poll() on file descriptor.\n");
+		return -1;
+	}
+
+	*revents = pfd.revents;
+
+	return ret;
+}
*/
import "C"
+// Poll event flags from <poll.h>, exported so callers can build the flags
+// argument of GetPollRevents() and inspect the revents it returns.
+const (
+	POLLIN   int = C.POLLIN
+	POLLPRI  int = C.POLLPRI
+	POLLNVAL int = C.POLLNVAL
+	POLLERR  int = C.POLLERR
+	POLLHUP  int = C.POLLHUP
+)
+
+func GetPollRevents(fd int, timeout int, flags int) (int, int) {
+ revents := C.int(0)
+ ret := C.get_poll_revents(C.int(fd), C.int(timeout), C.int(flags), &revents)
+ return int(ret), int(revents)
+}
+
func ShiftOwner(basepath string, path string, uid int, gid int) error {
cbasepath := C.CString(basepath)
defer C.free(unsafe.Pointer(cbasepath))
From 538b5606dfcafe73ac9f741259a67f1b5adb4f0d Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Sat, 10 Dec 2016 12:41:13 +0100
Subject: [PATCH 2/4] exec: detect background tasks to allow clean exit
This adds the function ExecReaderToChannel() to our Linux-specific utility
functions. It is the workhorse that reads from the master side of a pty file
descriptor. Background tasks are identified correctly. That means you can run:
chb at conventiont|~/source/go/bin
> ./lxc exec xen1 -- bash
root at xen1:~# sleep infinity &
or
chb at conventiont|~/source/go/bin
> ./lxc exec xen1 -- bash
root at xen1:~# yes &
[1] 290
root at xen1:~# y
root at xen1:~# y
root at xen1:~# y
.
.
.
and still cleanly exit via "Ctrl+D" or "exit". The function is explained in
detail directly in the code.
Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
shared/util_linux.go | 159 +++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 159 insertions(+)
diff --git a/shared/util_linux.go b/shared/util_linux.go
index 0a22091..e052a66 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -6,9 +6,11 @@ package shared
import (
"errors"
"fmt"
+ "io"
"os"
"os/exec"
"strings"
+ "sync"
"syscall"
"unsafe"
)
@@ -511,3 +513,160 @@ func GetAllXattr(path string) (xattrs map[string]string, err error) {
return xattrs, nil
}
+
+// ExecReaderToChannel reads the output of an exec'd command from r, the master
+// side of a pty whose file descriptor is fd, and delivers it in chunks on the
+// returned channel. bufferSize values up to 128KiB are raised to 128KiB. The
+// channel is closed when the pty hangs up, an error occurs, or, once the
+// attached child has died, no more output is pending or only background
+// processes are left writing.
+//
+// Two goroutines each receive once from exited, so the caller must send two
+// values on it when the attached child exits.
+//
+// Extensively commented directly in the code. Please leave the comments!
+// Looking at this in a couple of months no one will know why and how this
+// works anymore.
+func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan bool, fd int) <-chan []byte {
+	if bufferSize <= (128 * 1024) {
+		bufferSize = (128 * 1024)
+	}
+
+	mutex := &sync.Mutex{}
+	chanClosed := false
+	ch := make(chan ([]byte))
+
+	// NOTE: The mutex is required. The watcher goroutine below may close ch
+	// while the reader goroutine is blocked in poll(). If a background process
+	// then writes, the reader goroutine wakes up and would send on, or close,
+	// a closed channel. Both panic. So chanClosed is only read or written
+	// with the mutex held, and sends go through send(), which checks it.
+	onReturn := func() {
+		mutex.Lock()
+		defer mutex.Unlock()
+		if !chanClosed {
+			chanClosed = true
+			close(ch)
+		}
+	}
+
+	// send delivers data on ch unless ch has already been closed, and
+	// reports whether it did. The mutex is held across the send so that ch
+	// cannot be closed underneath a blocked sender.
+	send := func(data []byte) bool {
+		mutex.Lock()
+		defer mutex.Unlock()
+		if chanClosed {
+			return false
+		}
+		ch <- data
+		return true
+	}
+
+	// COMMENT(brauner):
+	// This function has just one job: Dealing with the case where we are
+	// running an interactive shell session where we put a process in the
+	// background that does hold stdin/stdout open, but does not generate
+	// any output at all. This case cannot be dealt with in the following
+	// function call. Here's why: Assume the above case, now the attached
+	// child (the shell in this example) exits. This will not generate any
+	// poll() event: We won't get POLLHUP because the background process is
+	// holding stdin/stdout open and no one is writing to it. So we
+	// effectively block on GetPollRevents() in the function below. This
+	// means, we will never reach the <-exited case where we could fire-off
+	// another zero timeout GetPollRevents() to detect that no one has
+	// written to stdin/stdout and simply exit. Hence, we use another go
+	// routine here whose only job is to handle that case: When we detect
+	// that the child has exited we check whether a POLLIN or POLLHUP event
+	// has been generated. If not, we know that there's nothing buffered on
+	// stdout and exit.
+	go func() {
+		<-exited
+		// Zero timeout: only look at what is pending right now.
+		ret, _ := GetPollRevents(fd, 0, (POLLIN | POLLHUP))
+		if ret < 0 {
+			LogErrorf("Failed to poll(POLLIN | POLLHUP) on file descriptor: exiting.")
+		} else if ret == 0 {
+			LogDebugf("No more data in stdout: exiting.")
+			onReturn()
+		}
+	}()
+
+	go func() {
+		readSize := (128 * 1024)
+		offset := 0
+		buf := make([]byte, bufferSize)
+		attachedChildIsDead := false
+
+		// readChunk reads at most readSize bytes into the free tail of buf.
+		readChunk := func() error {
+			nr, err := r.Read(buf[offset : offset+readSize])
+			offset += nr
+			return err
+		}
+
+		// flush hands everything buffered so far to the consumer and starts
+		// a fresh buffer. It returns false if ch has already been closed.
+		flush := func() bool {
+			if offset == 0 {
+				return true
+			}
+			if !send(buf[0:offset]) {
+				return false
+			}
+			offset = 0
+			buf = make([]byte, bufferSize)
+			return true
+		}
+
+		// However we leave the loop, first hand over data that was already
+		// read from the pty, then close ch. onReturn() is a no-op if the
+		// watcher goroutine already closed it.
+		defer func() {
+			flush()
+			onReturn()
+		}()
+
+		for {
+			select {
+			case <-exited:
+				attachedChildIsDead = true
+				continue
+			default:
+			}
+
+			ret, revents := GetPollRevents(fd, -1, (POLLIN | POLLPRI | POLLERR | POLLHUP))
+			if ret < 0 {
+				LogErrorf("Failed to poll(POLLIN | POLLPRI | POLLERR | POLLHUP) on file descriptor: exiting.")
+				return
+			}
+
+			// POLLNVAL means fd is not an open file descriptor. poll()
+			// reports it right away every time, so without this check none of
+			// the branches below fire and the loop spins forever.
+			if (revents & POLLNVAL) > 0 {
+				LogWarnf("Detected poll(POLLNVAL) event: exiting.")
+				return
+			}
+
+			var err error
+
+			// If the process exits before all its data has been read by us and no other
+			// process holds stdin or stdout open, then we will observe a
+			// (POLLHUP | POLLIN) event. This means, we need to keep on reading from the
+			// pty file descriptor until we get a simple POLLHUP back.
+			both := ((revents & (POLLHUP | POLLIN)) == (POLLHUP | POLLIN))
+			if both {
+				LogDebugf("Detected poll(POLLHUP | POLLIN) event.")
+				err = readChunk()
+			}
+
+			if (revents & POLLERR) > 0 {
+				LogWarnf("Detected poll(POLLERR) event: exiting.")
+				return
+			}
+
+			if ((revents & POLLIN) > 0) && !both {
+				// COMMENT(brauner):
+				// This might appear unintuitive at first but is actually a
+				// nice trick: Assume we are running a shell session in a
+				// container and put a process in the background that is
+				// writing to stdout. Now assume the attached process (aka the
+				// shell in this example) exits because we used Ctrl+D to
+				// send EOF or something. If no other process would be
+				// holding stdout open we would expect to observe either a
+				// (POLLHUP | POLLIN) event if there is still data buffered
+				// from the previous process or a simple POLLHUP if no data
+				// is buffered. The fact that we only observe a POLLIN event
+				// means that another process is holding stdout open and is
+				// writing to it.
+				// One counter argument that can be leveraged is
+				// (brauner looks at tycho :)) "Hey, you need to write at
+				// least one additional tty buffer to make sure that
+				// everything that the attached child has written is actually
+				// shown." The answer to that is:
+				// "I'd argue that this codepath is only hit when we are
+				// running an interactive session and the shell will
+				// run commands consecutively since only one command can be
+				// actively in the foreground. So Ctrl+D can't cause a race
+				// since it will only be interpreted after the other
+				// command has stopped. This of course is different, if the
+				// attached child process exits before all output has been
+				// read by us but this codepath would trigger a
+				// (POLLHUP | POLLIN) event (See argument above) and is hence dealt
+				// with in another code path. So if I'm right this logic is
+				// not affected by either delay or frequency of writing to
+				// stdout.
+				if attachedChildIsDead {
+					LogDebugf("Exiting but background processes are still running.")
+					return
+				}
+				err = readChunk()
+			}
+
+			// COMMENT(brauner):
+			// The attached process has exited and we have read all data that may have
+			// been buffered.
+			if ((revents & POLLHUP) > 0) && !both {
+				LogDebugf("Detected poll(POLLHUP) event: exiting.")
+				return
+			}
+
+			// A failed read is not retried. Without this check the loop
+			// could alternate between poll() and a failing Read() forever.
+			// Data read before the error is flushed by the deferred handler.
+			if err != nil {
+				LogDebugf("Failed to read from pty: %s: exiting.", err)
+				return
+			}
+
+			if offset > 0 && offset+readSize >= bufferSize {
+				if !flush() {
+					return
+				}
+			}
+		}
+	}()
+
+	return ch
+}
From 6586dcee808eb0afd9e4aa116cef02cecd37881c Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 8 Dec 2016 14:46:10 +0100
Subject: [PATCH 3/4] shared/network: add WebsocketExecMirror()
This function specifically deals with running commands attached to a pty in the
container.
Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
lxd/container_exec.go | 7 ++-
lxd/rsync.go | 4 +-
shared/network.go | 130 ++++++++++++++++++++++++++++++++++++--------------
3 files changed, 103 insertions(+), 38 deletions(-)
diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index a070d3a..be30f51 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -141,6 +141,7 @@ func (s *execWs) Do(op *operation) error {
controlExit := make(chan bool)
receivePid := make(chan int)
+ attachedChildIsDead := make(chan bool, 1)
var wgEOF sync.WaitGroup
if s.interactive {
@@ -206,13 +207,15 @@ func (s *execWs) Do(op *operation) error {
}
}
}()
+
go func() {
- readDone, writeDone := shared.WebsocketMirror(s.conns[0], ptys[0], ptys[0])
+ readDone, writeDone := shared.WebsocketExecMirror(s.conns[0], ptys[0], ptys[0], attachedChildIsDead, int(ptys[0].Fd()))
<-readDone
<-writeDone
s.conns[0].Close()
wgEOF.Done()
}()
+
} else {
wgEOF.Add(len(ttys) - 1)
for i := 0; i < len(ttys); i++ {
@@ -242,6 +245,8 @@ func (s *execWs) Do(op *operation) error {
s.conns[-1].Close()
}
+ attachedChildIsDead <- true
+
wgEOF.Wait()
for _, pty := range ptys {
diff --git a/lxd/rsync.go b/lxd/rsync.go
index 407fbcf..5be1354 100644
--- a/lxd/rsync.go
+++ b/lxd/rsync.go
@@ -102,7 +102,7 @@ func RsyncSend(path string, conn *websocket.Conn, readWrapper func(io.ReadCloser
readPipe = readWrapper(dataSocket)
}
- readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, readPipe)
+ readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, readPipe, nil, nil)
output, err := ioutil.ReadAll(stderr)
if err != nil {
@@ -157,7 +157,7 @@ func RsyncRecv(path string, conn *websocket.Conn, writeWrapper func(io.WriteClos
writePipe = writeWrapper(stdin)
}
- readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout)
+ readDone, writeDone := shared.WebsocketMirror(conn, writePipe, stdout, nil, nil)
data, err2 := ioutil.ReadAll(stderr)
if err2 != nil {
shared.LogDebugf("error reading rsync stderr: %s", err2)
diff --git a/shared/network.go b/shared/network.go
index ebc79c9..705e88f 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -227,57 +227,117 @@ func WebsocketRecvStream(w io.Writer, conn *websocket.Conn) chan bool {
return ch
}
+// defaultReader is the WebSocketMirrorReader that WebsocketMirror uses when
+// none is given. It forwards everything read from r to conn as binary
+// messages. When ReaderToChannel closes its channel, it closes r, sends an
+// empty text message as a write barrier and signals readDone. If writing to
+// conn fails, it sends a normal-closure close message, signals readDone and
+// closes r.
+func defaultReader(conn *websocket.Conn, r io.ReadCloser, readDone chan<- bool) {
+	// giveUp tells the peer that we are closing normally after a failed write.
+	giveUp := func() {
+		closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+		conn.WriteMessage(websocket.CloseMessage, closeMsg)
+		readDone <- true
+		r.Close()
+	}
+
+	// For now, we don't need to adjust buffer sizes in WebsocketMirror, since
+	// it's used for interactive things like exec.
+	for chunk := range ReaderToChannel(r, -1) {
+		msgWriter, err := conn.NextWriter(websocket.BinaryMessage)
+		if err != nil {
+			LogDebugf("Got error getting next writer %s", err)
+			giveUp()
+			return
+		}
+
+		_, err = msgWriter.Write(chunk)
+		msgWriter.Close()
+		if err != nil {
+			LogDebugf("Got err writing %s", err)
+			giveUp()
+			return
+		}
+	}
+
+	// ReaderToChannel closed its channel: reading from r has finished.
+	r.Close()
+	LogDebugf("sending write barrier")
+	conn.WriteMessage(websocket.TextMessage, []byte{})
+	readDone <- true
+}
+
+// defaultWriter is the WebSocketMirrorWriter that WebsocketMirror uses when
+// none is given. It writes the payload of every binary message received on
+// conn to w. It stops on any of these:
+//   - a reader error or a close message;
+//   - a text message, which defaultReader sends as a stream barrier;
+//   - a failed or short write.
+// It then signals writeDone and closes w.
+func defaultWriter(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- bool) {
+	for {
+		mt, r, err := conn.NextReader()
+		if err != nil {
+			LogDebugf("Got error getting next reader %s, %s", err, w)
+			break
+		}
+
+		if mt == websocket.CloseMessage {
+			LogDebugf("Got close message for reader")
+			break
+		}
+
+		if mt == websocket.TextMessage {
+			LogDebugf("Got message barrier, resetting stream")
+			break
+		}
+
+		buf, err := ioutil.ReadAll(r)
+		if err != nil {
+			// This is a failure reading the websocket message, not writing w.
+			LogDebugf("Got error reading from websocket %s", err)
+			break
+		}
+
+		// Check err before the byte count. io.Writer must return a non-nil
+		// error for any short write, and that error is what should be logged.
+		i, err := w.Write(buf)
+		if err != nil {
+			LogDebugf("Error writing buf %s", err)
+			break
+		}
+		if i != len(buf) {
+			LogDebugf("Didn't write all of buf")
+			break
+		}
+	}
+	writeDone <- true
+	w.Close()
+}
+
// WebsocketMirror allows mirroring a reader to a websocket and taking the
// result and writing it to a writer. This function allows for multiple
// mirrorings and correctly negotiates stream endings. However, it means any
// websocket.Conns passed to it are live when it returns, and must be closed
// explicitly.
-func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser) (chan bool, chan bool) {
+
+// WebSocketMirrorReader is the signature of the function WebsocketMirror runs
+// to move data from r into conn. It is expected to send on readDone once it
+// has finished, as defaultReader does.
+type WebSocketMirrorReader func(conn *websocket.Conn, r io.ReadCloser, readDone chan<- bool)
+// WebSocketMirrorWriter is the signature of the function WebsocketMirror runs
+// to move data from conn into w. It is expected to send on writeDone once it
+// has finished, as defaultWriter does.
+type WebSocketMirrorWriter func(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- bool)
+
+// WebsocketMirror runs Reader (defaultReader if nil) and Writer (defaultWriter
+// if nil) in their own goroutines. It returns the buffered channels they
+// signal on when they finish. conn is still live when this returns and must
+// be closed explicitly by the caller.
+func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, Reader WebSocketMirrorReader, Writer WebSocketMirrorWriter) (chan bool, chan bool) {
readDone := make(chan bool, 1)
writeDone := make(chan bool, 1)
- go func(conn *websocket.Conn, w io.WriteCloser) {
- for {
- mt, r, err := conn.NextReader()
- if err != nil {
- LogDebugf("Got error getting next reader %s, %s", err, w)
- break
- }
- if mt == websocket.CloseMessage {
- LogDebugf("Got close message for reader")
- break
- }
+ // A nil Reader or Writer selects the default implementation.
+ ReadFunc := Reader
+ if ReadFunc == nil {
+ ReadFunc = defaultReader
+ }
- if mt == websocket.TextMessage {
- LogDebugf("Got message barrier, resetting stream")
- break
- }
+ WriteFunc := Writer
+ if WriteFunc == nil {
+ WriteFunc = defaultWriter
+ }
- buf, err := ioutil.ReadAll(r)
- if err != nil {
- LogDebugf("Got error writing to writer %s", err)
- break
- }
- i, err := w.Write(buf)
- if i != len(buf) {
- LogDebugf("Didn't write all of buf")
- break
- }
- if err != nil {
- LogDebugf("Error writing buf %s", err)
- break
- }
- }
- writeDone <- true
- w.Close()
- }(conn, w)
+ // Both directions run concurrently; callers wait on the returned channels.
+ go ReadFunc(conn, r, readDone)
+ go WriteFunc(conn, w, writeDone)
+
+ return readDone, writeDone
+}
+
+func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, exited chan bool, fd int) (chan bool, chan bool) {
+ readDone := make(chan bool, 1)
+ writeDone := make(chan bool, 1)
+
+ go defaultWriter(conn, w, writeDone)
go func(conn *websocket.Conn, r io.ReadCloser) {
/* For now, we don't need to adjust buffer sizes in
* WebsocketMirror, since it's used for interactive things like
* exec.
*/
- in := ReaderToChannel(r, -1)
+ in := ExecReaderToChannel(r, -1, exited, fd)
for {
buf, ok := <-in
if !ok {
From 6b2b02592f2054c64b99f5efbccdcd615c5987d9 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Sat, 10 Dec 2016 12:36:20 +0100
Subject: [PATCH 4/4] lxd/container_exec: improve variable naming
I know, the names are long and ugly and make us look like terrible baby-hating
humans but they should make what is happening clearer.
Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
lxd/container_exec.go | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index be30f51..66f4b9f 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -140,14 +140,14 @@ func (s *execWs) Do(op *operation) error {
}
controlExit := make(chan bool)
- receivePid := make(chan int)
- attachedChildIsDead := make(chan bool, 1)
+ attachedChildIsBorn := make(chan int)
+ attachedChildIsDead := make(chan bool, 2)
var wgEOF sync.WaitGroup
if s.interactive {
wgEOF.Add(1)
go func() {
- receivedPid := <-receivePid
+ receivedPid := <-attachedChildIsBorn
select {
case <-s.controlConnected:
break
@@ -245,6 +245,10 @@ func (s *execWs) Do(op *operation) error {
s.conns[-1].Close()
}
+ // COMMENT(brauner):
+ // Sending this value two times is intentional and needed in
+ // ExecReaderToChannel(). Do not remove!
+ attachedChildIsDead <- true
attachedChildIsDead <- true
wgEOF.Wait()
@@ -268,7 +272,7 @@ func (s *execWs) Do(op *operation) error {
}
if s.interactive {
- receivePid <- attachedPid
+ attachedChildIsBorn <- attachedPid
}
proc, err := os.FindProcess(pid)
More information about the lxc-devel
mailing list