[lxc-devel] [lxd/stable-2.0] 2016 12 16/exec bits for stable 2.0

brauner on Github lxc-bot at linuxcontainers.org
Thu Dec 15 23:13:47 UTC 2016


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 301 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20161215/3061c25e/attachment.bin>
-------------- next part --------------
From 5c90a418798d42bf9417ad6ef237b2725a20a084 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 15 Dec 2016 23:55:23 +0100
Subject: [PATCH 1/5] exec: Exec() return attached PID && take bool arg

Give Exec() an explicit boolean argument "wait" which the user can use to tell
the function whether he wants it to wait on the command or not and return the
attached PID as an additional return value.
It's the callers responsibility to wait on the command. (Note. The PID returned
by Exec() can not be waited upon since it's a child of the lxd forkexec command.
It can however be used to e.g. forward signals.)

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 lxd/container.go      | 15 ++++++++--
 lxd/container_exec.go | 81 ++++++++++++++++++++++++++++++++++++++++-----------
 lxd/container_lxc.go  | 39 ++++++++++++++++++-------
 lxd/main_forkexec.go  | 41 ++++++++++++++++++++++++--
 4 files changed, 145 insertions(+), 31 deletions(-)

diff --git a/lxd/container.go b/lxd/container.go
index 995140a..8d4a9af 100644
--- a/lxd/container.go
+++ b/lxd/container.go
@@ -442,8 +442,19 @@ type container interface {
 	FilePush(srcpath string, dstpath string, uid int, gid int, mode int) error
 	FileRemove(path string) error
 
-	// Command execution
-	Exec(command []string, env map[string]string, stdin *os.File, stdout *os.File, stderr *os.File) (int, error)
+	/* Command execution:
+		 * 1. passing in false for wait
+		 *    - equivalent to calling cmd.Run()
+		 * 2. passing in true for wait
+	         *    - start the command and return its PID in the first return
+	         *      argument and the PID of the attached process in the second
+	         *      argument. It's the callers responsibility to wait on the
+	         *      command. (Note. The returned PID of the attached process can not
+	         *      be waited upon since it's a child of the lxd forkexec command
+	         *      (the PID returned in the first return argument). It can however
+	         *      be used to e.g. forward signals.)
+	*/
+	Exec(command []string, env map[string]string, stdin *os.File, stdout *os.File, stderr *os.File, wait bool) (int, int, error)
 
 	// Status
 	Render() (interface{}, error)
diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index ba7c7aa..0b3e440 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -9,6 +9,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"syscall"
 
 	"github.com/gorilla/mux"
 	"github.com/gorilla/websocket"
@@ -137,11 +138,14 @@ func (s *execWs) Do(op *operation) error {
 	}
 
 	controlExit := make(chan bool)
+	attachedChildIsBorn := make(chan int)
+	attachedChildIsDead := make(chan bool, 1)
 	var wgEOF sync.WaitGroup
 
 	if s.interactive {
 		wgEOF.Add(1)
 		go func() {
+			<-attachedChildIsBorn
 			select {
 			case <-s.controlConnected:
 				break
@@ -195,6 +199,7 @@ func (s *execWs) Do(op *operation) error {
 				}
 			}
 		}()
+
 		go func() {
 			readDone, writeDone := shared.WebsocketMirror(s.conns[0], ptys[0], ptys[0])
 			<-readDone
@@ -202,6 +207,7 @@ func (s *execWs) Do(op *operation) error {
 			s.conns[0].Close()
 			wgEOF.Done()
 		}()
+
 	} else {
 		wgEOF.Add(len(ttys) - 1)
 		for i := 0; i < len(ttys); i++ {
@@ -218,33 +224,74 @@ func (s *execWs) Do(op *operation) error {
 		}
 	}
 
-	cmdResult, cmdErr := s.container.Exec(s.command, s.env, stdin, stdout, stderr)
+	finisher := func(cmdResult int, cmdErr error) error {
+		for _, tty := range ttys {
+			tty.Close()
+		}
 
-	for _, tty := range ttys {
-		tty.Close()
-	}
+		if s.conns[-1] == nil {
+			if s.interactive {
+				controlExit <- true
+			}
+		} else {
+			s.conns[-1].Close()
+		}
+
+		attachedChildIsDead <- true
 
-	if s.conns[-1] == nil {
-		if s.interactive {
-			controlExit <- true
+		wgEOF.Wait()
+
+		for _, pty := range ptys {
+			pty.Close()
 		}
-	} else {
-		s.conns[-1].Close()
-	}
 
-	wgEOF.Wait()
+		metadata := shared.Jmap{"return": cmdResult}
+		err = op.UpdateMetadata(metadata)
+		if err != nil {
+			return err
+		}
 
-	for _, pty := range ptys {
-		pty.Close()
+		return cmdErr
 	}
 
-	metadata := shared.Jmap{"return": cmdResult}
-	err = op.UpdateMetadata(metadata)
+	pid, attachedPid, err := s.container.Exec(s.command, s.env, stdin, stdout, stderr, false)
 	if err != nil {
 		return err
 	}
 
-	return cmdErr
+	if s.interactive {
+		attachedChildIsBorn <- attachedPid
+	}
+
+	proc, err := os.FindProcess(pid)
+	if err != nil {
+		return finisher(-1, fmt.Errorf("Failed finding process: %q", err))
+	}
+
+	procState, err := proc.Wait()
+	if err != nil {
+		return finisher(-1, fmt.Errorf("Failed waiting on process %d: %q", pid, err))
+	}
+
+	if procState.Success() {
+		return finisher(0, nil)
+	}
+
+	status, ok := procState.Sys().(syscall.WaitStatus)
+	if ok {
+		if status.Exited() {
+			return finisher(status.ExitStatus(), nil)
+		}
+		// Backwards compatible behavior. Report success when we exited
+		// due to a signal. Otherwise this may break Jenkins, e.g. when
+		// lxc exec foo reboot receives SIGTERM and status.Exitstats()
+		// would report -1.
+		if status.Signaled() {
+			return finisher(0, nil)
+		}
+	}
+
+	return finisher(-1, nil)
 }
 
 func containerExecPost(d *Daemon, r *http.Request) Response {
@@ -337,7 +384,7 @@ func containerExecPost(d *Daemon, r *http.Request) Response {
 	}
 
 	run := func(op *operation) error {
-		cmdResult, cmdErr := c.Exec(post.Command, env, nil, nil, nil)
+		cmdResult, _, cmdErr := c.Exec(post.Command, env, nil, nil, nil, true)
 		metadata := shared.Jmap{"return": cmdResult}
 		err = op.UpdateMetadata(metadata)
 		if err != nil {
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index a07d900..fcbc20a 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -4007,7 +4007,7 @@ func (c *containerLXC) FileRemove(path string) error {
 	return nil
 }
 
-func (c *containerLXC) Exec(command []string, env map[string]string, stdin *os.File, stdout *os.File, stderr *os.File) (int, error) {
+func (c *containerLXC) Exec(command []string, env map[string]string, stdin *os.File, stdout *os.File, stderr *os.File, wait bool) (int, int, error) {
 	envSlice := []string{}
 
 	for k, v := range env {
@@ -4031,25 +4031,44 @@ func (c *containerLXC) Exec(command []string, env map[string]string, stdin *os.F
 	cmd.Stdout = stdout
 	cmd.Stderr = stderr
 
-	shared.LogInfo("Executing command", log.Ctx{"environment": envSlice, "args": args})
+	r, w, err := shared.Pipe()
+	defer r.Close()
+	if err != nil {
+		shared.LogErrorf("s", err)
+		return -1, -1, err
+	}
 
-	err := cmd.Run()
+	cmd.ExtraFiles = []*os.File{w}
+	err = cmd.Start()
+	if err != nil {
+		w.Close()
+		return -1, -1, err
+	}
+	w.Close()
+	attachedPid := -1
+	if err := json.NewDecoder(r).Decode(&attachedPid); err != nil {
+		shared.LogErrorf("Failed to retrieve PID of executing child process: %s", err)
+		return -1, -1, err
+	}
+
+	// It's the callers responsibility to wait or not wait.
+	if !wait {
+		return cmd.Process.Pid, attachedPid, nil
+	}
+
+	err = cmd.Wait()
 	if err != nil {
 		exitErr, ok := err.(*exec.ExitError)
 		if ok {
 			status, ok := exitErr.Sys().(syscall.WaitStatus)
 			if ok {
-				shared.LogInfo("Executed command", log.Ctx{"environment": envSlice, "args": args, "exit_status": status.ExitStatus()})
-				return status.ExitStatus(), nil
+				return status.ExitStatus(), attachedPid, nil
 			}
 		}
-
-		shared.LogInfo("Failed executing command", log.Ctx{"environment": envSlice, "args": args, "err": err})
-		return -1, err
+		return -1, -1, err
 	}
 
-	shared.LogInfo("Executed command", log.Ctx{"environment": envSlice, "args": args})
-	return 0, nil
+	return 0, attachedPid, nil
 }
 
 func (c *containerLXC) diskState() map[string]shared.ContainerStateDisk {
diff --git a/lxd/main_forkexec.go b/lxd/main_forkexec.go
index e3ebcc6..8ebb0a6 100644
--- a/lxd/main_forkexec.go
+++ b/lxd/main_forkexec.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"encoding/json"
 	"fmt"
 	"os"
 	"strings"
@@ -89,10 +90,46 @@ func cmdForkExec(args []string) (int, error) {
 
 	opts.Env = env
 
-	status, err := c.RunCommandStatus(cmd, opts)
+	status, err := c.RunCommandNoWait(cmd, opts)
 	if err != nil {
 		return -1, fmt.Errorf("Failed running command: %q", err)
 	}
+	// Send the PID of the executing process.
+	w := os.NewFile(uintptr(3), "attachedPid")
+	defer w.Close()
 
-	return status >> 8, nil
+	err = json.NewEncoder(w).Encode(status)
+	if err != nil {
+		return -1, fmt.Errorf("Failed sending PID of executing command: %q", err)
+	}
+
+	proc, err := os.FindProcess(status)
+	if err != nil {
+		return -1, fmt.Errorf("Failed finding process: %q", err)
+	}
+
+	procState, err := proc.Wait()
+	if err != nil {
+		return -1, fmt.Errorf("Failed waiting on process %d: %q", status, err)
+	}
+
+	if procState.Success() {
+		return 0, nil
+	}
+
+	exCode, ok := procState.Sys().(syscall.WaitStatus)
+	if ok {
+		if exCode.Exited() {
+			return exCode.ExitStatus(), nil
+		}
+		// Backwards compatible behavior. Report success when we exited
+		// due to a signal. Otherwise this may break Jenkins, e.g. when
+		// lxc exec foo reboot receives SIGTERM and exCode.Exitstats()
+		// would report -1.
+		if exCode.Signaled() {
+			return 0, nil
+		}
+	}
+
+	return -1, fmt.Errorf("Command failed")
 }

From aef1c5a6962d662278b445c2a35d459537946be9 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 8 Dec 2016 14:34:50 +0100
Subject: [PATCH 2/5] shared/util_linux: add GetPollRevents()

It's a wrapper around a C function that polls on a single file descriptor and
returns the status and the detected revents.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 shared/util_linux.go | 42 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/shared/util_linux.go b/shared/util_linux.go
index 51e0e43..0f83573 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -25,6 +25,7 @@ import (
 #include <errno.h>
 #include <fcntl.h>
 #include <limits.h>
+#include <poll.h>
 #include <string.h>
 #include <stdio.h>
 
@@ -181,9 +182,50 @@ int shiftowner(char *basepath, char *path, int uid, int gid) {
 	close(fd);
 	return 0;
 }
+
+int get_poll_revents(int lfd, int timeout, int flags, int *revents, int *saved_errno)
+{
+	int ret;
+	struct pollfd pfd = {lfd, flags, 0};
+
+again:
+	ret = poll(&pfd, 1, timeout);
+	if (ret < 0) {
+		if (errno == EINTR)
+			goto again;
+
+		*saved_errno = errno;
+		fprintf(stderr, "Failed to poll() on file descriptor.\n");
+		return -1;
+	}
+
+	*revents = pfd.revents;
+
+	return ret;
+}
 */
 import "C"
 
+const POLLIN int = C.POLLIN
+const POLLPRI int = C.POLLPRI
+const POLLNVAL int = C.POLLNVAL
+const POLLERR int = C.POLLERR
+const POLLHUP int = C.POLLHUP
+const POLLRDHUP int = C.POLLRDHUP
+
+func GetPollRevents(fd int, timeout int, flags int) (int, int, error) {
+	var err error
+	revents := C.int(0)
+	saved_errno := C.int(0)
+
+	ret := C.get_poll_revents(C.int(fd), C.int(timeout), C.int(flags), &revents, &saved_errno)
+	if int(ret) < 0 {
+		err = syscall.Errno(saved_errno)
+	}
+
+	return int(ret), int(revents), err
+}
+
 func ShiftOwner(basepath string, path string, uid int, gid int) error {
 	cbasepath := C.CString(basepath)
 	defer C.free(unsafe.Pointer(cbasepath))

From b1b32b01e46913b0f27fef274c57c66e81201446 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Sat, 10 Dec 2016 12:41:13 +0100
Subject: [PATCH 3/5] exec: detect background tasks to allow clean exit

This adds the function ExecReaderToChannel() to our linux specific utility
functions. It is the workhorse that reads from the master side of a pty file
descriptor. Background tasks are identified correctly. That means you can run:

	chb at conventiont|~/source/go/bin
	> ./lxc exec xen1 -- bash
	root at xen1:~# sleep infinity &

or

	chb at conventiont|~/source/go/bin
	> ./lxc exec xen1 -- bash
	root at xen1:~# yes &
	[1] 290
	root at xen1:~# y
	root at xen1:~# y
	root at xen1:~# y
	.
	.
	.

and still cleanly exit via "Ctrl+D" or "exit". The function is explained in
detail directly in the code.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 shared/util_linux.go | 181 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 181 insertions(+)

diff --git a/shared/util_linux.go b/shared/util_linux.go
index 0f83573..77e4b23 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -6,9 +6,12 @@ package shared
 import (
 	"errors"
 	"fmt"
+	"io"
 	"os"
 	"os/exec"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"syscall"
 	"unsafe"
 )
@@ -524,3 +527,181 @@ func GetAllXattr(path string) (xattrs map[string]string, err error) {
 
 	return xattrs, nil
 }
+
+// Extensively commented directly in the code. Please leave the comments!
+// Looking at this in a couple of months noone will know why and how this works
+// anymore.
+func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan bool, fd int) <-chan []byte {
+	if bufferSize <= (128 * 1024) {
+		bufferSize = (128 * 1024)
+	}
+
+	ch := make(chan ([]byte))
+	mutex := &sync.Mutex{}
+	chanClosed := false
+	onReturn := func() {
+		mutex.Lock()
+		if !chanClosed {
+			chanClosed = true
+			close(ch)
+		}
+		mutex.Unlock()
+	}
+
+	// COMMENT(brauner):
+	// [1]: This function has just one job: Dealing with the case where we
+	// are running an interactive shell session where we put a process in
+	// the background that does hold stdin/stdout open, but does not
+	// generate any output at all. This case cannot be dealt with in the
+	// following function call. Here's why: Assume the above case, now the
+	// attached child (the shell in this example) exits. This will not
+	// generate any poll() event: We won't get POLLHUP because the
+	// background process is holding stdin/stdout open and noone is writing
+	// to it. So we effectively block on GetPollRevents() in the function
+	// below. Hence, we use another go routine here who's only job is to
+	// handle that case: When we detect that the child has exited we check
+	// whether a POLLIN or POLLHUP event has been generated. If not, we know
+	// that there's nothing buffered on stdout and exit.
+	var attachedChildIsDead int32 = 0
+	go func() {
+		<-exited
+
+		atomic.StoreInt32(&attachedChildIsDead, 1)
+
+		ret, revents, err := GetPollRevents(fd, 0, (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP))
+		if ret < 0 {
+			LogErrorf("Failed to poll(POLLIN | POLLPRI | POLLHUP | POLLRDHUP) on file descriptor: %s.", err)
+		} else if ret > 0 {
+			if (revents & POLLERR) > 0 {
+				LogWarnf("Detected poll(POLLERR) event.")
+			}
+		} else if ret == 0 {
+			LogDebugf("No data in stdout: exiting.")
+			onReturn()
+			return
+		}
+	}()
+
+	go func() {
+		readSize := (128 * 1024)
+		offset := 0
+		buf := make([]byte, bufferSize)
+		avoidAtomicLoad := false
+
+		defer onReturn()
+		for {
+			nr := 0
+			var err error
+
+			ret, revents, err := GetPollRevents(fd, -1, (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP))
+			if ret < 0 {
+				// COMMENT(brauner):
+				// This condition is only reached in cases where we are massively f*cked since we even handle
+				// EINTR in the underlying C wrapper around poll(). So let's exit here.
+				LogErrorf("Failed to poll(POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP) on file descriptor: %s. Exiting.", err)
+				return
+			}
+
+			// COMMENT(brauner):
+			// [2]: If the process exits before all its data has been read by us and no other process holds stdin or
+			// stdout open, then we will observe a (POLLHUP | POLLRDHUP | POLLIN) event. This means, we need to
+			// keep on reading from the pty file descriptor until we get a simple POLLHUP back.
+			both := ((revents & (POLLIN | POLLPRI)) > 0) && ((revents & (POLLHUP | POLLRDHUP)) > 0)
+			if both {
+				LogDebugf("Detected poll(POLLIN | POLLPRI | POLLHUP | POLLRDHUP) event.")
+				read := buf[offset : offset+readSize]
+				nr, err = r.Read(read)
+			}
+
+			if (revents & POLLERR) > 0 {
+				LogWarnf("Detected poll(POLLERR) event: exiting.")
+				return
+			}
+
+			if ((revents & (POLLIN | POLLPRI)) > 0) && !both {
+				// COMMENT(brauner):
+				// This might appear unintuitive at first but is actually a nice trick: Assume we are running
+				// a shell session in a container and put a process in the background that is writing to
+				// stdout. Now assume the attached process (aka the shell in this example) exits because we
+				// used Ctrl+D to send EOF or something. If no other process would be holding stdout open we
+				// would expect to observe either a (POLLHUP | POLLRDHUP | POLLIN | POLLPRI) event if there
+				// is still data buffered from the previous process or a simple (POLLHUP | POLLRDHUP) if
+				// no data is buffered. The fact that we only observe a (POLLIN | POLLPRI) event means that
+				// another process is holding stdout open and is writing to it.
+				// One counter argument that can be leveraged is (brauner looks at tycho :))
+				// "Hey, you need to write at least one additional tty buffer to make sure that
+				// everything that the attached child has written is actually shown."
+				// The answer to that is:
+				// "This case can only happen if the process has exited and has left data in stdout which
+				// would generate a (POLLIN | POLLPRI | POLLHUP | POLLRDHUP) event and this case is already
+				// handled and triggers another codepath. (See [2].)"
+				if avoidAtomicLoad || atomic.LoadInt32(&attachedChildIsDead) == 1 {
+					avoidAtomicLoad = true
+					// COMMENT(brauner):
+					// Handle race between atomic.StorInt32() in the go routine
+					// explained in [1] and atomic.LoadInt32() in the go routine
+					// here:
+					// We need to check for (POLLHUP | POLLRDHUP) here again since we might
+					// still be handling a pure POLLIN event from a write prior to the childs
+					// exit. But the child might have exited right before and performed
+					// atomic.StoreInt32() to update attachedChildIsDead before we
+					// performed our atomic.LoadInt32(). This means we accidently hit this
+					// codepath and are misinformed about the available poll() events. So we
+					// need to perform a non-blocking poll() again to exclude that case:
+					//
+					// - If we detect no (POLLHUP | POLLRDHUP) event we know the child
+					//   has already exited but someone else is holding stdin/stdout open and
+					//   writing to it.
+					//   Note that his case should only ever be triggered in situations like
+					//   running a shell and doing stuff like:
+					//    > ./lxc exec xen1 -- bash
+					//   root at xen1:~# yes &
+					//   .
+					//   .
+					//   .
+					//   now send Ctrl+D or type "exit". By the time the Ctrl+D/exit event is
+					//   triggered, we will have read all of the childs data it has written to
+					//   stdout and so we can assume that anything that comes now belongs to
+					//   the process that is holding stdin/stdout open.
+					//
+					// - If we detect a (POLLHUP | POLLRDHUP) event we know that we've
+					//   hit this codepath on accident caused by the race between
+					//   atomic.StoreInt32() in the go routine explained in [1] and
+					//   atomic.LoadInt32() in this go routine. So the next call to
+					//   GetPollRevents() will either return
+					//   (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP)
+					//   or (POLLHUP | POLLRDHUP). Both will trigger another codepath (See [2].)
+					//   that takes care that all data of the child that is buffered in
+					//   stdout is written out.
+					ret, revents, err := GetPollRevents(fd, 0, (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP))
+					if ret < 0 {
+						LogErrorf("Failed to poll(POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP) on file descriptor: %s. Exiting.", err)
+						return
+					} else if (revents & (POLLHUP | POLLRDHUP)) == 0 {
+						LogDebugf("Exiting but background processes are still running.")
+						return
+					}
+				}
+				read := buf[offset : offset+readSize]
+				nr, err = r.Read(read)
+			}
+
+			// COMMENT(brauner):
+			// The attached process has exited and we have read all data that may have
+			// been buffered.
+			if ((revents & (POLLHUP | POLLRDHUP)) > 0) && !both {
+				LogDebugf("Detected poll(POLLHUP) event: exiting.")
+				return
+			}
+
+			offset += nr
+			if offset > 0 && (offset+readSize >= bufferSize || err != nil) {
+				ch <- buf[0:offset]
+				offset = 0
+				buf = make([]byte, bufferSize)
+			}
+		}
+	}()
+
+	return ch
+}

From bf619338e221a3b58f1704003db7f21ebd1e9729 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Thu, 8 Dec 2016 14:46:10 +0100
Subject: [PATCH 4/5] shared/network_linux: ad WebsocketExecMirror()

This function specifically deals with running commands attached to a pty in the
container. We need to put it in a separate file as it is Linux specific.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 lxd/container_exec.go   |   2 +-
 lxd/rsync.go            |   4 +-
 shared/network.go       | 153 +++++++++++++++++++++++++++---------------------
 shared/network_linux.go |  48 +++++++++++++++
 4 files changed, 136 insertions(+), 71 deletions(-)
 create mode 100644 shared/network_linux.go

diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index 0b3e440..6101757 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -201,7 +201,7 @@ func (s *execWs) Do(op *operation) error {
 		}()
 
 		go func() {
-			readDone, writeDone := shared.WebsocketMirror(s.conns[0], ptys[0], ptys[0])
+			readDone, writeDone := shared.WebsocketExecMirror(s.conns[0], ptys[0], ptys[0], attachedChildIsDead, int(ptys[0].Fd()))
 			<-readDone
 			<-writeDone
 			s.conns[0].Close()
diff --git a/lxd/rsync.go b/lxd/rsync.go
index ee58fee..ba646ee 100644
--- a/lxd/rsync.go
+++ b/lxd/rsync.go
@@ -96,7 +96,7 @@ func RsyncSend(path string, conn *websocket.Conn) error {
 		return err
 	}
 
-	readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, dataSocket)
+	readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, dataSocket, nil, nil)
 
 	output, err := ioutil.ReadAll(stderr)
 	if err != nil {
@@ -146,7 +146,7 @@ func RsyncRecv(path string, conn *websocket.Conn) error {
 		return err
 	}
 
-	readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout)
+	readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout, nil, nil)
 	data, err2 := ioutil.ReadAll(stderr)
 	if err2 != nil {
 		shared.LogDebugf("error reading rsync stderr: %s", err2)
diff --git a/shared/network.go b/shared/network.go
index 39a7f58..20b14c0 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -204,84 +204,101 @@ func WebsocketRecvStream(w io.Writer, conn *websocket.Conn) chan bool {
 	return ch
 }
 
+func defaultReader(conn *websocket.Conn, r io.ReadCloser, readDone chan<- bool) {
+	/* For now, we don't need to adjust buffer sizes in
+	* WebsocketMirror, since it's used for interactive things like
+	* exec.
+	 */
+	in := ReaderToChannel(r, -1)
+	for {
+		buf, ok := <-in
+		if !ok {
+			r.Close()
+			LogDebugf("sending write barrier")
+			conn.WriteMessage(websocket.TextMessage, []byte{})
+			readDone <- true
+			return
+		}
+		w, err := conn.NextWriter(websocket.BinaryMessage)
+		if err != nil {
+			LogDebugf("Got error getting next writer %s", err)
+			break
+		}
+
+		_, err = w.Write(buf)
+		w.Close()
+		if err != nil {
+			LogDebugf("Got err writing %s", err)
+			break
+		}
+	}
+	closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+	conn.WriteMessage(websocket.CloseMessage, closeMsg)
+	readDone <- true
+	r.Close()
+}
+
+func defaultWriter(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- bool) {
+	for {
+		mt, r, err := conn.NextReader()
+		if err != nil {
+			LogDebugf("Got error getting next reader %s, %s", err, w)
+			break
+		}
+
+		if mt == websocket.CloseMessage {
+			LogDebugf("Got close message for reader")
+			break
+		}
+
+		if mt == websocket.TextMessage {
+			LogDebugf("Got message barrier, resetting stream")
+			break
+		}
+
+		buf, err := ioutil.ReadAll(r)
+		if err != nil {
+			LogDebugf("Got error writing to writer %s", err)
+			break
+		}
+		i, err := w.Write(buf)
+		if i != len(buf) {
+			LogDebugf("Didn't write all of buf")
+			break
+		}
+		if err != nil {
+			LogDebugf("Error writing buf %s", err)
+			break
+		}
+	}
+	writeDone <- true
+	w.Close()
+}
+
 // WebsocketMirror allows mirroring a reader to a websocket and taking the
 // result and writing it to a writer. This function allows for multiple
 // mirrorings and correctly negotiates stream endings. However, it means any
 // websocket.Conns passed to it are live when it returns, and must be closed
 // explicitly.
-func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser) (chan bool, chan bool) {
+type WebSocketMirrorReader func(conn *websocket.Conn, r io.ReadCloser, readDone chan<- bool)
+type WebSocketMirrorWriter func(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- bool)
+
+func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, Reader WebSocketMirrorReader, Writer WebSocketMirrorWriter) (chan bool, chan bool) {
 	readDone := make(chan bool, 1)
 	writeDone := make(chan bool, 1)
-	go func(conn *websocket.Conn, w io.WriteCloser) {
-		for {
-			mt, r, err := conn.NextReader()
-			if err != nil {
-				LogDebugf("Got error getting next reader %s, %s", err, w)
-				break
-			}
 
-			if mt == websocket.CloseMessage {
-				LogDebugf("Got close message for reader")
-				break
-			}
-
-			if mt == websocket.TextMessage {
-				LogDebugf("Got message barrier, resetting stream")
-				break
-			}
+	ReadFunc := Reader
+	if ReadFunc == nil {
+		ReadFunc = defaultReader
+	}
 
-			buf, err := ioutil.ReadAll(r)
-			if err != nil {
-				LogDebugf("Got error writing to writer %s", err)
-				break
-			}
-			i, err := w.Write(buf)
-			if i != len(buf) {
-				LogDebugf("Didn't write all of buf")
-				break
-			}
-			if err != nil {
-				LogDebugf("Error writing buf %s", err)
-				break
-			}
-		}
-		writeDone <- true
-		w.Close()
-	}(conn, w)
-
-	go func(conn *websocket.Conn, r io.ReadCloser) {
-		/* For now, we don't need to adjust buffer sizes in
-		 * WebsocketMirror, since it's used for interactive things like
-		 * exec.
-		 */
-		in := ReaderToChannel(r, -1)
-		for {
-			buf, ok := <-in
-			if !ok {
-				r.Close()
-				LogDebugf("sending write barrier")
-				conn.WriteMessage(websocket.TextMessage, []byte{})
-				readDone <- true
-				return
-			}
-			w, err := conn.NextWriter(websocket.BinaryMessage)
-			if err != nil {
-				LogDebugf("Got error getting next writer %s", err)
-				break
-			}
+	WriteFunc := Writer
+	if WriteFunc == nil {
+		WriteFunc = defaultWriter
+	}
 
-			_, err = w.Write(buf)
-			w.Close()
-			if err != nil {
-				LogDebugf("Got err writing %s", err)
-				break
-			}
-		}
-		closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
-		conn.WriteMessage(websocket.CloseMessage, closeMsg)
-		readDone <- true
-		r.Close()
-	}(conn, r)
+	go ReadFunc(conn, r, readDone)
+	go WriteFunc(conn, w, writeDone)
 
 	return readDone, writeDone
 }
diff --git a/shared/network_linux.go b/shared/network_linux.go
new file mode 100644
index 0000000..3bee792
--- /dev/null
+++ b/shared/network_linux.go
@@ -0,0 +1,48 @@
+// +build linux
+
+package shared
+
+import (
+	"io"
+
+	"github.com/gorilla/websocket"
+)
+
+func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, exited chan bool, fd int) (chan bool, chan bool) {
+	readDone := make(chan bool, 1)
+	writeDone := make(chan bool, 1)
+
+	go defaultWriter(conn, w, writeDone)
+
+	go func(conn *websocket.Conn, r io.ReadCloser) {
+		in := ExecReaderToChannel(r, -1, exited, fd)
+		for {
+			buf, ok := <-in
+			if !ok {
+				r.Close()
+				LogDebugf("sending write barrier")
+				conn.WriteMessage(websocket.TextMessage, []byte{})
+				readDone <- true
+				return
+			}
+			w, err := conn.NextWriter(websocket.BinaryMessage)
+			if err != nil {
+				LogDebugf("Got error getting next writer %s", err)
+				break
+			}
+
+			_, err = w.Write(buf)
+			w.Close()
+			if err != nil {
+				LogDebugf("Got err writing %s", err)
+				break
+			}
+		}
+		closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+		conn.WriteMessage(websocket.CloseMessage, closeMsg)
+		readDone <- true
+		r.Close()
+	}(conn, r)
+
+	return readDone, writeDone
+}

From 91b39c3ec77ebbb4bc2c7ac4d33de5062838e637 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brauner at ubuntu.com>
Date: Tue, 13 Dec 2016 02:35:23 +0100
Subject: [PATCH 5/5] util_linux: ExecReaderToChannel() use sync.Once

Takes care that the closeChannel() function is exactly executed once. This
allows us to avoid using a mutex.

Signed-off-by: Christian Brauner <christian.brauner at ubuntu.com>
---
 shared/util_linux.go | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/shared/util_linux.go b/shared/util_linux.go
index 77e4b23..8ef5b82 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -537,15 +537,12 @@ func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan bool, fd int
 	}
 
 	ch := make(chan ([]byte))
-	mutex := &sync.Mutex{}
-	chanClosed := false
-	onReturn := func() {
-		mutex.Lock()
-		if !chanClosed {
-			chanClosed = true
-			close(ch)
-		}
-		mutex.Unlock()
+
+	// Takes care that the closeChannel() function is exactly executed once.
+	// This allows us to avoid using a mutex.
+	var once sync.Once
+	closeChannel := func() {
+		close(ch)
 	}
 
 	// COMMENT(brauner):
@@ -577,7 +574,7 @@ func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan bool, fd int
 			}
 		} else if ret == 0 {
 			LogDebugf("No data in stdout: exiting.")
-			onReturn()
+			once.Do(closeChannel)
 			return
 		}
 	}()
@@ -588,7 +585,7 @@ func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan bool, fd int
 		buf := make([]byte, bufferSize)
 		avoidAtomicLoad := false
 
-		defer onReturn()
+		defer once.Do(closeChannel)
 		for {
 			nr := 0
 			var err error


More information about the lxc-devel mailing list