[lxc-devel] [lxd/master] VM: Fixes exec read loop when agent not started

tomponline on Github lxc-bot at linuxcontainers.org
Fri Mar 13 10:44:51 UTC 2020


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 1327 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20200313/de2ad8aa/attachment.bin>
-------------- next part --------------
From 3f7441e7a34a92cad426941a82a7778248dfc590 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 13 Mar 2020 10:28:38 +0000
Subject: [PATCH 1/5] shared/util/linux: Updates ExecReaderToChannel to accept
 a finisher chan as struct{}

This is because closing such a channel is sufficient on its own to signal completion; no value needs to be sent on it.

It also means that the sender doesn't need to create the channel as a buffered channel: closing a channel never blocks, and any number of receives on a closed channel succeed immediately.

The empty struct{} type is used to indicate that the content of the received value has no meaning; only the act of receiving from the channel is the signal.

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 shared/util_linux_cgo.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/shared/util_linux_cgo.go b/shared/util_linux_cgo.go
index 45f7c03e22..41d84b80ff 100644
--- a/shared/util_linux_cgo.go
+++ b/shared/util_linux_cgo.go
@@ -289,7 +289,7 @@ again:
 // Extensively commented directly in the code. Please leave the comments!
 // Looking at this in a couple of months noone will know why and how this works
 // anymore.
-func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan bool, fd int) <-chan []byte {
+func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan struct{}, fd int) <-chan []byte {
 	if bufferSize <= (128 * 1024) {
 		bufferSize = (128 * 1024)
 	}

From 03a71e64dd2dee8ab39cbcb25816a2d2e5238207 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 13 Mar 2020 10:30:14 +0000
Subject: [PATCH 2/5] lxd-agent/exec: Updates usage of ExecReaderToChannel
 channel definitions

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd-agent/exec.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lxd-agent/exec.go b/lxd-agent/exec.go
index 1feddb8325..23a3fa5104 100644
--- a/lxd-agent/exec.go
+++ b/lxd-agent/exec.go
@@ -253,7 +253,7 @@ func (s *execWs) Do(op *operations.Operation) error {
 
 	controlExit := make(chan bool, 1)
 	attachedChildIsBorn := make(chan int)
-	attachedChildIsDead := make(chan bool, 1)
+	attachedChildIsDead := make(chan struct{})
 	var wgEOF sync.WaitGroup
 
 	if s.interactive {
@@ -400,7 +400,7 @@ func (s *execWs) Do(op *operations.Operation) error {
 			conn.Close()
 		}
 
-		attachedChildIsDead <- true
+		close(attachedChildIsDead)
 
 		wgEOF.Wait()
 

From ad51d1488bed7e3911901eb8942a95cf4e14b58b Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 13 Mar 2020 10:33:56 +0000
Subject: [PATCH 3/5] shared/network: Removes logging internal state of
 websocket in WebsocketRecvStream

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 shared/network.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/shared/network.go b/shared/network.go
index 71e5f09551..3a49052850 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -211,7 +211,7 @@ func WebsocketRecvStream(w io.Writer, conn *websocket.Conn) chan bool {
 			}
 
 			if err != nil {
-				logger.Debugf("Got error getting next reader %s, %s", err, w)
+				logger.Debugf("Got error getting next reader %s", err)
 				break
 			}
 

From 7e4468edad2977201ea0fc8ce41bb48a31b7a512 Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 13 Mar 2020 10:34:20 +0000
Subject: [PATCH 4/5] shared/netutils/network/linux: Updates
 WebsocketExecMirror to use struct{} exited indicator channel

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 shared/netutils/network_linux.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/shared/netutils/network_linux.go b/shared/netutils/network_linux.go
index 0dc7bb72b0..2f31ef70e6 100644
--- a/shared/netutils/network_linux.go
+++ b/shared/netutils/network_linux.go
@@ -173,7 +173,7 @@ func NetnsGetifaddrs(initPID int32) (map[string]api.InstanceStateNetwork, error)
 }
 
 // WebsocketExecMirror mirrors a websocket connection with a set of Writer/Reader.
-func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, exited chan bool, fd int) (chan bool, chan bool) {
+func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, exited chan struct{}, fd int) (chan bool, chan bool) {
 	readDone := make(chan bool, 1)
 	writeDone := make(chan bool, 1)
 

From ef63dd9af90281e2c484376ab619ced1ee37b8fd Mon Sep 17 00:00:00 2001
From: Thomas Parrott <thomas.parrott at canonical.com>
Date: Fri, 13 Mar 2020 10:35:19 +0000
Subject: [PATCH 5/5] lxd/instance/exec: Fixes VM read loop when agent not
 started

The websocket read goroutines were being started before the instance.Exec process was confirmed to have started.

This leaked goroutines and left websocket readers running on closed connections, which could result in infinite loops in the websocket reader functions.

Changes made:

1. Re-orders the code so that the websocket mirroring goroutines are not started until the process being exec'd has started.
2. Changes any channels that were used only for coordination (i.e. not actually sending any data) from bool to struct{} to indicate that the value has no meaning, and uses close(ch) instead to signal whatever coordination point was needed. This has the added benefit of not needing a buffered channel, as closing a channel never blocks, even when there are no readers.
3. Consistently uses the finisher() function when returning from the Do() function (including when Exec fails) so that sockets and channels are always closed.
4. Improves logging with instance name context.

Signed-off-by: Thomas Parrott <thomas.parrott at canonical.com>
---
 lxd/instance_exec.go | 146 +++++++++++++++++++++----------------------
 1 file changed, 70 insertions(+), 76 deletions(-)

diff --git a/lxd/instance_exec.go b/lxd/instance_exec.go
index 66b95f19fd..8f3e3d41a0 100644
--- a/lxd/instance_exec.go
+++ b/lxd/instance_exec.go
@@ -25,6 +25,7 @@ import (
 	"github.com/lxc/lxd/shared/api"
 	log "github.com/lxc/lxd/shared/log15"
 	"github.com/lxc/lxd/shared/logger"
+	"github.com/lxc/lxd/shared/logging"
 	"github.com/lxc/lxd/shared/netutils"
 	"github.com/lxc/lxd/shared/version"
 )
@@ -37,8 +38,8 @@ type execWs struct {
 	rootGid          int64
 	conns            map[int]*websocket.Conn
 	connsLock        sync.Mutex
-	allConnected     chan bool
-	controlConnected chan bool
+	allConnected     chan struct{}
+	controlConnected chan struct{}
 	fds              map[int]string
 }
 
@@ -78,7 +79,7 @@ func (s *execWs) Connect(op *operations.Operation, r *http.Request, w http.Respo
 			s.connsLock.Unlock()
 
 			if fd == -1 {
-				s.controlConnected <- true
+				close(s.controlConnected) // Control WS is now connected.
 				return nil
 			}
 
@@ -91,7 +92,7 @@ func (s *execWs) Connect(op *operations.Operation, r *http.Request, w http.Respo
 			}
 			s.connsLock.Unlock()
 
-			s.allConnected <- true
+			close(s.allConnected) // All WS are now connected.
 			return nil
 		}
 	}
@@ -142,17 +143,59 @@ func (s *execWs) Do(op *operations.Operation) error {
 		stderr = ttys[2]
 	}
 
-	controlExit := make(chan bool)
-	attachedChildIsBorn := make(chan instance.Cmd)
-	attachedChildIsDead := make(chan bool, 1)
+	controlExit := make(chan struct{})
+	attachedChildIsDead := make(chan struct{})
 	var wgEOF sync.WaitGroup
 
+	// Define a function to clean up TTYs and sockets when done.
+	finisher := func(cmdResult int, cmdErr error) error {
+		for _, tty := range ttys {
+			tty.Close()
+		}
+
+		s.connsLock.Lock()
+		conn := s.conns[-1]
+		s.connsLock.Unlock()
+
+		if conn == nil {
+			if s.req.Interactive {
+				close(controlExit)
+			}
+		} else {
+			conn.Close()
+		}
+
+		close(attachedChildIsDead)
+
+		wgEOF.Wait()
+
+		for _, pty := range ptys {
+			pty.Close()
+		}
+
+		metadata := shared.Jmap{"return": cmdResult}
+		err = op.UpdateMetadata(metadata)
+		if err != nil {
+			return err
+		}
+
+		return cmdErr
+	}
+
+	cmd, err := s.instance.Exec(s.req, stdin, stdout, stderr)
+	if err != nil {
+		return finisher(-1, err)
+	}
+
+	logger := logging.AddContext(logger.Log, log.Ctx{"instance": s.instance.Name(), "PID": cmd.PID()})
+	logger.Debug("Instance process started")
+
+	// Now that process has started, we can start the mirroring of the process channels and websockets.
 	if s.req.Interactive {
 		wgEOF.Add(1)
 		go func() {
-			logger.Debugf("Interactive child process handler waiting")
-			defer logger.Debugf("Interactive child process handler finished")
-			attachedChild := <-attachedChildIsBorn
+			logger.Debug("Interactive child process handler started")
+			defer logger.Debug("Interactive child process handler finished")
 
 			select {
 			case <-s.controlConnected:
@@ -162,7 +205,6 @@ func (s *execWs) Do(op *operations.Operation) error {
 				return
 			}
 
-			logger.Debugf(`Interactive child process handler started for child PID "%d"`, attachedChild.PID())
 			for {
 				s.connsLock.Lock()
 				conn := s.conns[-1]
@@ -174,7 +216,7 @@ func (s *execWs) Do(op *operations.Operation) error {
 				}
 
 				if err != nil {
-					logger.Debugf("Got error getting next reader: %v", err)
+					logger.Debug("Got error getting next reader", log.Ctx{"err": err})
 					er, ok := err.(*websocket.CloseError)
 					if !ok {
 						break
@@ -185,50 +227,50 @@ func (s *execWs) Do(op *operations.Operation) error {
 					}
 
 					// If an abnormal closure occurred, kill the attached child.
-					err := attachedChild.Signal(unix.SIGKILL)
+					err := cmd.Signal(unix.SIGKILL)
 					if err != nil {
-						logger.Debugf(`Failed to send SIGKILL to PID "%d": %v`, attachedChild.PID(), err)
+						logger.Debug("Failed to send SIGKILL signal", log.Ctx{"err": err})
 					} else {
-						logger.Debugf(`Sent SIGKILL to PID "%d"`, attachedChild.PID())
+						logger.Debug("Sent SIGKILL signal")
 					}
 					return
 				}
 
 				buf, err := ioutil.ReadAll(r)
 				if err != nil {
-					logger.Debugf("Failed to read message: %v", err)
+					logger.Debug("Failed to read message", log.Ctx{"err": err})
 					break
 				}
 
 				command := api.InstanceExecControl{}
 
 				if err := json.Unmarshal(buf, &command); err != nil {
-					logger.Debugf("Failed to unmarshal control socket command: %v", err)
+					logger.Debug("Failed to unmarshal control socket command", log.Ctx{"err": err})
 					continue
 				}
 
 				if command.Command == "window-resize" {
 					winchWidth, err := strconv.Atoi(command.Args["width"])
 					if err != nil {
-						logger.Debugf("Unable to extract window width: %v", err)
+						logger.Debug("Unable to extract window width", log.Ctx{"err": err})
 						continue
 					}
 
 					winchHeight, err := strconv.Atoi(command.Args["height"])
 					if err != nil {
-						logger.Debugf("Unable to extract window height: %v", err)
+						logger.Debug("Unable to extract window height", log.Ctx{"err": err})
 						continue
 					}
 
-					err = attachedChild.WindowResize(int(ptys[0].Fd()), winchWidth, winchHeight)
+					err = cmd.WindowResize(int(ptys[0].Fd()), winchWidth, winchHeight)
 					if err != nil {
-						logger.Debugf(`Failed to set window size to "%dx%d": %v`, winchWidth, winchHeight, err)
+						logger.Debug("Failed to set window size", winchWidth, winchHeight, log.Ctx{"err": err, "width": winchWidth, "height": winchHeight})
 						continue
 					}
 				} else if command.Command == "signal" {
-					err := attachedChild.Signal(unix.Signal(command.Signal))
+					err := cmd.Signal(unix.Signal(command.Signal))
 					if err != nil {
-						logger.Debugf(`Failed forwarding signal "%d" to PID "%d": %v`, command.Signal, attachedChild.PID(), err)
+						logger.Debug("Failed forwarding signal", log.Ctx{"err": err, "signal": command.Signal})
 						continue
 					}
 				}
@@ -240,8 +282,8 @@ func (s *execWs) Do(op *operations.Operation) error {
 			conn := s.conns[0]
 			s.connsLock.Unlock()
 
-			logger.Debugf("Started mirroring websocket")
-			defer logger.Debugf("Finished mirroring websocket")
+			logger.Debug("Started mirroring websocket")
+			defer logger.Debug("Finished mirroring websocket")
 			readDone, writeDone := netutils.WebsocketExecMirror(conn, ptys[0], ptys[0], attachedChildIsDead, int(ptys[0].Fd()))
 
 			<-readDone
@@ -249,7 +291,6 @@ func (s *execWs) Do(op *operations.Operation) error {
 			conn.Close()
 			wgEOF.Done()
 		}()
-
 	} else {
 		wgEOF.Add(len(ttys) - 1)
 		for i := 0; i < len(ttys); i++ {
@@ -274,55 +315,8 @@ func (s *execWs) Do(op *operations.Operation) error {
 		}
 	}
 
-	finisher := func(cmdResult int, cmdErr error) error {
-		for _, tty := range ttys {
-			tty.Close()
-		}
-
-		s.connsLock.Lock()
-		conn := s.conns[-1]
-		s.connsLock.Unlock()
-
-		if conn == nil {
-			if s.req.Interactive {
-				controlExit <- true
-			}
-		} else {
-			conn.Close()
-		}
-
-		attachedChildIsDead <- true
-
-		wgEOF.Wait()
-
-		for _, pty := range ptys {
-			pty.Close()
-		}
-
-		metadata := shared.Jmap{"return": cmdResult}
-		err = op.UpdateMetadata(metadata)
-		if err != nil {
-			return err
-		}
-
-		return cmdErr
-	}
-
-	cmd, err := s.instance.Exec(s.req, stdin, stdout, stderr)
-	if err != nil {
-		return err
-	}
-
-	if s.req.Interactive {
-		// Start the interactive process handler.
-		attachedChildIsBorn <- cmd
-	}
-
 	exitCode, err := cmd.Wait()
-	if err != nil {
-		return err
-	}
-
+	logger.Debug("Instance process stopped")
 	return finisher(exitCode, err)
 }
 
@@ -444,8 +438,8 @@ func containerExecPost(d *Daemon, r *http.Request) response.Response {
 			ws.conns[1] = nil
 			ws.conns[2] = nil
 		}
-		ws.allConnected = make(chan bool, 1)
-		ws.controlConnected = make(chan bool, 1)
+		ws.allConnected = make(chan struct{})
+		ws.controlConnected = make(chan struct{})
 		for i := -1; i < len(ws.conns)-1; i++ {
 			ws.fds[i], err = shared.RandomCryptoString()
 			if err != nil {


More information about the lxc-devel mailing list