[lxc-devel] [pylxd/master] Fix execution times in lxd 3.0.1 and add pipelining support to execute

lasizoillo on Github lxc-bot at linuxcontainers.org
Fri Jul 13 11:31:34 UTC 2018


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 1635 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180713/8b2f2233/attachment.bin>
-------------- next part --------------
From 7ac870b66f33cacf7933aaa5178a28fc6ca5c20e Mon Sep 17 00:00:00 2001
From: lasizoillo <lasizoillo at gmail.com>
Date: Fri, 13 Jul 2018 13:20:04 +0200
Subject: [PATCH] Fix execution times in lxd 3.0.1 and add pipelining support
 to execute

---
 integration/test_containers.py | 19 ++++++++++++
 pylxd/models/container.py      | 68 +++++++++++++++++++++++++++++++++---------
 2 files changed, 73 insertions(+), 14 deletions(-)

diff --git a/integration/test_containers.py b/integration/test_containers.py
index ae68109..5e3ff24 100644
--- a/integration/test_containers.py
+++ b/integration/test_containers.py
@@ -179,6 +179,25 @@ def test_execute_force_decode(self):
         self.assertEqual('qué', result.stdout)
         self.assertEqual('', result.stderr)
 
+    def test_execute_pipes(self):
+        """A command receives data from stdin and writes to stdout handler"""
+        self.container.start(wait=True)
+        self.addCleanup(self.container.stop, wait=True)
+        test_msg = "Hello world!\n"
+        stdout_msgs = []
+
+        def stdout_handler(msg):
+            stdout_msgs.append(msg)
+
+        result = self.container.execute(
+            ['cat', '-'], stdin_payload=test_msg, stdout_handler=stdout_handler
+        )
+
+        self.assertEqual(0, result.exit_code)
+        self.assertEqual(test_msg, result.stdout)
+        self.assertEqual('', result.stderr)
+        self.assertEqual(stdout_msgs, [test_msg])
+
     def test_publish(self):
         """A container is published."""
         image = self.container.publish(wait=True)
diff --git a/pylxd/models/container.py b/pylxd/models/container.py
index 4034cb5..ccf357b 100644
--- a/pylxd/models/container.py
+++ b/pylxd/models/container.py
@@ -255,7 +255,11 @@ def unfreeze(self, timeout=30, force=True, wait=False):
                                force=force,
                                wait=wait)
 
-    def execute(self, commands, environment={}, encoding=None, decode=True):
+    def execute(
+            self, commands, environment={}, encoding=None, decode=True,
+            stdin_payload=None, stdin_encoding="utf-8",
+            stdout_handler=None, stderr_handler=None
+    ):
         """Execute a command on the container.
 
         In pylxd 2.2, this method will be renamed `execute` and the existing
@@ -272,6 +276,16 @@ def execute(self, commands, environment={}, encoding=None, decode=True):
         :param decode: Whether to decode the stdout/stderr or just return the
             raw buffers.
         :type decode: bool
+        :param stdin_payload: Payload to pass via stdin
+        :type stdin_payload: Can be a file, string, bytearray, generator or
+            ws4py Message object
+        :param stdin_encoding: Encoding to pass text to stdin (default utf-8)
+        :param stdout_handler: Callable that receives as first parameter each
+            message received via stdout
+        :type stdout_handler: Callable[[str], None]
+        :param stderr_handler: Callable that receives as first parameter each
+            message received via stderr
+        :type stderr_handler: Callable[[str], None]
         :raises ValueError: if the ws4py library is not installed.
         :returns: The return value, stdout and stdin
         :rtype: _ContainerExecuteResult() namedtuple
@@ -294,17 +308,20 @@ def execute(self, commands, environment={}, encoding=None, decode=True):
             self.client.api.operations[operation_id].websocket._api_endpoint)
 
         with managers.web_socket_manager(WebSocketManager()) as manager:
-            stdin = _StdinWebsocket(self.client.websocket_url)
+            stdin = _StdinWebsocket(
+                self.client.websocket_url, payload=stdin_payload,
+                encoding=stdin_encoding
+            )
             stdin.resource = '{}?secret={}'.format(parsed.path, fds['0'])
             stdin.connect()
             stdout = _CommandWebsocketClient(
                 manager, self.client.websocket_url,
-                encoding=encoding, decode=decode)
+                encoding=encoding, decode=decode, handler=stdout_handler)
             stdout.resource = '{}?secret={}'.format(parsed.path, fds['1'])
             stdout.connect()
             stderr = _CommandWebsocketClient(
                 manager, self.client.websocket_url,
-                encoding=encoding, decode=decode)
+                encoding=encoding, decode=decode, handler=stderr_handler)
             stderr.resource = '{}?secret={}'.format(parsed.path, fds['2'])
             stderr.connect()
 
@@ -320,8 +337,6 @@ def execute(self, commands, environment={}, encoding=None, decode=True):
             while len(manager.websockets.values()) > 0:
                 time.sleep(.1)  # pragma: no cover
 
-            stdout.close()
-            stderr.close()
             manager.stop()
             manager.join()
 
@@ -408,6 +423,7 @@ def __init__(self, manager, *args, **kwargs):
         self.manager = manager
         self.decode = kwargs.pop('decode', True)
         self.encoding = kwargs.pop('encoding', None)
+        self.handler = kwargs.pop('handler', None)
         self.message_encoding = None
         super(_CommandWebsocketClient, self).__init__(*args, **kwargs)
 
@@ -416,14 +432,17 @@ def handshake_ok(self):
         self.buffer = []
 
     def received_message(self, message):
+        if message.data is None or len(message.data) == 0:
+            self.manager.remove(self)
+            return
         if message.encoding and self.message_encoding is None:
             self.message_encoding = message.encoding
+        if self.handler:
+            self.handler(self._maybe_decode(message.data))
         self.buffer.append(message.data)
 
-    @property
-    def data(self):
-        buffer = b''.join(self.buffer)
-        if self.decode:
+    def _maybe_decode(self, buffer):
+        if self.decode and buffer is not None:
             if self.encoding:
                 return buffer.decode(self.encoding)
             if self.message_encoding:
@@ -432,17 +451,38 @@ def data(self):
             return buffer.decode('utf-8')
         return buffer
 
+    @property
+    def data(self):
+        buffer = b''.join(self.buffer)
+        return self._maybe_decode(buffer)
+
 
 class _StdinWebsocket(WebSocketBaseClient):  # pragma: no cover
     """A websocket client for handling stdin.
 
-    The nature of stdin in Container.execute means that we don't
-    ever use this connection. It is closed as soon as it completes
-    the handshake.
+    Allows communicating with container commands via stdin.
     """
 
+    def __init__(self, url, payload=None, **kwargs):
+        self.encoding = kwargs.pop('encoding', None)
+        self.payload = payload
+        super().__init__(url, **kwargs)
+
+    def _smart_encode(self, msg):
+        if type(msg) == six.text_type and self.encoding:
+            return msg.encode(self.encoding)
+        return msg
+
     def handshake_ok(self):
-        self.close()
+        if self.payload:
+            if hasattr(self.payload, "read"):
+                self.send(
+                    (self._smart_encode(line) for line in self.payload),
+                    binary=True
+                )
+            else:
+                self.send(self._smart_encode(self.payload), binary=True)
+        self.send(b"", binary=False)
 
 
 class Snapshot(model.Model):


More information about the lxc-devel mailing list