[lxc-devel] [pylxd/master] Fix execution times in lxd 3.0.1 and add pipelining support to execute
lasizoillo on Github
lxc-bot at linuxcontainers.org
Fri Jul 13 11:31:34 UTC 2018
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 1635 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180713/8b2f2233/attachment.bin>
-------------- next part --------------
From 7ac870b66f33cacf7933aaa5178a28fc6ca5c20e Mon Sep 17 00:00:00 2001
From: lasizoillo <lasizoillo at gmail.com>
Date: Fri, 13 Jul 2018 13:20:04 +0200
Subject: [PATCH] Fix execution times in lxd 3.0.1 and add pipelining support
to execute
---
integration/test_containers.py | 19 ++++++++++++
pylxd/models/container.py | 68 +++++++++++++++++++++++++++++++++---------
2 files changed, 73 insertions(+), 14 deletions(-)
diff --git a/integration/test_containers.py b/integration/test_containers.py
index ae68109..5e3ff24 100644
--- a/integration/test_containers.py
+++ b/integration/test_containers.py
@@ -179,6 +179,25 @@ def test_execute_force_decode(self):
self.assertEqual('qué', result.stdout)
self.assertEqual('', result.stderr)
+ def test_execute_pipes(self):
+ """A command receives data from stdin and write to stdout handler"""
+ self.container.start(wait=True)
+ self.addCleanup(self.container.stop, wait=True)
+ test_msg = "Hello world!\n"
+ stdout_msgs = []
+
+ def stdout_handler(msg):
+ stdout_msgs.append(msg)
+
+ result = self.container.execute(
+ ['cat', '-'], stdin_payload=test_msg, stdout_handler=stdout_handler
+ )
+
+ self.assertEqual(0, result.exit_code)
+ self.assertEqual(test_msg, result.stdout)
+ self.assertEqual('', result.stderr)
+ self.assertEqual(stdout_msgs, [test_msg])
+
def test_publish(self):
"""A container is published."""
image = self.container.publish(wait=True)
diff --git a/pylxd/models/container.py b/pylxd/models/container.py
index 4034cb5..ccf357b 100644
--- a/pylxd/models/container.py
+++ b/pylxd/models/container.py
@@ -255,7 +255,11 @@ def unfreeze(self, timeout=30, force=True, wait=False):
force=force,
wait=wait)
- def execute(self, commands, environment={}, encoding=None, decode=True):
+ def execute(
+ self, commands, environment={}, encoding=None, decode=True,
+ stdin_payload=None, stdin_encoding="utf-8",
+ stdout_handler=None, stderr_handler=None
+ ):
"""Execute a command on the container.
In pylxd 2.2, this method will be renamed `execute` and the existing
@@ -272,6 +276,16 @@ def execute(self, commands, environment={}, encoding=None, decode=True):
:param decode: Whether to decode the stdout/stderr or just return the
raw buffers.
:type decode: bool
+ :param stdin_payload: Payload to pass via stdin
+ :type stdin_payload: Can be a file, string, bytearray, generator or
+ ws4py Message object
+ :param stdin_encoding: Encoding to pass text to stdin (default utf-8)
+ :param stdout_handler: Callable than receive as first parameter each
+ message recived via stdout
+ :type stdout_handler: Callable[[str], None]
+ :param stderr_handler: Callable than receive as first parameter each
+ message recived via stderr
+ :type stderr_handler: Callable[[str], None]
:raises ValueError: if the ws4py library is not installed.
:returns: The return value, stdout and stdin
:rtype: _ContainerExecuteResult() namedtuple
@@ -294,17 +308,20 @@ def execute(self, commands, environment={}, encoding=None, decode=True):
self.client.api.operations[operation_id].websocket._api_endpoint)
with managers.web_socket_manager(WebSocketManager()) as manager:
- stdin = _StdinWebsocket(self.client.websocket_url)
+ stdin = _StdinWebsocket(
+ self.client.websocket_url, payload=stdin_payload,
+ encoding=stdin_encoding
+ )
stdin.resource = '{}?secret={}'.format(parsed.path, fds['0'])
stdin.connect()
stdout = _CommandWebsocketClient(
manager, self.client.websocket_url,
- encoding=encoding, decode=decode)
+ encoding=encoding, decode=decode, handler=stdout_handler)
stdout.resource = '{}?secret={}'.format(parsed.path, fds['1'])
stdout.connect()
stderr = _CommandWebsocketClient(
manager, self.client.websocket_url,
- encoding=encoding, decode=decode)
+ encoding=encoding, decode=decode, handler=stderr_handler)
stderr.resource = '{}?secret={}'.format(parsed.path, fds['2'])
stderr.connect()
@@ -320,8 +337,6 @@ def execute(self, commands, environment={}, encoding=None, decode=True):
while len(manager.websockets.values()) > 0:
time.sleep(.1) # pragma: no cover
- stdout.close()
- stderr.close()
manager.stop()
manager.join()
@@ -408,6 +423,7 @@ def __init__(self, manager, *args, **kwargs):
self.manager = manager
self.decode = kwargs.pop('decode', True)
self.encoding = kwargs.pop('encoding', None)
+ self.handler = kwargs.pop('handler', None)
self.message_encoding = None
super(_CommandWebsocketClient, self).__init__(*args, **kwargs)
@@ -416,14 +432,17 @@ def handshake_ok(self):
self.buffer = []
def received_message(self, message):
+ if message.data is None or len(message.data) == 0:
+ self.manager.remove(self)
+ return
if message.encoding and self.message_encoding is None:
self.message_encoding = message.encoding
+ if self.handler:
+ self.handler(self._maybe_decode(message.data))
self.buffer.append(message.data)
- @property
- def data(self):
- buffer = b''.join(self.buffer)
- if self.decode:
+ def _maybe_decode(self, buffer):
+ if self.decode and buffer is not None:
if self.encoding:
return buffer.decode(self.encoding)
if self.message_encoding:
@@ -432,17 +451,38 @@ def data(self):
return buffer.decode('utf-8')
return buffer
+ @property
+ def data(self):
+ buffer = b''.join(self.buffer)
+ return self._maybe_decode(buffer)
+
class _StdinWebsocket(WebSocketBaseClient): # pragma: no cover
"""A websocket client for handling stdin.
- The nature of stdin in Container.execute means that we don't
- ever use this connection. It is closed as soon as it completes
- the handshake.
+ Allow comunicate with container commands via stdin
"""
+ def __init__(self, url, payload=None, **kwargs):
+ self.encoding = kwargs.pop('encoding', None)
+ self.payload = payload
+ super().__init__(url, **kwargs)
+
+ def _smart_encode(self, msg):
+ if type(msg) == six.text_type and self.encoding:
+ return msg.encode(self.encoding)
+ return msg
+
def handshake_ok(self):
- self.close()
+ if self.payload:
+ if hasattr(self.payload, "read"):
+ self.send(
+ (self._smart_encode(line) for line in self.payload),
+ binary=True
+ )
+ else:
+ self.send(self._smart_encode(self.payload), binary=True)
+ self.send(b"", binary=False)
class Snapshot(model.Model):
More information about the lxc-devel
mailing list