[lxc-devel] [pylxd/master] Move models
rockstar on Github
lxc-bot at linuxcontainers.org
Wed Sep 7 18:17:53 UTC 2016
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 403 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20160907/ffacc6ad/attachment.bin>
-------------- next part --------------
From 29de53082122fdc52ec6ff59b7f35ab1cab7a5d8 Mon Sep 17 00:00:00 2001
From: Paul Hummer <paul.hummer at canonical.com>
Date: Wed, 7 Sep 2016 10:48:12 -0600
Subject: [PATCH 1/9] Add pylxd.models package
---
pylxd/models/__init__.py | 0
1 file changed, 0 insertions(+), 0 deletions(-)
create mode 100644 pylxd/models/__init__.py
diff --git a/pylxd/models/__init__.py b/pylxd/models/__init__.py
new file mode 100644
index 0000000..e69de29
From 44bf149136f6ae896735fd783f7a72228c9c2859 Mon Sep 17 00:00:00 2001
From: Paul Hummer <paul.hummer at canonical.com>
Date: Wed, 7 Sep 2016 10:58:27 -0600
Subject: [PATCH 2/9] Move pylxd.model to pylxd.models._model
---
pylxd/certificate.py | 2 +-
pylxd/container.py | 3 +-
pylxd/image.py | 2 +-
pylxd/model.py | 195 ----------------------------------------------
pylxd/models/_model.py | 195 ++++++++++++++++++++++++++++++++++++++++++++++
pylxd/network.py | 2 +-
pylxd/profile.py | 2 +-
pylxd/tests/test_model.py | 2 +-
8 files changed, 202 insertions(+), 201 deletions(-)
delete mode 100644 pylxd/model.py
create mode 100644 pylxd/models/_model.py
diff --git a/pylxd/certificate.py b/pylxd/certificate.py
index 7f4fc1e..b2c0a6c 100644
--- a/pylxd/certificate.py
+++ b/pylxd/certificate.py
@@ -18,7 +18,7 @@
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import Encoding
-from pylxd import model
+from pylxd.models import _model as model
class Certificate(model.Model):
diff --git a/pylxd/container.py b/pylxd/container.py
index 116355a..8f73a3e 100644
--- a/pylxd/container.py
+++ b/pylxd/container.py
@@ -18,8 +18,9 @@
from ws4py.client import WebSocketBaseClient
from ws4py.manager import WebSocketManager
-from pylxd import managers, model
+from pylxd import managers
from pylxd.deprecation import deprecated
+from pylxd.models import _model as model
from pylxd.operation import Operation
diff --git a/pylxd/image.py b/pylxd/image.py
index 518e3a8..00b0479 100644
--- a/pylxd/image.py
+++ b/pylxd/image.py
@@ -13,7 +13,7 @@
# under the License.
import hashlib
-from pylxd import model
+from pylxd.models import _model as model
from pylxd.operation import Operation
diff --git a/pylxd/model.py b/pylxd/model.py
deleted file mode 100644
index 3ca5ebf..0000000
--- a/pylxd/model.py
+++ /dev/null
@@ -1,195 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import warnings
-
-import six
-
-from pylxd.deprecation import deprecated
-from pylxd.operation import Operation
-
-
-class Attribute(object):
- """A metadata class for model attributes."""
-
- def __init__(self, validator=None, readonly=False, optional=False):
- self.validator = validator
- self.readonly = readonly
- self.optional = optional
-
-
-class Manager(object):
- """A manager declaration.
-
- This class signals to the model that it will have a Manager
- attribute.
- """
-
-
-class Parent(object):
- """A parent declaration.
-
- Child managers must keep a reference to their parent.
- """
-
-
-class ModelType(type):
- """A Model metaclass.
-
- This metaclass converts the declarative Attribute style
- to attributes on the model instance itself.
- """
-
- def __new__(cls, name, bases, attrs):
- if '__slots__' in attrs and name != 'Model': # pragma: no cover
- raise TypeError('__slots__ should not be specified.')
- attributes = {}
- for_removal = []
- managers = []
-
- for key, val in attrs.items():
- if type(val) == Attribute:
- attributes[key] = val
- for_removal.append(key)
- if type(val) in (Manager, Parent):
- managers.append(key)
- for_removal.append(key)
- for key in for_removal:
- del attrs[key]
-
- slots = list(attributes.keys())
- if '__slots__' in attrs:
- slots = slots + attrs['__slots__']
- for base in bases:
- if '__slots__' in dir(base):
- slots = slots + base.__slots__
- if len(managers) > 0:
- slots = slots + managers
- attrs['__slots__'] = slots
- attrs['__attributes__'] = attributes
-
- return super(ModelType, cls).__new__(cls, name, bases, attrs)
-
-
-@six.add_metaclass(ModelType)
-class Model(object):
- """A Base LXD object model.
-
- Objects fetched from the LXD API have state, which allows
- the objects to be used transactionally, with E-tag support,
- and be smart about I/O.
-
- The model lifecycle is this: A model's get/create methods will
- return an instance. That instance may or may not be a partial
- instance. If it is a partial instance, `sync` will be called
- and the rest of the object retrieved from the server when
- un-initialized attributes are read. When attributes are modified,
- the instance is marked as dirty. `save` will save the changes
- to the server.
- """
- __slots__ = ['client', '__dirty__']
-
- def __init__(self, client, **kwargs):
- self.__dirty__ = []
- self.client = client
-
- for key, val in kwargs.items():
- try:
- setattr(self, key, val)
- except AttributeError:
- warnings.warn(
- 'Attempted to set unknown attribute "{}" '
- 'on instance of "{}"'.format(
- key, self.__class__.__name__
- ))
- del self.__dirty__[:]
-
- def __getattribute__(self, name):
- try:
- return super(Model, self).__getattribute__(name)
- except AttributeError:
- if name in self.__attributes__:
- self.sync()
- return super(Model, self).__getattribute__(name)
- else:
- raise
-
- def __setattr__(self, name, value):
- if name in self.__attributes__:
- attribute = self.__attributes__[name]
-
- if attribute.validator is not None:
- if attribute.validator is not type(value):
- value = attribute.validator(value)
- self.__dirty__.append(name)
- return super(Model, self).__setattr__(name, value)
-
- @property
- def dirty(self):
- return len(self.__dirty__) > 0
-
- def sync(self, rollback=False):
- """Sync from the server.
-
- When collections of objects are retrieved from the server, they
- are often partial objects. The full object must be retrieved before
- it can modified. This method is called when getattr is called on
- a non-initaliazed object.
- """
- # XXX: rockstar (25 Jun 2016) - This has the potential to step
- # on existing attributes.
- response = self.api.get()
- for key, val in response.json()['metadata'].items():
- if key not in self.__dirty__ or rollback:
- setattr(self, key, val)
- if rollback:
- del self.__dirty__[:]
- fetch = deprecated("fetch is deprecated; please use sync")(sync)
-
- def rollback(self):
- """Reset the object from the server."""
- return self.sync(rollback=True)
-
- def save(self, wait=False):
- """Save data to the server.
-
- This method should write the new data to the server via marshalling.
- It should be a no-op when the object is not dirty, to prevent needless
- I/O.
- """
- marshalled = self.marshall()
- response = self.api.put(json=marshalled)
-
- if response.json()['type'] == 'async' and wait:
- Operation.wait_for_operation(
- self.client, response.json()['operation'])
- del self.__dirty__[:]
- update = deprecated('update is deprecated; please use save')(save)
-
- def delete(self, wait=False):
- """Delete an object from the server."""
- response = self.api.delete()
-
- if response.json()['type'] == 'async' and wait:
- Operation.wait_for_operation(
- self.client, response.json()['operation'])
- self.client = None
-
- def marshall(self):
- """Marshall the object in preparation for updating to the server."""
- marshalled = {}
- for key, val in self.__attributes__.items():
- if ((not val.readonly and not val.optional) or
- (val.optional and hasattr(self, key))):
- marshalled[key] = getattr(self, key)
- return marshalled
diff --git a/pylxd/models/_model.py b/pylxd/models/_model.py
new file mode 100644
index 0000000..3ca5ebf
--- /dev/null
+++ b/pylxd/models/_model.py
@@ -0,0 +1,195 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import warnings
+
+import six
+
+from pylxd.deprecation import deprecated
+from pylxd.operation import Operation
+
+
+class Attribute(object):
+ """A metadata class for model attributes."""
+
+ def __init__(self, validator=None, readonly=False, optional=False):
+ self.validator = validator
+ self.readonly = readonly
+ self.optional = optional
+
+
+class Manager(object):
+ """A manager declaration.
+
+ This class signals to the model that it will have a Manager
+ attribute.
+ """
+
+
+class Parent(object):
+ """A parent declaration.
+
+ Child managers must keep a reference to their parent.
+ """
+
+
+class ModelType(type):
+ """A Model metaclass.
+
+ This metaclass converts the declarative Attribute style
+ to attributes on the model instance itself.
+ """
+
+ def __new__(cls, name, bases, attrs):
+ if '__slots__' in attrs and name != 'Model': # pragma: no cover
+ raise TypeError('__slots__ should not be specified.')
+ attributes = {}
+ for_removal = []
+ managers = []
+
+ for key, val in attrs.items():
+ if type(val) == Attribute:
+ attributes[key] = val
+ for_removal.append(key)
+ if type(val) in (Manager, Parent):
+ managers.append(key)
+ for_removal.append(key)
+ for key in for_removal:
+ del attrs[key]
+
+ slots = list(attributes.keys())
+ if '__slots__' in attrs:
+ slots = slots + attrs['__slots__']
+ for base in bases:
+ if '__slots__' in dir(base):
+ slots = slots + base.__slots__
+ if len(managers) > 0:
+ slots = slots + managers
+ attrs['__slots__'] = slots
+ attrs['__attributes__'] = attributes
+
+ return super(ModelType, cls).__new__(cls, name, bases, attrs)
+
+
+@six.add_metaclass(ModelType)
+class Model(object):
+ """A Base LXD object model.
+
+ Objects fetched from the LXD API have state, which allows
+ the objects to be used transactionally, with E-tag support,
+ and be smart about I/O.
+
+ The model lifecycle is this: A model's get/create methods will
+ return an instance. That instance may or may not be a partial
+ instance. If it is a partial instance, `sync` will be called
+ and the rest of the object retrieved from the server when
+ un-initialized attributes are read. When attributes are modified,
+ the instance is marked as dirty. `save` will save the changes
+ to the server.
+ """
+ __slots__ = ['client', '__dirty__']
+
+ def __init__(self, client, **kwargs):
+ self.__dirty__ = []
+ self.client = client
+
+ for key, val in kwargs.items():
+ try:
+ setattr(self, key, val)
+ except AttributeError:
+ warnings.warn(
+ 'Attempted to set unknown attribute "{}" '
+ 'on instance of "{}"'.format(
+ key, self.__class__.__name__
+ ))
+ del self.__dirty__[:]
+
+ def __getattribute__(self, name):
+ try:
+ return super(Model, self).__getattribute__(name)
+ except AttributeError:
+ if name in self.__attributes__:
+ self.sync()
+ return super(Model, self).__getattribute__(name)
+ else:
+ raise
+
+ def __setattr__(self, name, value):
+ if name in self.__attributes__:
+ attribute = self.__attributes__[name]
+
+ if attribute.validator is not None:
+ if attribute.validator is not type(value):
+ value = attribute.validator(value)
+ self.__dirty__.append(name)
+ return super(Model, self).__setattr__(name, value)
+
+ @property
+ def dirty(self):
+ return len(self.__dirty__) > 0
+
+ def sync(self, rollback=False):
+ """Sync from the server.
+
+ When collections of objects are retrieved from the server, they
+ are often partial objects. The full object must be retrieved before
+ it can modified. This method is called when getattr is called on
+ a non-initaliazed object.
+ """
+ # XXX: rockstar (25 Jun 2016) - This has the potential to step
+ # on existing attributes.
+ response = self.api.get()
+ for key, val in response.json()['metadata'].items():
+ if key not in self.__dirty__ or rollback:
+ setattr(self, key, val)
+ if rollback:
+ del self.__dirty__[:]
+ fetch = deprecated("fetch is deprecated; please use sync")(sync)
+
+ def rollback(self):
+ """Reset the object from the server."""
+ return self.sync(rollback=True)
+
+ def save(self, wait=False):
+ """Save data to the server.
+
+ This method should write the new data to the server via marshalling.
+ It should be a no-op when the object is not dirty, to prevent needless
+ I/O.
+ """
+ marshalled = self.marshall()
+ response = self.api.put(json=marshalled)
+
+ if response.json()['type'] == 'async' and wait:
+ Operation.wait_for_operation(
+ self.client, response.json()['operation'])
+ del self.__dirty__[:]
+ update = deprecated('update is deprecated; please use save')(save)
+
+ def delete(self, wait=False):
+ """Delete an object from the server."""
+ response = self.api.delete()
+
+ if response.json()['type'] == 'async' and wait:
+ Operation.wait_for_operation(
+ self.client, response.json()['operation'])
+ self.client = None
+
+ def marshall(self):
+ """Marshall the object in preparation for updating to the server."""
+ marshalled = {}
+ for key, val in self.__attributes__.items():
+ if ((not val.readonly and not val.optional) or
+ (val.optional and hasattr(self, key))):
+ marshalled[key] = getattr(self, key)
+ return marshalled
diff --git a/pylxd/network.py b/pylxd/network.py
index 5b2c5e9..2b9af32 100644
--- a/pylxd/network.py
+++ b/pylxd/network.py
@@ -11,7 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from pylxd import model
+from pylxd.models import _model as model
class Network(model.Model):
diff --git a/pylxd/profile.py b/pylxd/profile.py
index f3c658e..c5d564c 100644
--- a/pylxd/profile.py
+++ b/pylxd/profile.py
@@ -11,7 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from pylxd import model
+from pylxd.models import _model as model
class Profile(model.Model):
diff --git a/pylxd/tests/test_model.py b/pylxd/tests/test_model.py
index 2f6e7f1..ca5f1b7 100644
--- a/pylxd/tests/test_model.py
+++ b/pylxd/tests/test_model.py
@@ -11,7 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from pylxd import model
+from pylxd.models import _model as model
from pylxd.tests import testing
From fb76dc8e6cc1e88794603dbe835061d481092ac4 Mon Sep 17 00:00:00 2001
From: Paul Hummer <paul.hummer at canonical.com>
Date: Wed, 7 Sep 2016 11:14:39 -0600
Subject: [PATCH 3/9] Move pylxd.certificate to pylxd.models.certificate
---
pylxd/certificate.py | 71 -----------------------------------------
pylxd/managers.py | 2 +-
pylxd/models/__init__.py | 1 +
pylxd/models/certificate.py | 71 +++++++++++++++++++++++++++++++++++++++++
pylxd/tests/test_certificate.py | 8 ++---
5 files changed, 77 insertions(+), 76 deletions(-)
delete mode 100644 pylxd/certificate.py
create mode 100644 pylxd/models/certificate.py
diff --git a/pylxd/certificate.py b/pylxd/certificate.py
deleted file mode 100644
index b2c0a6c..0000000
--- a/pylxd/certificate.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import binascii
-
-from cryptography import x509
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives import hashes
-from cryptography.hazmat.primitives.serialization import Encoding
-
-from pylxd.models import _model as model
-
-
-class Certificate(model.Model):
- """A LXD certificate."""
-
- certificate = model.Attribute()
- fingerprint = model.Attribute()
- type = model.Attribute()
-
- @classmethod
- def get(cls, client, fingerprint):
- """Get a certificate by fingerprint."""
- response = client.api.certificates[fingerprint].get()
-
- return cls(client, **response.json()['metadata'])
-
- @classmethod
- def all(cls, client):
- """Get all certificates."""
- response = client.api.certificates.get()
-
- certs = []
- for cert in response.json()['metadata']:
- fingerprint = cert.split('/')[-1]
- certs.append(cls(client, fingerprint=fingerprint))
- return certs
-
- @classmethod
- def create(cls, client, password, cert_data):
- """Create a new certificate."""
- cert = x509.load_pem_x509_certificate(cert_data, default_backend())
- base64_cert = cert.public_bytes(Encoding.PEM).decode('utf-8')
- # STRIP OUT CERT META "-----BEGIN CERTIFICATE-----"
- base64_cert = '\n'.join(base64_cert.split('\n')[1:-2])
- data = {
- 'type': 'client',
- 'certificate': base64_cert,
- 'password': password,
- }
- client.api.certificates.post(json=data)
-
- # XXX: rockstar (08 Jun 2016) - Please see the open lxd bug here:
- # https://github.com/lxc/lxd/issues/2092
- fingerprint = binascii.hexlify(
- cert.fingerprint(hashes.SHA256())).decode('utf-8')
- return cls.get(client, fingerprint)
-
- @property
- def api(self):
- return self.client.api.certificates[self.fingerprint]
diff --git a/pylxd/managers.py b/pylxd/managers.py
index b8c047e..06a2849 100644
--- a/pylxd/managers.py
+++ b/pylxd/managers.py
@@ -26,7 +26,7 @@ def __init__(self, *args, **kwargs):
class CertificateManager(BaseManager):
- manager_for = 'pylxd.certificate.Certificate'
+ manager_for = 'pylxd.models.Certificate'
class ContainerManager(BaseManager):
diff --git a/pylxd/models/__init__.py b/pylxd/models/__init__.py
index e69de29..8f720a1 100644
--- a/pylxd/models/__init__.py
+++ b/pylxd/models/__init__.py
@@ -0,0 +1 @@
+from pylxd.models.certificate import Certificate # NOQA
diff --git a/pylxd/models/certificate.py b/pylxd/models/certificate.py
new file mode 100644
index 0000000..b2c0a6c
--- /dev/null
+++ b/pylxd/models/certificate.py
@@ -0,0 +1,71 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import binascii
+
+from cryptography import x509
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.serialization import Encoding
+
+from pylxd.models import _model as model
+
+
+class Certificate(model.Model):
+ """A LXD certificate."""
+
+ certificate = model.Attribute()
+ fingerprint = model.Attribute()
+ type = model.Attribute()
+
+ @classmethod
+ def get(cls, client, fingerprint):
+ """Get a certificate by fingerprint."""
+ response = client.api.certificates[fingerprint].get()
+
+ return cls(client, **response.json()['metadata'])
+
+ @classmethod
+ def all(cls, client):
+ """Get all certificates."""
+ response = client.api.certificates.get()
+
+ certs = []
+ for cert in response.json()['metadata']:
+ fingerprint = cert.split('/')[-1]
+ certs.append(cls(client, fingerprint=fingerprint))
+ return certs
+
+ @classmethod
+ def create(cls, client, password, cert_data):
+ """Create a new certificate."""
+ cert = x509.load_pem_x509_certificate(cert_data, default_backend())
+ base64_cert = cert.public_bytes(Encoding.PEM).decode('utf-8')
+ # STRIP OUT CERT META "-----BEGIN CERTIFICATE-----"
+ base64_cert = '\n'.join(base64_cert.split('\n')[1:-2])
+ data = {
+ 'type': 'client',
+ 'certificate': base64_cert,
+ 'password': password,
+ }
+ client.api.certificates.post(json=data)
+
+ # XXX: rockstar (08 Jun 2016) - Please see the open lxd bug here:
+ # https://github.com/lxc/lxd/issues/2092
+ fingerprint = binascii.hexlify(
+ cert.fingerprint(hashes.SHA256())).decode('utf-8')
+ return cls.get(client, fingerprint)
+
+ @property
+ def api(self):
+ return self.client.api.certificates[self.fingerprint]
diff --git a/pylxd/tests/test_certificate.py b/pylxd/tests/test_certificate.py
index ef5a57f..c9b0479 100644
--- a/pylxd/tests/test_certificate.py
+++ b/pylxd/tests/test_certificate.py
@@ -13,12 +13,12 @@
# under the License.
import os
-from pylxd import certificate
+from pylxd import models
from pylxd.tests import testing
class TestCertificate(testing.PyLXDTestCase):
- """Tests for pylxd.certificate.Certificate."""
+ """Tests for pylxd.models.Certificate."""
def test_get(self):
"""A certificate is retrieved."""
@@ -45,7 +45,7 @@ def test_create(self):
def test_fetch(self):
"""A partial object is fully fetched."""
- an_certificate = certificate.Certificate(
+ an_certificate = models.Certificate(
self.client, fingerprint='an-certificate')
an_certificate.sync()
@@ -56,7 +56,7 @@ def test_delete(self):
"""A certificate is deleted."""
# XXX: rockstar (08 Jun 2016) - This just executes a code path. An
# assertion should be added.
- an_certificate = certificate.Certificate(
+ an_certificate = models.Certificate(
self.client, fingerprint='an-certificate')
an_certificate.delete()
From 015eed07e275db8bb9467cf168728fe3b4bd03db Mon Sep 17 00:00:00 2001
From: Paul Hummer <paul.hummer at canonical.com>
Date: Wed, 7 Sep 2016 11:21:42 -0600
Subject: [PATCH 4/9] Move pylxd.container to pylxd.models.container
---
pylxd/container.py | 428 ------------------------------------------
pylxd/managers.py | 4 +-
pylxd/models/__init__.py | 1 +
pylxd/models/container.py | 428 ++++++++++++++++++++++++++++++++++++++++++
pylxd/tests/test_container.py | 72 +++----
5 files changed, 467 insertions(+), 466 deletions(-)
delete mode 100644 pylxd/container.py
create mode 100644 pylxd/models/container.py
diff --git a/pylxd/container.py b/pylxd/container.py
deleted file mode 100644
index 8f73a3e..0000000
--- a/pylxd/container.py
+++ /dev/null
@@ -1,428 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import time
-
-import six
-from six.moves.urllib import parse
-from ws4py.client import WebSocketBaseClient
-from ws4py.manager import WebSocketManager
-
-from pylxd import managers
-from pylxd.deprecation import deprecated
-from pylxd.models import _model as model
-from pylxd.operation import Operation
-
-
-class ContainerState(object):
- """A simple object for representing container state."""
-
- def __init__(self, **kwargs):
- for key, value in six.iteritems(kwargs):
- setattr(self, key, value)
-
-
-class Container(model.Model):
- """An LXD Container.
-
- This class is not intended to be used directly, but rather to be used
- via `Client.containers.create`.
- """
-
- architecture = model.Attribute()
- config = model.Attribute()
- created_at = model.Attribute()
- devices = model.Attribute()
- ephemeral = model.Attribute()
- expanded_config = model.Attribute()
- expanded_devices = model.Attribute()
- name = model.Attribute(readonly=True)
- profiles = model.Attribute()
- status = model.Attribute(readonly=True)
- last_used_at = model.Attribute(readonly=True)
-
- status_code = model.Attribute(readonly=True)
- stateful = model.Attribute(readonly=True)
-
- snapshots = model.Manager()
- files = model.Manager()
-
- @property
- def api(self):
- return self.client.api.containers[self.name]
-
- class FilesManager(object):
- """A pseudo-manager for namespacing file operations."""
-
- def __init__(self, client, container):
- self._client = client
- self._container = container
-
- def put(self, filepath, data):
- response = self._client.api.containers[
- self._container.name].files.post(
- params={'path': filepath}, data=data)
- return response.status_code == 200
-
- def get(self, filepath):
- response = self._client.api.containers[
- self._container.name].files.get(
- params={'path': filepath})
- return response.content
-
- @classmethod
- def get(cls, client, name):
- """Get a container by name."""
- response = client.api.containers[name].get()
-
- container = cls(client, **response.json()['metadata'])
- return container
-
- @classmethod
- def all(cls, client):
- """Get all containers.
-
- Containers returned from this method will only have the name
- set, as that is the only property returned from LXD. If more
- information is needed, `Container.sync` is the method call
- that should be used.
- """
- response = client.api.containers.get()
-
- containers = []
- for url in response.json()['metadata']:
- name = url.split('/')[-1]
- containers.append(cls(client, name=name))
- return containers
-
- @classmethod
- def create(cls, client, config, wait=False):
- """Create a new container config."""
- response = client.api.containers.post(json=config)
-
- if wait:
- Operation.wait_for_operation(client, response.json()['operation'])
- return cls(client, name=config['name'])
-
- def __init__(self, *args, **kwargs):
- super(Container, self).__init__(*args, **kwargs)
-
- self.snapshots = managers.SnapshotManager(self.client, self)
- self.files = self.FilesManager(self.client, self)
-
- # XXX: rockstar (28 Mar 2016) - This method was named improperly
- # originally. It's being kept here for backwards compatibility.
- reload = deprecated(
- "Container.reload is deprecated. Please use Container.sync")(
- model.Model.sync)
-
- def rename(self, name, wait=False):
- """Rename a container."""
- response = self.api.post(json={'name': name})
-
- if wait:
- Operation.wait_for_operation(
- self.client, response.json()['operation'])
- self.name = name
-
- def _set_state(self, state, timeout=30, force=True, wait=False):
- response = self.api.state.put(json={
- 'action': state,
- 'timeout': timeout,
- 'force': force
- })
- if wait:
- Operation.wait_for_operation(
- self.client, response.json()['operation'])
- self.sync()
-
- def state(self):
- response = self.api.state.get()
- state = ContainerState(**response.json()['metadata'])
- return state
-
- def start(self, timeout=30, force=True, wait=False):
- """Start the container."""
- return self._set_state('start',
- timeout=timeout,
- force=force,
- wait=wait)
-
- def stop(self, timeout=30, force=True, wait=False):
- """Stop the container."""
- return self._set_state('stop',
- timeout=timeout,
- force=force,
- wait=wait)
-
- def restart(self, timeout=30, force=True, wait=False):
- """Restart the container."""
- return self._set_state('restart',
- timeout=timeout,
- force=force,
- wait=wait)
-
- def freeze(self, timeout=30, force=True, wait=False):
- """Freeze the container."""
- return self._set_state('freeze',
- timeout=timeout,
- force=force,
- wait=wait)
-
- def unfreeze(self, timeout=30, force=True, wait=False):
- """Unfreeze the container."""
- return self._set_state('unfreeze',
- timeout=timeout,
- force=force,
- wait=wait)
-
- @deprecated('Container.snapshot is deprecated. Please use Container.snapshots.create') # NOQA
- def snapshot(self, name, stateful=False, wait=False): # pragma: no cover
- """Take a snapshot of the container."""
- self.snapshots.create(name, stateful, wait)
-
- @deprecated('Container.list_snapshots is deprecated. Please use Container.snapshots.all') # NOQA
- def list_snapshots(self): # pragma: no cover
- """List all container snapshots."""
- return [s.name for s in self.snapshots.all()]
-
- @deprecated('Container.rename_snapshot is deprecated. Please use Snapshot.rename') # NOQA
- def rename_snapshot(self, old, new, wait=False): # pragma: no cover
- """Rename a snapshot."""
- snapshot = self.snapshots.get(old)
- snapshot.rename(new, wait=wait)
-
- @deprecated('Container.delete_snapshot is deprecated. Please use Snapshot.delete') # NOQA
- def delete_snapshot(self, name, wait=False): # pragma: no cover
- """Delete a snapshot."""
- snapshot = self.snapshots.get(name)
- snapshot.delete(wait=wait)
-
- @deprecated('Container.get_file is deprecated. Please use Container.files.get') # NOQA
- def get_file(self, filepath): # pragma: no cover
- """Get a file from the container."""
- return self.files.get(filepath)
-
- @deprecated('Container.put_file is deprecated. Please use Container.files.put') # NOQA
- def put_file(self, filepath, data): # pragma: no cover
- """Put a file on the container."""
- return self.files.put(filepath, data)
-
- def execute(self, commands, environment={}):
- """Execute a command on the container."""
- if isinstance(commands, six.string_types):
- raise TypeError("First argument must be a list.")
- response = self.api['exec'].post(json={
- 'command': commands,
- 'environment': environment,
- 'wait-for-websocket': True,
- 'interactive': False,
- })
-
- fds = response.json()['metadata']['metadata']['fds']
- operation_id = response.json()['operation'].split('/')[-1]
- parsed = parse.urlparse(
- self.client.api.operations[operation_id].websocket._api_endpoint)
-
- manager = WebSocketManager()
-
- stdin = _StdinWebsocket(manager, self.client.websocket_url)
- stdin.resource = '{}?secret={}'.format(parsed.path, fds['0'])
- stdin.connect()
- stdout = _CommandWebsocketClient(manager, self.client.websocket_url)
- stdout.resource = '{}?secret={}'.format(parsed.path, fds['1'])
- stdout.connect()
- stderr = _CommandWebsocketClient(manager, self.client.websocket_url)
- stderr.resource = '{}?secret={}'.format(parsed.path, fds['2'])
- stderr.connect()
-
- manager.start()
-
- while True: # pragma: no cover
- for websocket in manager.websockets.values():
- if not websocket.terminated:
- break
- else:
- break
- time.sleep(1)
-
- return stdout.data, stderr.data
-
- def migrate(self, new_client, wait=False):
- """Migrate a container.
-
- Destination host information is contained in the client
- connection passed in.
-
- If the container is running, it either must be shut down
- first or criu must be installed on the source and destination
- machines.
- """
- self.sync() # Make sure the object isn't stale
- response = self.api.post(json={'migration': True})
- operation = self.client.operations.get(response.json()['operation'])
- operation_url = self.client.api.operations[operation.id]._api_endpoint
- secrets = response.json()['metadata']['metadata']
- cert = self.client.host_info['environment']['certificate']
-
- config = {
- 'name': self.name,
- 'architecture': self.architecture,
- 'config': self.config,
- 'devices': self.devices,
- 'epehemeral': self.ephemeral,
- 'default': self.profiles,
- 'source': {
- 'type': 'migration',
- 'operation': operation_url,
- 'mode': 'pull',
- 'certificate': cert,
- 'secrets': secrets,
- }
- }
- return new_client.containers.create(config, wait=wait)
-
- def publish(self, public=False, wait=False):
- """Publish a container as an image.
-
- The container must be stopped in order publish it as an image. This
- method does not enforce that constraint, so a LXDAPIException may be
- raised if this method is called on a running container.
-
- If wait=True, an Image is returned.
- """
- data = {
- 'public': public,
- 'source': {
- 'type': 'container',
- 'name': self.name,
- }
- }
-
- response = self.client.api.images.post(json=data)
- if wait:
- operation = Operation.wait_for_operation(
- self.client, response.json()['operation'])
-
- return self.client.images.get(operation.metadata['fingerprint'])
-
-
-class _CommandWebsocketClient(WebSocketBaseClient): # pragma: no cover
- def __init__(self, manager, *args, **kwargs):
- self.manager = manager
- super(_CommandWebsocketClient, self).__init__(*args, **kwargs)
-
- def handshake_ok(self):
- self.manager.add(self)
- self.buffer = []
-
- def received_message(self, message):
- if message.encoding:
- self.buffer.append(message.data.decode(message.encoding))
- else:
- self.buffer.append(message.data.decode('utf-8'))
-
- @property
- def data(self):
- return ''.join(self.buffer)
-
-
-class _StdinWebsocket(WebSocketBaseClient): # pragma: no cover
- """A websocket client for handling stdin."""
-
- def __init__(self, manager, *args, **kwargs):
- self.manager = manager
- super(_StdinWebsocket, self).__init__(*args, **kwargs)
-
- def handshake_ok(self):
- self.manager.add(self)
- self.close()
-
-
-class Snapshot(model.Model):
- """A container snapshot."""
-
- name = model.Attribute()
- stateful = model.Attribute()
-
- container = model.Parent()
-
- @property
- def api(self):
- return self.client.api.containers[
- self.container.name].snapshots[self.name]
-
- @classmethod
- def get(cls, client, container, name):
- response = client.api.containers[
- container.name].snapshots[name].get()
-
- snapshot = cls(
- client, container=container,
- **response.json()['metadata'])
- # Snapshot names are namespaced in LXD, as
- # container-name/snapshot-name. We hide that implementation
- # detail.
- snapshot.name = snapshot.name.split('/')[-1]
- return snapshot
-
- @classmethod
- def all(cls, client, container):
- response = client.api.containers[container.name].snapshots.get()
-
- return [cls(
- client, name=snapshot.split('/')[-1],
- container=container)
- for snapshot in response.json()['metadata']]
-
- @classmethod
- def create(cls, client, container, name, stateful=False, wait=False):
- response = client.api.containers[container.name].snapshots.post(json={
- 'name': name, 'stateful': stateful})
-
- snapshot = cls(client, container=container, name=name)
- if wait:
- Operation.wait_for_operation(client, response.json()['operation'])
- return snapshot
-
- def rename(self, new_name, wait=False):
- """Rename a snapshot."""
- response = self.api.post(json={'name': new_name})
- if wait:
- Operation.wait_for_operation(
- self.client, response.json()['operation'])
- self.name = new_name
-
- def publish(self, public=False, wait=False):
- """Publish a snapshot as an image.
-
- If wait=True, an Image is returned.
-
- This functionality is currently broken in LXD. Please see
- https://github.com/lxc/lxd/issues/2201 - The implementation
- here is mostly a guess. Once that bug is fixed, we can verify
- that this works, or file a bug to fix it appropriately.
- """
- data = {
- 'public': public,
- 'source': {
- 'type': 'snapshot',
- 'name': '{}/{}'.format(self.container.name, self.name),
- }
- }
-
- response = self.client.api.images.post(json=data)
- if wait:
- operation = Operation.wait_for_operation(
- self.client, response.json()['operation'])
- return self.client.images.get(operation.metadata['fingerprint'])
diff --git a/pylxd/managers.py b/pylxd/managers.py
index 06a2849..59bd9d5 100644
--- a/pylxd/managers.py
+++ b/pylxd/managers.py
@@ -30,7 +30,7 @@ class CertificateManager(BaseManager):
class ContainerManager(BaseManager):
- manager_for = 'pylxd.container.Container'
+ manager_for = 'pylxd.models.Container'
class ImageManager(BaseManager):
@@ -50,4 +50,4 @@ class ProfileManager(BaseManager):
class SnapshotManager(BaseManager):
- manager_for = 'pylxd.container.Snapshot'
+ manager_for = 'pylxd.models.Snapshot'
diff --git a/pylxd/models/__init__.py b/pylxd/models/__init__.py
index 8f720a1..ad30cf2 100644
--- a/pylxd/models/__init__.py
+++ b/pylxd/models/__init__.py
@@ -1 +1,2 @@
from pylxd.models.certificate import Certificate # NOQA
+from pylxd.models.container import Container, Snapshot # NOQA
diff --git a/pylxd/models/container.py b/pylxd/models/container.py
new file mode 100644
index 0000000..8f73a3e
--- /dev/null
+++ b/pylxd/models/container.py
@@ -0,0 +1,428 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import time
+
+import six
+from six.moves.urllib import parse
+from ws4py.client import WebSocketBaseClient
+from ws4py.manager import WebSocketManager
+
+from pylxd import managers
+from pylxd.deprecation import deprecated
+from pylxd.models import _model as model
+from pylxd.operation import Operation
+
+
+class ContainerState(object):
+ """A simple object for representing container state."""
+
+ def __init__(self, **kwargs):
+ for key, value in six.iteritems(kwargs):
+ setattr(self, key, value)
+
+
+class Container(model.Model):
+ """An LXD Container.
+
+ This class is not intended to be used directly, but rather to be used
+ via `Client.containers.create`.
+ """
+
+ architecture = model.Attribute()
+ config = model.Attribute()
+ created_at = model.Attribute()
+ devices = model.Attribute()
+ ephemeral = model.Attribute()
+ expanded_config = model.Attribute()
+ expanded_devices = model.Attribute()
+ name = model.Attribute(readonly=True)
+ profiles = model.Attribute()
+ status = model.Attribute(readonly=True)
+ last_used_at = model.Attribute(readonly=True)
+
+ status_code = model.Attribute(readonly=True)
+ stateful = model.Attribute(readonly=True)
+
+ snapshots = model.Manager()
+ files = model.Manager()
+
+ @property
+ def api(self):
+ return self.client.api.containers[self.name]
+
+ class FilesManager(object):
+ """A pseudo-manager for namespacing file operations."""
+
+ def __init__(self, client, container):
+ self._client = client
+ self._container = container
+
+ def put(self, filepath, data):
+ response = self._client.api.containers[
+ self._container.name].files.post(
+ params={'path': filepath}, data=data)
+ return response.status_code == 200
+
+ def get(self, filepath):
+ response = self._client.api.containers[
+ self._container.name].files.get(
+ params={'path': filepath})
+ return response.content
+
+ @classmethod
+ def get(cls, client, name):
+ """Get a container by name."""
+ response = client.api.containers[name].get()
+
+ container = cls(client, **response.json()['metadata'])
+ return container
+
+ @classmethod
+ def all(cls, client):
+ """Get all containers.
+
+ Containers returned from this method will only have the name
+ set, as that is the only property returned from LXD. If more
+ information is needed, `Container.sync` is the method call
+ that should be used.
+ """
+ response = client.api.containers.get()
+
+ containers = []
+ for url in response.json()['metadata']:
+ name = url.split('/')[-1]
+ containers.append(cls(client, name=name))
+ return containers
+
+ @classmethod
+ def create(cls, client, config, wait=False):
+ """Create a new container from the given config."""
+ response = client.api.containers.post(json=config)
+
+ if wait:
+ Operation.wait_for_operation(client, response.json()['operation'])
+ return cls(client, name=config['name'])
+
+ def __init__(self, *args, **kwargs):
+ super(Container, self).__init__(*args, **kwargs)
+
+ self.snapshots = managers.SnapshotManager(self.client, self)
+ self.files = self.FilesManager(self.client, self)
+
+ # XXX: rockstar (28 Mar 2016) - This method was named improperly
+ # originally. It's being kept here for backwards compatibility.
+ reload = deprecated(
+ "Container.reload is deprecated. Please use Container.sync")(
+ model.Model.sync)
+
+ def rename(self, name, wait=False):
+ """Rename a container."""
+ response = self.api.post(json={'name': name})
+
+ if wait:
+ Operation.wait_for_operation(
+ self.client, response.json()['operation'])
+ self.name = name
+
+ def _set_state(self, state, timeout=30, force=True, wait=False):
+ response = self.api.state.put(json={
+ 'action': state,
+ 'timeout': timeout,
+ 'force': force
+ })
+ if wait:
+ Operation.wait_for_operation(
+ self.client, response.json()['operation'])
+ self.sync()
+
+ def state(self):
+ response = self.api.state.get()
+ state = ContainerState(**response.json()['metadata'])
+ return state
+
+ def start(self, timeout=30, force=True, wait=False):
+ """Start the container."""
+ return self._set_state('start',
+ timeout=timeout,
+ force=force,
+ wait=wait)
+
+ def stop(self, timeout=30, force=True, wait=False):
+ """Stop the container."""
+ return self._set_state('stop',
+ timeout=timeout,
+ force=force,
+ wait=wait)
+
+ def restart(self, timeout=30, force=True, wait=False):
+ """Restart the container."""
+ return self._set_state('restart',
+ timeout=timeout,
+ force=force,
+ wait=wait)
+
+ def freeze(self, timeout=30, force=True, wait=False):
+ """Freeze the container."""
+ return self._set_state('freeze',
+ timeout=timeout,
+ force=force,
+ wait=wait)
+
+ def unfreeze(self, timeout=30, force=True, wait=False):
+ """Unfreeze the container."""
+ return self._set_state('unfreeze',
+ timeout=timeout,
+ force=force,
+ wait=wait)
+
+ @deprecated('Container.snapshot is deprecated. Please use Container.snapshots.create') # NOQA
+ def snapshot(self, name, stateful=False, wait=False): # pragma: no cover
+ """Take a snapshot of the container."""
+ self.snapshots.create(name, stateful, wait)
+
+ @deprecated('Container.list_snapshots is deprecated. Please use Container.snapshots.all') # NOQA
+ def list_snapshots(self): # pragma: no cover
+ """List all container snapshots."""
+ return [s.name for s in self.snapshots.all()]
+
+ @deprecated('Container.rename_snapshot is deprecated. Please use Snapshot.rename') # NOQA
+ def rename_snapshot(self, old, new, wait=False): # pragma: no cover
+ """Rename a snapshot."""
+ snapshot = self.snapshots.get(old)
+ snapshot.rename(new, wait=wait)
+
+ @deprecated('Container.delete_snapshot is deprecated. Please use Snapshot.delete') # NOQA
+ def delete_snapshot(self, name, wait=False): # pragma: no cover
+ """Delete a snapshot."""
+ snapshot = self.snapshots.get(name)
+ snapshot.delete(wait=wait)
+
+ @deprecated('Container.get_file is deprecated. Please use Container.files.get') # NOQA
+ def get_file(self, filepath): # pragma: no cover
+ """Get a file from the container."""
+ return self.files.get(filepath)
+
+ @deprecated('Container.put_file is deprecated. Please use Container.files.put') # NOQA
+ def put_file(self, filepath, data): # pragma: no cover
+ """Put a file on the container."""
+ return self.files.put(filepath, data)
+
+ def execute(self, commands, environment={}):
+ """Execute a command on the container."""
+ if isinstance(commands, six.string_types):
+ raise TypeError("First argument must be a list.")
+ response = self.api['exec'].post(json={
+ 'command': commands,
+ 'environment': environment,
+ 'wait-for-websocket': True,
+ 'interactive': False,
+ })
+
+ fds = response.json()['metadata']['metadata']['fds']
+ operation_id = response.json()['operation'].split('/')[-1]
+ parsed = parse.urlparse(
+ self.client.api.operations[operation_id].websocket._api_endpoint)
+
+ manager = WebSocketManager()
+
+ stdin = _StdinWebsocket(manager, self.client.websocket_url)
+ stdin.resource = '{}?secret={}'.format(parsed.path, fds['0'])
+ stdin.connect()
+ stdout = _CommandWebsocketClient(manager, self.client.websocket_url)
+ stdout.resource = '{}?secret={}'.format(parsed.path, fds['1'])
+ stdout.connect()
+ stderr = _CommandWebsocketClient(manager, self.client.websocket_url)
+ stderr.resource = '{}?secret={}'.format(parsed.path, fds['2'])
+ stderr.connect()
+
+ manager.start()
+
+ while True: # pragma: no cover
+ for websocket in manager.websockets.values():
+ if not websocket.terminated:
+ break
+ else:
+ break
+ time.sleep(1)
+
+ return stdout.data, stderr.data
+
+ def migrate(self, new_client, wait=False):
+ """Migrate a container.
+
+ Destination host information is contained in the client
+ connection passed in.
+
+ If the container is running, it either must be shut down
+ first or criu must be installed on the source and destination
+ machines.
+ """
+ self.sync() # Make sure the object isn't stale
+ response = self.api.post(json={'migration': True})
+ operation = self.client.operations.get(response.json()['operation'])
+ operation_url = self.client.api.operations[operation.id]._api_endpoint
+ secrets = response.json()['metadata']['metadata']
+ cert = self.client.host_info['environment']['certificate']
+
+ config = {
+ 'name': self.name,
+ 'architecture': self.architecture,
+ 'config': self.config,
+ 'devices': self.devices,
+ 'epehemeral': self.ephemeral,
+ 'default': self.profiles,
+ 'source': {
+ 'type': 'migration',
+ 'operation': operation_url,
+ 'mode': 'pull',
+ 'certificate': cert,
+ 'secrets': secrets,
+ }
+ }
+ return new_client.containers.create(config, wait=wait)
+
+ def publish(self, public=False, wait=False):
+ """Publish a container as an image.
+
+ The container must be stopped in order to publish it as an image. This
+ method does not enforce that constraint, so a LXDAPIException may be
+ raised if this method is called on a running container.
+
+ If wait=True, an Image is returned.
+ """
+ data = {
+ 'public': public,
+ 'source': {
+ 'type': 'container',
+ 'name': self.name,
+ }
+ }
+
+ response = self.client.api.images.post(json=data)
+ if wait:
+ operation = Operation.wait_for_operation(
+ self.client, response.json()['operation'])
+
+ return self.client.images.get(operation.metadata['fingerprint'])
+
+
+class _CommandWebsocketClient(WebSocketBaseClient): # pragma: no cover
+ def __init__(self, manager, *args, **kwargs):
+ self.manager = manager
+ super(_CommandWebsocketClient, self).__init__(*args, **kwargs)
+
+ def handshake_ok(self):
+ self.manager.add(self)
+ self.buffer = []
+
+ def received_message(self, message):
+ if message.encoding:
+ self.buffer.append(message.data.decode(message.encoding))
+ else:
+ self.buffer.append(message.data.decode('utf-8'))
+
+ @property
+ def data(self):
+ return ''.join(self.buffer)
+
+
+class _StdinWebsocket(WebSocketBaseClient): # pragma: no cover
+ """A websocket client for handling stdin."""
+
+ def __init__(self, manager, *args, **kwargs):
+ self.manager = manager
+ super(_StdinWebsocket, self).__init__(*args, **kwargs)
+
+ def handshake_ok(self):
+ self.manager.add(self)
+ self.close()
+
+
+class Snapshot(model.Model):
+ """A container snapshot."""
+
+ name = model.Attribute()
+ stateful = model.Attribute()
+
+ container = model.Parent()
+
+ @property
+ def api(self):
+ return self.client.api.containers[
+ self.container.name].snapshots[self.name]
+
+ @classmethod
+ def get(cls, client, container, name):
+ response = client.api.containers[
+ container.name].snapshots[name].get()
+
+ snapshot = cls(
+ client, container=container,
+ **response.json()['metadata'])
+ # Snapshot names are namespaced in LXD, as
+ # container-name/snapshot-name. We hide that implementation
+ # detail.
+ snapshot.name = snapshot.name.split('/')[-1]
+ return snapshot
+
+ @classmethod
+ def all(cls, client, container):
+ response = client.api.containers[container.name].snapshots.get()
+
+ return [cls(
+ client, name=snapshot.split('/')[-1],
+ container=container)
+ for snapshot in response.json()['metadata']]
+
+ @classmethod
+ def create(cls, client, container, name, stateful=False, wait=False):
+ response = client.api.containers[container.name].snapshots.post(json={
+ 'name': name, 'stateful': stateful})
+
+ snapshot = cls(client, container=container, name=name)
+ if wait:
+ Operation.wait_for_operation(client, response.json()['operation'])
+ return snapshot
+
+ def rename(self, new_name, wait=False):
+ """Rename a snapshot."""
+ response = self.api.post(json={'name': new_name})
+ if wait:
+ Operation.wait_for_operation(
+ self.client, response.json()['operation'])
+ self.name = new_name
+
+ def publish(self, public=False, wait=False):
+ """Publish a snapshot as an image.
+
+ If wait=True, an Image is returned.
+
+ This functionality is currently broken in LXD. Please see
+ https://github.com/lxc/lxd/issues/2201 - The implementation
+ here is mostly a guess. Once that bug is fixed, we can verify
+ that this works, or file a bug to fix it appropriately.
+ """
+ data = {
+ 'public': public,
+ 'source': {
+ 'type': 'snapshot',
+ 'name': '{}/{}'.format(self.container.name, self.name),
+ }
+ }
+
+ response = self.client.api.images.post(json=data)
+ if wait:
+ operation = Operation.wait_for_operation(
+ self.client, response.json()['operation'])
+ return self.client.images.get(operation.metadata['fingerprint'])
diff --git a/pylxd/tests/test_container.py b/pylxd/tests/test_container.py
index 0178cb7..e2044a5 100644
--- a/pylxd/tests/test_container.py
+++ b/pylxd/tests/test_container.py
@@ -2,16 +2,16 @@
import mock
-from pylxd import container, exceptions
+from pylxd import exceptions, models
from pylxd.tests import testing
class TestContainer(testing.PyLXDTestCase):
- """Tests for pylxd.container.Container."""
+ """Tests for pylxd.models.Container."""
def test_all(self):
"""A list of all containers are returned."""
- containers = container.Container.all(self.client)
+ containers = models.Container.all(self.client)
self.assertEqual(1, len(containers))
@@ -19,7 +19,7 @@ def test_get(self):
"""Return a container."""
name = 'an-container'
- an_container = container.Container.get(self.client, name)
+ an_container = models.Container.get(self.client, name)
self.assertEqual(name, an_container.name)
@@ -41,7 +41,7 @@ def not_found(request, context):
self.assertRaises(
exceptions.LXDAPIException,
- container.Container.get, self.client, name)
+ models.Container.get, self.client, name)
def test_get_error(self):
"""LXDAPIException is raised when the LXD API errors."""
@@ -61,20 +61,20 @@ def not_found(request, context):
self.assertRaises(
exceptions.LXDAPIException,
- container.Container.get, self.client, name)
+ models.Container.get, self.client, name)
def test_create(self):
"""A new container is created."""
config = {'name': 'an-new-container'}
- an_new_container = container.Container.create(
+ an_new_container = models.Container.create(
self.client, config, wait=True)
self.assertEqual(config['name'], an_new_container.name)
def test_fetch(self):
"""A sync updates the properties of a container."""
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-container')
an_container.sync()
@@ -95,7 +95,7 @@ def not_found(request, context):
'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
})
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-missing-container')
self.assertRaises(exceptions.LXDAPIException, an_container.sync)
@@ -114,14 +114,14 @@ def not_found(request, context):
'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
})
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-missing-container')
self.assertRaises(exceptions.LXDAPIException, an_container.sync)
def test_update(self):
"""A container is updated."""
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-container')
an_container.architecture = 1
an_container.config = {}
@@ -138,7 +138,7 @@ def test_update(self):
self.assertTrue(an_container.ephemeral)
def test_rename(self):
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-container')
an_container.rename('an-renamed-container', wait=True)
@@ -150,13 +150,13 @@ def test_delete(self):
# XXX: rockstar (21 May 2016) - This just executes
# a code path. There should be an assertion here, but
# it's not clear how to assert that, just yet.
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-container')
an_container.delete(wait=True)
- @mock.patch('pylxd.container._StdinWebsocket')
- @mock.patch('pylxd.container._CommandWebsocketClient')
+ @mock.patch('pylxd.models.container._StdinWebsocket')
+ @mock.patch('pylxd.models.container._CommandWebsocketClient')
def test_execute(self, _CommandWebsocketClient, _StdinWebsocket):
"""A command is executed on a container."""
fake_websocket = mock.Mock()
@@ -164,7 +164,7 @@ def test_execute(self, _CommandWebsocketClient, _StdinWebsocket):
_StdinWebsocket.return_value = fake_websocket
_CommandWebsocketClient.return_value = fake_websocket
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-container')
stdout, _ = an_container.execute(['echo', 'test'])
@@ -173,7 +173,7 @@ def test_execute(self, _CommandWebsocketClient, _StdinWebsocket):
def test_execute_string(self):
"""A command passed as string raises a TypeError."""
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-container')
self.assertRaises(TypeError, an_container.execute, 'apt-get update')
@@ -183,7 +183,7 @@ def test_migrate(self):
from pylxd.client import Client
client2 = Client(endpoint='http://pylxd2.test')
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-container')
an_migrated_container = an_container.migrate(client2)
@@ -207,7 +207,7 @@ def test_publish(self):
'url': r'^http://pylxd.test/1.0/operations/operation-abc$',
})
- an_container = container.Container(
+ an_container = models.Container(
self.client, name='an-container')
image = an_container.publish(wait=True)
@@ -218,13 +218,13 @@ def test_publish(self):
class TestContainerState(testing.PyLXDTestCase):
- """Tests for pylxd.container.ContainerState."""
+ """Tests for pylxd.models.ContainerState."""
def test_get(self):
"""Return a container."""
name = 'an-container'
- an_container = container.Container.get(self.client, name)
+ an_container = models.Container.get(self.client, name)
state = an_container.state()
self.assertEqual('Running', state.status)
@@ -232,41 +232,41 @@ def test_get(self):
def test_start(self):
"""A container is started."""
- an_container = container.Container.get(self.client, 'an-container')
+ an_container = models.Container.get(self.client, 'an-container')
an_container.start(wait=True)
def test_stop(self):
"""A container is stopped."""
- an_container = container.Container.get(self.client, 'an-container')
+ an_container = models.Container.get(self.client, 'an-container')
an_container.stop()
def test_restart(self):
"""A container is restarted."""
- an_container = container.Container.get(self.client, 'an-container')
+ an_container = models.Container.get(self.client, 'an-container')
an_container.restart()
def test_freeze(self):
"""A container is suspended."""
- an_container = container.Container.get(self.client, 'an-container')
+ an_container = models.Container.get(self.client, 'an-container')
an_container.freeze()
def test_unfreeze(self):
"""A container is resumed."""
- an_container = container.Container.get(self.client, 'an-container')
+ an_container = models.Container.get(self.client, 'an-container')
an_container.unfreeze()
class TestContainerSnapshots(testing.PyLXDTestCase):
- """Tests for pylxd.container.Container.snapshots."""
+ """Tests for pylxd.models.Container.snapshots."""
def setUp(self):
super(TestContainerSnapshots, self).setUp()
- self.container = container.Container.get(self.client, 'an-container')
+ self.container = models.Container.get(self.client, 'an-container')
def test_get(self):
"""Return a specific snapshot."""
@@ -292,15 +292,15 @@ def test_create(self):
class TestSnapshot(testing.PyLXDTestCase):
- """Tests for pylxd.container.Snapshot."""
+ """Tests for pylxd.models.Snapshot."""
def setUp(self):
super(TestSnapshot, self).setUp()
- self.container = container.Container.get(self.client, 'an-container')
+ self.container = models.Container.get(self.client, 'an-container')
def test_rename(self):
"""A snapshot is renamed."""
- snapshot = container.Snapshot(
+ snapshot = models.Snapshot(
self.client, container=self.container,
name='an-snapshot')
@@ -310,7 +310,7 @@ def test_rename(self):
def test_delete(self):
"""A snapshot is deleted."""
- snapshot = container.Snapshot(
+ snapshot = models.Snapshot(
self.client, container=self.container,
name='an-snapshot')
@@ -332,7 +332,7 @@ def not_found(request, context):
'url': r'^http://pylxd.test/1.0/containers/an-container/snapshots/an-snapshot$', # NOQA
})
- snapshot = container.Snapshot(
+ snapshot = models.Snapshot(
self.client, container=self.container,
name='an-snapshot')
@@ -354,7 +354,7 @@ def test_publish(self):
'url': r'^http://pylxd.test/1.0/operations/operation-abc$',
})
- snapshot = container.Snapshot(
+ snapshot = models.Snapshot(
self.client, container=self.container,
name='an-snapshot')
@@ -366,11 +366,11 @@ def test_publish(self):
class TestFiles(testing.PyLXDTestCase):
- """Tests for pylxd.container.Container.files."""
+ """Tests for pylxd.models.Container.files."""
def setUp(self):
super(TestFiles, self).setUp()
- self.container = container.Container.get(self.client, 'an-container')
+ self.container = models.Container.get(self.client, 'an-container')
def test_put(self):
"""A file is put on the container."""
From 9f4becc932366f6fddbd7f064ddab2a8bc937b6a Mon Sep 17 00:00:00 2001
From: Paul Hummer <paul.hummer at canonical.com>
Date: Wed, 7 Sep 2016 11:24:24 -0600
Subject: [PATCH 5/9] Move pylxd.image to pylxd.models.image
---
pylxd/image.py | 211 ----------------------------------------------
pylxd/managers.py | 2 +-
pylxd/models/__init__.py | 1 +
pylxd/models/image.py | 211 ++++++++++++++++++++++++++++++++++++++++++++++
pylxd/tests/test_image.py | 22 ++---
5 files changed, 224 insertions(+), 223 deletions(-)
delete mode 100644 pylxd/image.py
create mode 100644 pylxd/models/image.py
diff --git a/pylxd/image.py b/pylxd/image.py
deleted file mode 100644
index 00b0479..0000000
--- a/pylxd/image.py
+++ /dev/null
@@ -1,211 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import hashlib
-
-from pylxd.models import _model as model
-from pylxd.operation import Operation
-
-
-def _image_create_from_config(client, config, wait=False):
- """ Create an image from the given configuration.
-
- See: https://github.com/lxc/lxd/blob/master/doc/rest-api.md#post-6
- """
- response = client.api.images.post(json=config)
- if wait:
- Operation.wait_for_operation(client, response.json()['operation'])
-
- return Operation.get(client, response.json()['operation'])
-
- return response.json()['operation']
-
-
-class Image(model.Model):
- """A LXD Image."""
- aliases = model.Attribute(readonly=True)
- auto_update = model.Attribute(optional=True)
- architecture = model.Attribute(readonly=True)
- cached = model.Attribute(readonly=True)
- created_at = model.Attribute(readonly=True)
- expires_at = model.Attribute(readonly=True)
- filename = model.Attribute(readonly=True)
- fingerprint = model.Attribute(readonly=True)
- last_used_at = model.Attribute(readonly=True)
- properties = model.Attribute()
- public = model.Attribute()
- size = model.Attribute(readonly=True)
- uploaded_at = model.Attribute(readonly=True)
- update_source = model.Attribute(readonly=True)
-
- @property
- def api(self):
- return self.client.api.images[self.fingerprint]
-
- @classmethod
- def get(cls, client, fingerprint):
- """Get an image."""
- response = client.api.images[fingerprint].get()
-
- image = cls(client, **response.json()['metadata'])
- return image
-
- @classmethod
- def get_by_alias(cls, client, alias):
- """Get an image by its alias."""
- response = client.api.images.aliases[alias].get()
-
- fingerprint = response.json()['metadata']['target']
- return cls.get(client, fingerprint)
-
- @classmethod
- def all(cls, client):
- """Get all images."""
- response = client.api.images.get()
-
- images = []
- for url in response.json()['metadata']:
- fingerprint = url.split('/')[-1]
- images.append(cls(client, fingerprint=fingerprint))
- return images
-
- @classmethod
- def create(cls, client, image_data, public=False, wait=False):
- """Create an image."""
- fingerprint = hashlib.sha256(image_data).hexdigest()
-
- headers = {}
- if public:
- headers['X-LXD-Public'] = '1'
- response = client.api.images.post(
- data=image_data, headers=headers)
-
- if wait:
- Operation.wait_for_operation(client, response.json()['operation'])
- return cls(client, fingerprint=fingerprint)
-
- @classmethod
- def create_from_simplestreams(cls, client, server, alias,
- public=False, auto_update=False):
- """ Copy an image from simplestreams.
- """
- config = {
- 'public': public,
- 'auto_update': auto_update,
-
- 'source': {
- 'type': 'image',
- 'mode': 'pull',
- 'server': server,
- 'protocol': 'simplestreams',
- 'fingerprint': alias
- }
- }
-
- op = _image_create_from_config(client, config, wait=True)
-
- return client.images.get(op.metadata['fingerprint'])
-
- @classmethod
- def create_from_url(cls, client, url,
- public=False, auto_update=False):
- """ Copy an image from an url.
- """
- config = {
- 'public': public,
- 'auto_update': auto_update,
-
- 'source': {
- 'type': 'url',
- 'mode': 'pull',
- 'url': url
- }
- }
-
- op = _image_create_from_config(client, config, wait=True)
-
- return client.images.get(op.metadata['fingerprint'])
-
- def export(self):
- """Export the image."""
- response = self.api.export.get()
- return response.content
-
- def add_alias(self, name, description):
- """Add an alias to the image."""
- self.client.api.images.aliases.post(json={
- 'description': description,
- 'target': self.fingerprint,
- 'name': name
- })
-
- # Update current aliases list
- self.aliases.append({
- 'description': description,
- 'target': self.fingerprint,
- 'name': name
- })
-
- def delete_alias(self, name):
- """Delete an alias from the image."""
- self.client.api.images.aliases[name].delete()
-
- # Update current aliases list
- la = [a['name'] for a in self.aliases]
- try:
- del self.aliases[la.index(name)]
- except ValueError:
- pass
-
- def copy(self, new_client, public=None, auto_update=None, wait=False):
- """ Copy an image to a another LXD.
-
- Destination host information is contained in the client
- connection passed in.
- """
- self.sync() # Make sure the object isn't stale
-
- url = '/'.join(self.client.api._api_endpoint.split('/')[:-1])
-
- if public is None:
- public = self.public
-
- if auto_update is None:
- auto_update = self.auto_update
-
- config = {
- 'filename': self.filename,
- 'public': public,
- 'auto_update': auto_update,
- 'properties': self.properties,
-
- 'source': {
- 'type': 'image',
- 'mode': 'pull',
- 'server': url,
- 'protocol': 'lxd',
- 'fingerprint': self.fingerprint
- }
- }
-
- if self.public is not True:
- response = self.api.secret.post(json={})
- secret = response.json()['metadata']['metadata']['secret']
- config['source']['secret'] = secret
- cert = self.client.host_info['environment']['certificate']
- config['source']['certificate'] = cert
-
- _image_create_from_config(new_client, config, wait)
-
- if wait:
- return new_client.images.get(self.fingerprint)
diff --git a/pylxd/managers.py b/pylxd/managers.py
index 59bd9d5..394f678 100644
--- a/pylxd/managers.py
+++ b/pylxd/managers.py
@@ -34,7 +34,7 @@ class ContainerManager(BaseManager):
class ImageManager(BaseManager):
- manager_for = 'pylxd.image.Image'
+ manager_for = 'pylxd.models.Image'
class NetworkManager(BaseManager):
diff --git a/pylxd/models/__init__.py b/pylxd/models/__init__.py
index ad30cf2..54e8943 100644
--- a/pylxd/models/__init__.py
+++ b/pylxd/models/__init__.py
@@ -1,2 +1,3 @@
from pylxd.models.certificate import Certificate # NOQA
from pylxd.models.container import Container, Snapshot # NOQA
+from pylxd.models.image import Image # NOQA
diff --git a/pylxd/models/image.py b/pylxd/models/image.py
new file mode 100644
index 0000000..00b0479
--- /dev/null
+++ b/pylxd/models/image.py
@@ -0,0 +1,211 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import hashlib
+
+from pylxd.models import _model as model
+from pylxd.operation import Operation
+
+
+def _image_create_from_config(client, config, wait=False):
+ """ Create an image from the given configuration.
+
+ See: https://github.com/lxc/lxd/blob/master/doc/rest-api.md#post-6
+ """
+ response = client.api.images.post(json=config)
+ if wait:
+ Operation.wait_for_operation(client, response.json()['operation'])
+
+ return Operation.get(client, response.json()['operation'])
+
+ return response.json()['operation']
+
+
+class Image(model.Model):
+ """A LXD Image."""
+ aliases = model.Attribute(readonly=True)
+ auto_update = model.Attribute(optional=True)
+ architecture = model.Attribute(readonly=True)
+ cached = model.Attribute(readonly=True)
+ created_at = model.Attribute(readonly=True)
+ expires_at = model.Attribute(readonly=True)
+ filename = model.Attribute(readonly=True)
+ fingerprint = model.Attribute(readonly=True)
+ last_used_at = model.Attribute(readonly=True)
+ properties = model.Attribute()
+ public = model.Attribute()
+ size = model.Attribute(readonly=True)
+ uploaded_at = model.Attribute(readonly=True)
+ update_source = model.Attribute(readonly=True)
+
+ @property
+ def api(self):
+ return self.client.api.images[self.fingerprint]
+
+ @classmethod
+ def get(cls, client, fingerprint):
+ """Get an image."""
+ response = client.api.images[fingerprint].get()
+
+ image = cls(client, **response.json()['metadata'])
+ return image
+
+ @classmethod
+ def get_by_alias(cls, client, alias):
+ """Get an image by its alias."""
+ response = client.api.images.aliases[alias].get()
+
+ fingerprint = response.json()['metadata']['target']
+ return cls.get(client, fingerprint)
+
+ @classmethod
+ def all(cls, client):
+ """Get all images."""
+ response = client.api.images.get()
+
+ images = []
+ for url in response.json()['metadata']:
+ fingerprint = url.split('/')[-1]
+ images.append(cls(client, fingerprint=fingerprint))
+ return images
+
+ @classmethod
+ def create(cls, client, image_data, public=False, wait=False):
+ """Create an image."""
+ fingerprint = hashlib.sha256(image_data).hexdigest()
+
+ headers = {}
+ if public:
+ headers['X-LXD-Public'] = '1'
+ response = client.api.images.post(
+ data=image_data, headers=headers)
+
+ if wait:
+ Operation.wait_for_operation(client, response.json()['operation'])
+ return cls(client, fingerprint=fingerprint)
+
+ @classmethod
+ def create_from_simplestreams(cls, client, server, alias,
+ public=False, auto_update=False):
+ """ Copy an image from simplestreams.
+ """
+ config = {
+ 'public': public,
+ 'auto_update': auto_update,
+
+ 'source': {
+ 'type': 'image',
+ 'mode': 'pull',
+ 'server': server,
+ 'protocol': 'simplestreams',
+ 'fingerprint': alias
+ }
+ }
+
+ op = _image_create_from_config(client, config, wait=True)
+
+ return client.images.get(op.metadata['fingerprint'])
+
+ @classmethod
+ def create_from_url(cls, client, url,
+ public=False, auto_update=False):
+ """ Copy an image from an url.
+ """
+ config = {
+ 'public': public,
+ 'auto_update': auto_update,
+
+ 'source': {
+ 'type': 'url',
+ 'mode': 'pull',
+ 'url': url
+ }
+ }
+
+ op = _image_create_from_config(client, config, wait=True)
+
+ return client.images.get(op.metadata['fingerprint'])
+
+ def export(self):
+ """Export the image."""
+ response = self.api.export.get()
+ return response.content
+
+ def add_alias(self, name, description):
+ """Add an alias to the image."""
+ self.client.api.images.aliases.post(json={
+ 'description': description,
+ 'target': self.fingerprint,
+ 'name': name
+ })
+
+ # Update current aliases list
+ self.aliases.append({
+ 'description': description,
+ 'target': self.fingerprint,
+ 'name': name
+ })
+
+ def delete_alias(self, name):
+ """Delete an alias from the image."""
+ self.client.api.images.aliases[name].delete()
+
+ # Update current aliases list
+ la = [a['name'] for a in self.aliases]
+ try:
+ del self.aliases[la.index(name)]
+ except ValueError:
+ pass
+
+ def copy(self, new_client, public=None, auto_update=None, wait=False):
+ """ Copy an image to a another LXD.
+
+ Destination host information is contained in the client
+ connection passed in.
+ """
+ self.sync() # Make sure the object isn't stale
+
+ url = '/'.join(self.client.api._api_endpoint.split('/')[:-1])
+
+ if public is None:
+ public = self.public
+
+ if auto_update is None:
+ auto_update = self.auto_update
+
+ config = {
+ 'filename': self.filename,
+ 'public': public,
+ 'auto_update': auto_update,
+ 'properties': self.properties,
+
+ 'source': {
+ 'type': 'image',
+ 'mode': 'pull',
+ 'server': url,
+ 'protocol': 'lxd',
+ 'fingerprint': self.fingerprint
+ }
+ }
+
+ if self.public is not True:
+ response = self.api.secret.post(json={})
+ secret = response.json()['metadata']['metadata']['secret']
+ config['source']['secret'] = secret
+ cert = self.client.host_info['environment']['certificate']
+ config['source']['certificate'] = cert
+
+ _image_create_from_config(new_client, config, wait)
+
+ if wait:
+ return new_client.images.get(self.fingerprint)
diff --git a/pylxd/tests/test_image.py b/pylxd/tests/test_image.py
index c9cb6a8..26e7b21 100644
--- a/pylxd/tests/test_image.py
+++ b/pylxd/tests/test_image.py
@@ -1,17 +1,17 @@
import hashlib
import json
-from pylxd import exceptions, image
+from pylxd import exceptions, models
from pylxd.tests import testing
class TestImage(testing.PyLXDTestCase):
- """Tests for pylxd.image.Image."""
+ """Tests for pylxd.models.Image."""
def test_get(self):
"""An image is fetched."""
fingerprint = hashlib.sha256(b'').hexdigest()
- a_image = image.Image.get(self.client, fingerprint)
+ a_image = models.Image.get(self.client, fingerprint)
self.assertEqual(fingerprint, a_image.fingerprint)
@@ -33,7 +33,7 @@ def not_found(request, context):
self.assertRaises(
exceptions.LXDAPIException,
- image.Image.get, self.client, fingerprint)
+ models.Image.get, self.client, fingerprint)
def test_get_error(self):
"""LXDAPIException is raised on error."""
@@ -53,27 +53,27 @@ def error(request, context):
self.assertRaises(
exceptions.LXDAPIException,
- image.Image.get, self.client, fingerprint)
+ models.Image.get, self.client, fingerprint)
def test_get_by_alias(self):
fingerprint = hashlib.sha256(b'').hexdigest()
- a_image = image.Image.get_by_alias(self.client, 'an-alias')
+ a_image = models.Image.get_by_alias(self.client, 'an-alias')
self.assertEqual(fingerprint, a_image.fingerprint)
def test_all(self):
"""A list of all images is returned."""
- images = image.Image.all(self.client)
+ images = models.Image.all(self.client)
self.assertEqual(1, len(images))
def test_create(self):
"""An image is created."""
fingerprint = hashlib.sha256(b'').hexdigest()
- a_image = image.Image.create(self.client, b'', public=True, wait=True)
+ a_image = models.Image.create(self.client, b'', public=True, wait=True)
- self.assertIsInstance(a_image, image.Image)
+ self.assertIsInstance(a_image, models.Image)
self.assertEqual(fingerprint, a_image.fingerprint)
def test_update(self):
@@ -106,7 +106,7 @@ def not_found(request, context):
})
fingerprint = hashlib.sha256(b'').hexdigest()
- a_image = image.Image(self.client, fingerprint=fingerprint)
+ a_image = models.Image(self.client, fingerprint=fingerprint)
self.assertRaises(exceptions.LXDAPIException, a_image.sync)
@@ -125,7 +125,7 @@ def not_found(request, context):
})
fingerprint = hashlib.sha256(b'').hexdigest()
- a_image = image.Image(self.client, fingerprint=fingerprint)
+ a_image = models.Image(self.client, fingerprint=fingerprint)
self.assertRaises(exceptions.LXDAPIException, a_image.sync)
From 2f70c93a54205543059d7bba8c085801b6016068 Mon Sep 17 00:00:00 2001
From: Paul Hummer <paul.hummer at canonical.com>
Date: Wed, 7 Sep 2016 11:34:13 -0600
Subject: [PATCH 6/9] Move pylxd.network to pylxd.models.network
---
pylxd/managers.py | 2 +-
pylxd/models/__init__.py | 1 +
pylxd/models/network.py | 52 +++++++++++++++++++++++++++++++++++++++++++++
pylxd/network.py | 52 ---------------------------------------------
pylxd/tests/test_network.py | 14 ++++++------
5 files changed, 61 insertions(+), 60 deletions(-)
create mode 100644 pylxd/models/network.py
delete mode 100644 pylxd/network.py
diff --git a/pylxd/managers.py b/pylxd/managers.py
index 394f678..9a90c77 100644
--- a/pylxd/managers.py
+++ b/pylxd/managers.py
@@ -38,7 +38,7 @@ class ImageManager(BaseManager):
class NetworkManager(BaseManager):
- manager_for = 'pylxd.network.Network'
+ manager_for = 'pylxd.models.Network'
class OperationManager(BaseManager):
diff --git a/pylxd/models/__init__.py b/pylxd/models/__init__.py
index 54e8943..55e32c5 100644
--- a/pylxd/models/__init__.py
+++ b/pylxd/models/__init__.py
@@ -1,3 +1,4 @@
from pylxd.models.certificate import Certificate # NOQA
from pylxd.models.container import Container, Snapshot # NOQA
from pylxd.models.image import Image # NOQA
+from pylxd.models.network import Network # NOQA
diff --git a/pylxd/models/network.py b/pylxd/models/network.py
new file mode 100644
index 0000000..2b9af32
--- /dev/null
+++ b/pylxd/models/network.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from pylxd.models import _model as model
+
+
+class Network(model.Model):
+ """A LXD network."""
+ name = model.Attribute()
+ type = model.Attribute()
+ used_by = model.Attribute()
+
+ @classmethod
+ def get(cls, client, name):
+ """Get a network by name."""
+ response = client.api.networks[name].get()
+
+ network = cls(client, **response.json()['metadata'])
+ return network
+
+ @classmethod
+ def all(cls, client):
+ """Get all networks."""
+ response = client.api.networks.get()
+
+ networks = []
+ for url in response.json()['metadata']:
+ name = url.split('/')[-1]
+ networks.append(cls(client, name=name))
+ return networks
+
+ @property
+ def api(self):
+ return self.client.api.networks[self.name]
+
+ def save(self, wait=False):
+ """Save is not available for networks."""
+ raise NotImplementedError('save is not implemented')
+
+ def delete(self):
+ """Delete is not available for networks."""
+ raise NotImplementedError('delete is not implemented')
diff --git a/pylxd/network.py b/pylxd/network.py
deleted file mode 100644
index 2b9af32..0000000
--- a/pylxd/network.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from pylxd.models import _model as model
-
-
-class Network(model.Model):
- """A LXD network."""
- name = model.Attribute()
- type = model.Attribute()
- used_by = model.Attribute()
-
- @classmethod
- def get(cls, client, name):
- """Get a network by name."""
- response = client.api.networks[name].get()
-
- network = cls(client, **response.json()['metadata'])
- return network
-
- @classmethod
- def all(cls, client):
- """Get all networks."""
- response = client.api.networks.get()
-
- networks = []
- for url in response.json()['metadata']:
- name = url.split('/')[-1]
- networks.append(cls(client, name=name))
- return networks
-
- @property
- def api(self):
- return self.client.api.networks[self.name]
-
- def save(self, wait=False):
- """Save is not available for networks."""
- raise NotImplementedError('save is not implemented')
-
- def delete(self):
- """Delete is not available for networks."""
- raise NotImplementedError('delete is not implemented')
diff --git a/pylxd/tests/test_network.py b/pylxd/tests/test_network.py
index f7743cd..c815438 100644
--- a/pylxd/tests/test_network.py
+++ b/pylxd/tests/test_network.py
@@ -11,16 +11,16 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from pylxd import network
+from pylxd import models
from pylxd.tests import testing
class TestNetwork(testing.PyLXDTestCase):
- """Tests for pylxd.network.Network."""
+ """Tests for pylxd.models.Network."""
def test_all(self):
"""A list of all networks are returned."""
- networks = network.Network.all(self.client)
+ networks = models.Network.all(self.client)
self.assertEqual(1, len(networks))
@@ -28,26 +28,26 @@ def test_get(self):
"""Return a container."""
name = 'lo'
- an_network = network.Network.get(self.client, name)
+ an_network = models.Network.get(self.client, name)
self.assertEqual(name, an_network.name)
def test_partial(self):
"""A partial network is synced."""
- an_network = network.Network(self.client, name='lo')
+ an_network = models.Network(self.client, name='lo')
self.assertEqual('loopback', an_network.type)
def test_delete(self):
"""delete is not implemented in networks."""
- an_network = network.Network(self.client, name='lo')
+ an_network = models.Network(self.client, name='lo')
with self.assertRaises(NotImplementedError):
an_network.delete()
def test_save(self):
"""save is not implemented in networks."""
- an_network = network.Network(self.client, name='lo')
+ an_network = models.Network(self.client, name='lo')
with self.assertRaises(NotImplementedError):
an_network.save()
From ea04efb27e538ea497b2c3b42c85061c758b5aff Mon Sep 17 00:00:00 2001
From: Paul Hummer <paul.hummer at canonical.com>
Date: Wed, 7 Sep 2016 11:43:43 -0600
Subject: [PATCH 7/9] Move pylxd.operation to pylxd.models.operation
---
pylxd/managers.py | 2 +-
pylxd/models/__init__.py | 1 +
pylxd/models/_model.py | 2 +-
pylxd/models/container.py | 2 +-
pylxd/models/image.py | 2 +-
pylxd/models/operation.py | 54 +++++++++++++++++++++++++++++++++++++++++++
pylxd/operation.py | 54 -------------------------------------------
pylxd/tests/test_operation.py | 10 ++++----
8 files changed, 64 insertions(+), 63 deletions(-)
create mode 100644 pylxd/models/operation.py
delete mode 100644 pylxd/operation.py
diff --git a/pylxd/managers.py b/pylxd/managers.py
index 9a90c77..0138b76 100644
--- a/pylxd/managers.py
+++ b/pylxd/managers.py
@@ -42,7 +42,7 @@ class NetworkManager(BaseManager):
class OperationManager(BaseManager):
- manager_for = 'pylxd.operation.Operation'
+ manager_for = 'pylxd.models.Operation'
class ProfileManager(BaseManager):
diff --git a/pylxd/models/__init__.py b/pylxd/models/__init__.py
index 55e32c5..2c3252d 100644
--- a/pylxd/models/__init__.py
+++ b/pylxd/models/__init__.py
@@ -2,3 +2,4 @@
from pylxd.models.container import Container, Snapshot # NOQA
from pylxd.models.image import Image # NOQA
from pylxd.models.network import Network # NOQA
+from pylxd.models.operation import Operation # NOQA
diff --git a/pylxd/models/_model.py b/pylxd/models/_model.py
index 3ca5ebf..f95e05b 100644
--- a/pylxd/models/_model.py
+++ b/pylxd/models/_model.py
@@ -16,7 +16,7 @@
import six
from pylxd.deprecation import deprecated
-from pylxd.operation import Operation
+from pylxd.models.operation import Operation
class Attribute(object):
diff --git a/pylxd/models/container.py b/pylxd/models/container.py
index 8f73a3e..b0f8c8b 100644
--- a/pylxd/models/container.py
+++ b/pylxd/models/container.py
@@ -21,7 +21,7 @@
from pylxd import managers
from pylxd.deprecation import deprecated
from pylxd.models import _model as model
-from pylxd.operation import Operation
+from pylxd.models.operation import Operation
class ContainerState(object):
diff --git a/pylxd/models/image.py b/pylxd/models/image.py
index 00b0479..fbf2e43 100644
--- a/pylxd/models/image.py
+++ b/pylxd/models/image.py
@@ -14,7 +14,7 @@
import hashlib
from pylxd.models import _model as model
-from pylxd.operation import Operation
+from pylxd.models.operation import Operation
def _image_create_from_config(client, config, wait=False):
diff --git a/pylxd/models/operation.py b/pylxd/models/operation.py
new file mode 100644
index 0000000..1594680
--- /dev/null
+++ b/pylxd/models/operation.py
@@ -0,0 +1,54 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from pylxd import exceptions
+
+
+class Operation(object):
+ """A LXD operation."""
+
+ __slots__ = [
+ '_client',
+ 'class', 'created_at', 'err', 'id', 'may_cancel', 'metadata',
+ 'resources', 'status', 'status_code', 'updated_at']
+
+ @classmethod
+ def wait_for_operation(cls, client, operation_id):
+ """Get an operation and wait for it to complete."""
+ operation = cls.get(client, operation_id)
+ operation.wait()
+ return cls.get(client, operation.id)
+
+ @classmethod
+ def get(cls, client, operation_id):
+ """Get an operation."""
+ if operation_id.startswith('/'):
+ operation_id = operation_id.split('/')[-1]
+ response = client.api.operations[operation_id].get()
+ return cls(_client=client, **response.json()['metadata'])
+
+ def __init__(self, **kwargs):
+ super(Operation, self).__init__()
+ for key, value in kwargs.items():
+ setattr(self, key, value)
+
+ def wait(self):
+ """Wait for the operation to complete and return."""
+ response = self._client.api.operations[self.id].wait.get()
+
+ try:
+ if response.json()['metadata']['status'] == 'Failure':
+ raise exceptions.LXDAPIException(response)
+ except KeyError:
+ # Support for legacy LXD
+ pass
diff --git a/pylxd/operation.py b/pylxd/operation.py
deleted file mode 100644
index 1594680..0000000
--- a/pylxd/operation.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from pylxd import exceptions
-
-
-class Operation(object):
- """A LXD operation."""
-
- __slots__ = [
- '_client',
- 'class', 'created_at', 'err', 'id', 'may_cancel', 'metadata',
- 'resources', 'status', 'status_code', 'updated_at']
-
- @classmethod
- def wait_for_operation(cls, client, operation_id):
- """Get an operation and wait for it to complete."""
- operation = cls.get(client, operation_id)
- operation.wait()
- return cls.get(client, operation.id)
-
- @classmethod
- def get(cls, client, operation_id):
- """Get an operation."""
- if operation_id.startswith('/'):
- operation_id = operation_id.split('/')[-1]
- response = client.api.operations[operation_id].get()
- return cls(_client=client, **response.json()['metadata'])
-
- def __init__(self, **kwargs):
- super(Operation, self).__init__()
- for key, value in kwargs.items():
- setattr(self, key, value)
-
- def wait(self):
- """Wait for the operation to complete and return."""
- response = self._client.api.operations[self.id].wait.get()
-
- try:
- if response.json()['metadata']['status'] == 'Failure':
- raise exceptions.LXDAPIException(response)
- except KeyError:
- # Support for legacy LXD
- pass
diff --git a/pylxd/tests/test_operation.py b/pylxd/tests/test_operation.py
index b250af9..8c74cc9 100644
--- a/pylxd/tests/test_operation.py
+++ b/pylxd/tests/test_operation.py
@@ -12,18 +12,18 @@
# License for the specific language governing permissions and limitations
# under the License.
-from pylxd import exceptions, operation
+from pylxd import exceptions, models
from pylxd.tests import testing
class TestOperation(testing.PyLXDTestCase):
- """Tests for pylxd.operation.Operation."""
+ """Tests for pylxd.models.Operation."""
def test_get(self):
"""Return an operation."""
name = 'operation-abc'
- an_operation = operation.Operation.get(self.client, name)
+ an_operation = models.Operation.get(self.client, name)
self.assertEqual(name, an_operation.id)
@@ -31,7 +31,7 @@ def test_get_full_path(self):
"""Return an operation even if the full path is specified."""
name = '/1.0/operations/operation-abc'
- an_operation = operation.Operation.get(self.client, name)
+ an_operation = models.Operation.get(self.client, name)
self.assertEqual('operation-abc', an_operation.id)
@@ -53,6 +53,6 @@ def error(request, context):
name = '/1.0/operations/operation-abc'
- an_operation = operation.Operation.get(self.client, name)
+ an_operation = models.Operation.get(self.client, name)
self.assertRaises(exceptions.LXDAPIException, an_operation.wait)
From 7b597535c0fd33488c321bac40122c3b5147a0f4 Mon Sep 17 00:00:00 2001
From: Paul Hummer <paul.hummer at canonical.com>
Date: Wed, 7 Sep 2016 11:47:36 -0600
Subject: [PATCH 8/9] Move pylxd.profile to pylxd.models.profile
---
pylxd/managers.py | 2 +-
pylxd/models/__init__.py | 1 +
pylxd/models/profile.py | 61 +++++++++++++++++++++++++++++++++++++++++++++
pylxd/profile.py | 61 ---------------------------------------------
pylxd/tests/test_profile.py | 24 +++++++++---------
5 files changed, 75 insertions(+), 74 deletions(-)
create mode 100644 pylxd/models/profile.py
delete mode 100644 pylxd/profile.py
diff --git a/pylxd/managers.py b/pylxd/managers.py
index 0138b76..6a06971 100644
--- a/pylxd/managers.py
+++ b/pylxd/managers.py
@@ -46,7 +46,7 @@ class OperationManager(BaseManager):
class ProfileManager(BaseManager):
- manager_for = 'pylxd.profile.Profile'
+ manager_for = 'pylxd.models.Profile'
class SnapshotManager(BaseManager):
diff --git a/pylxd/models/__init__.py b/pylxd/models/__init__.py
index 2c3252d..725fb66 100644
--- a/pylxd/models/__init__.py
+++ b/pylxd/models/__init__.py
@@ -3,3 +3,4 @@
from pylxd.models.image import Image # NOQA
from pylxd.models.network import Network # NOQA
from pylxd.models.operation import Operation # NOQA
+from pylxd.models.profile import Profile # NOQA
diff --git a/pylxd/models/profile.py b/pylxd/models/profile.py
new file mode 100644
index 0000000..c5d564c
--- /dev/null
+++ b/pylxd/models/profile.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from pylxd.models import _model as model
+
+
+class Profile(model.Model):
+ """A LXD profile."""
+
+ name = model.Attribute(readonly=True)
+ description = model.Attribute()
+ config = model.Attribute()
+ devices = model.Attribute()
+
+ @classmethod
+ def get(cls, client, name):
+ """Get a profile."""
+ response = client.api.profiles[name].get()
+ return cls(client, **response.json()['metadata'])
+
+ @classmethod
+ def all(cls, client):
+ """Get all profiles."""
+ response = client.api.profiles.get()
+
+ profiles = []
+ for url in response.json()['metadata']:
+ name = url.split('/')[-1]
+ profiles.append(cls(client, name=name))
+ return profiles
+
+ @classmethod
+ def create(cls, client, name, config=None, devices=None):
+ """Create a profile."""
+ profile = {'name': name}
+ if config is not None:
+ profile['config'] = config
+ if devices is not None:
+ profile['devices'] = devices
+ client.api.profiles.post(json=profile)
+ return cls.get(client, name)
+
+ @property
+ def api(self):
+ return self.client.api.profiles[self.name]
+
+ def rename(self, new_name):
+ """Rename the profile."""
+ self.api.post(json={'name': new_name})
+
+ return Profile.get(self.client, new_name)
diff --git a/pylxd/profile.py b/pylxd/profile.py
deleted file mode 100644
index c5d564c..0000000
--- a/pylxd/profile.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from pylxd.models import _model as model
-
-
-class Profile(model.Model):
- """A LXD profile."""
-
- name = model.Attribute(readonly=True)
- description = model.Attribute()
- config = model.Attribute()
- devices = model.Attribute()
-
- @classmethod
- def get(cls, client, name):
- """Get a profile."""
- response = client.api.profiles[name].get()
- return cls(client, **response.json()['metadata'])
-
- @classmethod
- def all(cls, client):
- """Get all profiles."""
- response = client.api.profiles.get()
-
- profiles = []
- for url in response.json()['metadata']:
- name = url.split('/')[-1]
- profiles.append(cls(client, name=name))
- return profiles
-
- @classmethod
- def create(cls, client, name, config=None, devices=None):
- """Create a profile."""
- profile = {'name': name}
- if config is not None:
- profile['config'] = config
- if devices is not None:
- profile['devices'] = devices
- client.api.profiles.post(json=profile)
- return cls.get(client, name)
-
- @property
- def api(self):
- return self.client.api.profiles[self.name]
-
- def rename(self, new_name):
- """Rename the profile."""
- self.api.post(json={'name': new_name})
-
- return Profile.get(self.client, new_name)
diff --git a/pylxd/tests/test_profile.py b/pylxd/tests/test_profile.py
index c8f4e6c..6a357aa 100644
--- a/pylxd/tests/test_profile.py
+++ b/pylxd/tests/test_profile.py
@@ -1,16 +1,16 @@
import json
-from pylxd import exceptions, profile
+from pylxd import exceptions, models
from pylxd.tests import testing
class TestProfile(testing.PyLXDTestCase):
- """Tests for pylxd.profile.Profile."""
+ """Tests for pylxd.models.Profile."""
def test_get(self):
"""A profile is fetched."""
name = 'an-profile'
- an_profile = profile.Profile.get(self.client, name)
+ an_profile = models.Profile.get(self.client, name)
self.assertEqual(name, an_profile.name)
@@ -30,7 +30,7 @@ def not_found(request, context):
self.assertRaises(
exceptions.LXDAPIException,
- profile.Profile.get, self.client, 'an-profile')
+ models.Profile.get, self.client, 'an-profile')
def test_get_error(self):
"""LXDAPIException is raised on get error."""
@@ -48,25 +48,25 @@ def error(request, context):
self.assertRaises(
exceptions.LXDAPIException,
- profile.Profile.get, self.client, 'an-profile')
+ models.Profile.get, self.client, 'an-profile')
def test_all(self):
"""A list of all profiles is returned."""
- profiles = profile.Profile.all(self.client)
+ profiles = models.Profile.all(self.client)
self.assertEqual(1, len(profiles))
def test_create(self):
"""A new profile is created."""
- an_profile = profile.Profile.create(
+ an_profile = models.Profile.create(
self.client, name='an-new-profile', config={}, devices={})
- self.assertIsInstance(an_profile, profile.Profile)
+ self.assertIsInstance(an_profile, models.Profile)
self.assertEqual('an-new-profile', an_profile.name)
def test_rename(self):
"""A profile is renamed."""
- an_profile = profile.Profile.get(self.client, 'an-profile')
+ an_profile = models.Profile.get(self.client, 'an-profile')
an_renamed_profile = an_profile.rename('an-renamed-profile')
@@ -77,7 +77,7 @@ def test_update(self):
# XXX: rockstar (03 Jun 2016) - This just executes
# a code path. There should be an assertion here, but
# it's not clear how to assert that, just yet.
- an_profile = profile.Profile.get(self.client, 'an-profile')
+ an_profile = models.Profile.get(self.client, 'an-profile')
an_profile.save()
@@ -105,7 +105,7 @@ def not_found(request, context):
'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
})
- an_profile = profile.Profile(self.client, name='an-profile')
+ an_profile = models.Profile(self.client, name='an-profile')
self.assertRaises(exceptions.LXDAPIException, an_profile.sync)
@@ -123,7 +123,7 @@ def error(request, context):
'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
})
- an_profile = profile.Profile(self.client, name='an-profile')
+ an_profile = models.Profile(self.client, name='an-profile')
self.assertRaises(exceptions.LXDAPIException, an_profile.sync)
From 482d8718ae93fe0b83b8e24701419036e684fd00 Mon Sep 17 00:00:00 2001
From: Paul Hummer <paul.hummer at canonical.com>
Date: Wed, 7 Sep 2016 12:08:04 -0600
Subject: [PATCH 9/9] Move the model tests to pylxd.tests.models
---
pylxd/tests/models/__init__.py | 0
pylxd/tests/models/test_certificate.py | 62 +++++
pylxd/tests/models/test_container.py | 417 +++++++++++++++++++++++++++++++++
pylxd/tests/models/test_image.py | 328 ++++++++++++++++++++++++++
pylxd/tests/models/test_model.py | 181 ++++++++++++++
pylxd/tests/models/test_network.py | 53 +++++
pylxd/tests/models/test_operation.py | 58 +++++
pylxd/tests/models/test_profile.py | 137 +++++++++++
pylxd/tests/test_certificate.py | 62 -----
pylxd/tests/test_container.py | 417 ---------------------------------
pylxd/tests/test_image.py | 328 --------------------------
pylxd/tests/test_model.py | 181 --------------
pylxd/tests/test_network.py | 53 -----
pylxd/tests/test_operation.py | 58 -----
pylxd/tests/test_profile.py | 137 -----------
15 files changed, 1236 insertions(+), 1236 deletions(-)
create mode 100644 pylxd/tests/models/__init__.py
create mode 100644 pylxd/tests/models/test_certificate.py
create mode 100644 pylxd/tests/models/test_container.py
create mode 100644 pylxd/tests/models/test_image.py
create mode 100644 pylxd/tests/models/test_model.py
create mode 100644 pylxd/tests/models/test_network.py
create mode 100644 pylxd/tests/models/test_operation.py
create mode 100644 pylxd/tests/models/test_profile.py
delete mode 100644 pylxd/tests/test_certificate.py
delete mode 100644 pylxd/tests/test_container.py
delete mode 100644 pylxd/tests/test_image.py
delete mode 100644 pylxd/tests/test_model.py
delete mode 100644 pylxd/tests/test_network.py
delete mode 100644 pylxd/tests/test_operation.py
delete mode 100644 pylxd/tests/test_profile.py
diff --git a/pylxd/tests/models/__init__.py b/pylxd/tests/models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pylxd/tests/models/test_certificate.py b/pylxd/tests/models/test_certificate.py
new file mode 100644
index 0000000..d51a7e3
--- /dev/null
+++ b/pylxd/tests/models/test_certificate.py
@@ -0,0 +1,62 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import os
+
+from pylxd import models
+from pylxd.tests import testing
+
+
+class TestCertificate(testing.PyLXDTestCase):
+ """Tests for pylxd.models.Certificate."""
+
+ def test_get(self):
+ """A certificate is retrieved."""
+ cert = self.client.certificates.get('an-certificate')
+
+ self.assertEqual('certificate-content', cert.certificate)
+
+ def test_all(self):
+ """All certificates are returned."""
+ certs = self.client.certificates.all()
+
+ self.assertIn('an-certificate', [c.fingerprint for c in certs])
+
+ def test_create(self):
+ """A certificate is created."""
+ cert_data = open(os.path.join(
+ os.path.dirname(__file__), '..', 'lxd.crt')).read().encode('utf-8')
+ an_certificate = self.client.certificates.create(
+ 'test-password', cert_data)
+
+ self.assertEqual(
+ 'eaf55b72fc23aa516d709271df9b0116064bf8cfa009cf34c67c33ad32c2320c',
+ an_certificate.fingerprint)
+
+ def test_fetch(self):
+ """A partial object is fully fetched."""
+ an_certificate = models.Certificate(
+ self.client, fingerprint='an-certificate')
+
+ an_certificate.sync()
+
+ self.assertEqual('certificate-content', an_certificate.certificate)
+
+ def test_delete(self):
+ """A certificate is deleted."""
+ # XXX: rockstar (08 Jun 2016) - This just executes a code path. An
+ # assertion should be added.
+ an_certificate = models.Certificate(
+ self.client, fingerprint='an-certificate')
+
+ an_certificate.delete()
diff --git a/pylxd/tests/models/test_container.py b/pylxd/tests/models/test_container.py
new file mode 100644
index 0000000..e2044a5
--- /dev/null
+++ b/pylxd/tests/models/test_container.py
@@ -0,0 +1,417 @@
+import json
+
+import mock
+
+from pylxd import exceptions, models
+from pylxd.tests import testing
+
+
+class TestContainer(testing.PyLXDTestCase):
+ """Tests for pylxd.models.Container."""
+
+ def test_all(self):
+ """A list of all containers is returned."""
+ containers = models.Container.all(self.client)
+
+ self.assertEqual(1, len(containers))
+
+ def test_get(self):
+ """Return a container."""
+ name = 'an-container'
+
+ an_container = models.Container.get(self.client, name)
+
+ self.assertEqual(name, an_container.name)
+
+ def test_get_not_found(self):
+ """LXDAPIException is raised when the container doesn't exist."""
+ def not_found(request, context):
+ context.status_code = 404
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 404})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
+ })
+
+ name = 'an-missing-container'
+
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ models.Container.get, self.client, name)
+
+ def test_get_error(self):
+ """LXDAPIException is raised when the LXD API errors."""
+ def not_found(request, context):
+ context.status_code = 500
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 500})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
+ })
+
+ name = 'an-missing-container'
+
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ models.Container.get, self.client, name)
+
+ def test_create(self):
+ """A new container is created."""
+ config = {'name': 'an-new-container'}
+
+ an_new_container = models.Container.create(
+ self.client, config, wait=True)
+
+ self.assertEqual(config['name'], an_new_container.name)
+
+ def test_fetch(self):
+ """A sync updates the properties of a container."""
+ an_container = models.Container(
+ self.client, name='an-container')
+
+ an_container.sync()
+
+ self.assertTrue(an_container.ephemeral)
+
+ def test_fetch_not_found(self):
+ """LXDAPIException is raised on a 404 for updating container."""
+ def not_found(request, context):
+ context.status_code = 404
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 404})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
+ })
+
+ an_container = models.Container(
+ self.client, name='an-missing-container')
+
+ self.assertRaises(exceptions.LXDAPIException, an_container.sync)
+
+ def test_fetch_error(self):
+ """LXDAPIException is raised on error."""
+ def not_found(request, context):
+ context.status_code = 500
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'An bad error',
+ 'error_code': 500})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
+ })
+
+ an_container = models.Container(
+ self.client, name='an-missing-container')
+
+ self.assertRaises(exceptions.LXDAPIException, an_container.sync)
+
+ def test_update(self):
+ """A container is updated."""
+ an_container = models.Container(
+ self.client, name='an-container')
+ an_container.architecture = 1
+ an_container.config = {}
+ an_container.created_at = 1
+ an_container.devices = {}
+ an_container.ephemeral = 1
+ an_container.expanded_config = {}
+ an_container.expanded_devices = {}
+ an_container.profiles = 1
+ an_container.status = 1
+
+ an_container.save(wait=True)
+
+ self.assertTrue(an_container.ephemeral)
+
+ def test_rename(self):
+ an_container = models.Container(
+ self.client, name='an-container')
+
+ an_container.rename('an-renamed-container', wait=True)
+
+ self.assertEqual('an-renamed-container', an_container.name)
+
+ def test_delete(self):
+ """A container is deleted."""
+ # XXX: rockstar (21 May 2016) - This just executes
+ # a code path. There should be an assertion here, but
+ # it's not clear how to assert that, just yet.
+ an_container = models.Container(
+ self.client, name='an-container')
+
+ an_container.delete(wait=True)
+
+ @mock.patch('pylxd.models.container._StdinWebsocket')
+ @mock.patch('pylxd.models.container._CommandWebsocketClient')
+ def test_execute(self, _CommandWebsocketClient, _StdinWebsocket):
+ """A command is executed on a container."""
+ fake_websocket = mock.Mock()
+ fake_websocket.data = 'test\n'
+ _StdinWebsocket.return_value = fake_websocket
+ _CommandWebsocketClient.return_value = fake_websocket
+
+ an_container = models.Container(
+ self.client, name='an-container')
+
+ stdout, _ = an_container.execute(['echo', 'test'])
+
+ self.assertEqual('test\n', stdout)
+
+ def test_execute_string(self):
+ """A command passed as string raises a TypeError."""
+ an_container = models.Container(
+ self.client, name='an-container')
+
+ self.assertRaises(TypeError, an_container.execute, 'apt-get update')
+
+ def test_migrate(self):
+ """A container is migrated."""
+ from pylxd.client import Client
+
+ client2 = Client(endpoint='http://pylxd2.test')
+ an_container = models.Container(
+ self.client, name='an-container')
+
+ an_migrated_container = an_container.migrate(client2)
+
+ self.assertEqual('an-container', an_migrated_container.name)
+ self.assertEqual(client2, an_migrated_container.client)
+
+ def test_publish(self):
+ """Containers can be published."""
+ self.add_rule({
+ 'text': json.dumps({
+ 'type': 'sync',
+ 'metadata': {
+ 'id': 'operation-abc',
+ 'metadata': {
+ 'fingerprint': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' # NOQA
+ }
+ }
+ }),
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/operations/operation-abc$',
+ })
+
+ an_container = models.Container(
+ self.client, name='an-container')
+
+ image = an_container.publish(wait=True)
+
+ self.assertEqual(
+ 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
+ image.fingerprint)
+
+
+class TestContainerState(testing.PyLXDTestCase):
+ """Tests for pylxd.models.ContainerState."""
+
+ def test_get(self):
+ """Return the state of a container."""
+ name = 'an-container'
+
+ an_container = models.Container.get(self.client, name)
+ state = an_container.state()
+
+ self.assertEqual('Running', state.status)
+ self.assertEqual(103, state.status_code)
+
+ def test_start(self):
+ """A container is started."""
+ an_container = models.Container.get(self.client, 'an-container')
+
+ an_container.start(wait=True)
+
+ def test_stop(self):
+ """A container is stopped."""
+ an_container = models.Container.get(self.client, 'an-container')
+
+ an_container.stop()
+
+ def test_restart(self):
+ """A container is restarted."""
+ an_container = models.Container.get(self.client, 'an-container')
+
+ an_container.restart()
+
+ def test_freeze(self):
+ """A container is suspended."""
+ an_container = models.Container.get(self.client, 'an-container')
+
+ an_container.freeze()
+
+ def test_unfreeze(self):
+ """A container is resumed."""
+ an_container = models.Container.get(self.client, 'an-container')
+
+ an_container.unfreeze()
+
+
+class TestContainerSnapshots(testing.PyLXDTestCase):
+ """Tests for pylxd.models.Container.snapshots."""
+
+ def setUp(self):
+ super(TestContainerSnapshots, self).setUp()
+ self.container = models.Container.get(self.client, 'an-container')
+
+ def test_get(self):
+ """Return a specific snapshot."""
+ snapshot = self.container.snapshots.get('an-snapshot')
+
+ self.assertEqual('an-snapshot', snapshot.name)
+
+ def test_all(self):
+ """Return all snapshots."""
+ snapshots = self.container.snapshots.all()
+
+ self.assertEqual(1, len(snapshots))
+ self.assertEqual('an-snapshot', snapshots[0].name)
+ self.assertEqual(self.client, snapshots[0].client)
+ self.assertEqual(self.container, snapshots[0].container)
+
+ def test_create(self):
+ """Create a snapshot."""
+ snapshot = self.container.snapshots.create(
+ 'an-snapshot', stateful=True, wait=True)
+
+ self.assertEqual('an-snapshot', snapshot.name)
+
+
+class TestSnapshot(testing.PyLXDTestCase):
+ """Tests for pylxd.models.Snapshot."""
+
+ def setUp(self):
+ super(TestSnapshot, self).setUp()
+ self.container = models.Container.get(self.client, 'an-container')
+
+ def test_rename(self):
+ """A snapshot is renamed."""
+ snapshot = models.Snapshot(
+ self.client, container=self.container,
+ name='an-snapshot')
+
+ snapshot.rename('an-renamed-snapshot', wait=True)
+
+ self.assertEqual('an-renamed-snapshot', snapshot.name)
+
+ def test_delete(self):
+ """A snapshot is deleted."""
+ snapshot = models.Snapshot(
+ self.client, container=self.container,
+ name='an-snapshot')
+
+ snapshot.delete(wait=True)
+
+ # TODO: add an assertion here
+
+ def test_delete_failure(self):
+ """If the response indicates delete failure, raise an exception."""
+ def not_found(request, context):
+ context.status_code = 404
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 404})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'DELETE',
+ 'url': r'^http://pylxd.test/1.0/containers/an-container/snapshots/an-snapshot$', # NOQA
+ })
+
+ snapshot = models.Snapshot(
+ self.client, container=self.container,
+ name='an-snapshot')
+
+ self.assertRaises(exceptions.LXDAPIException, snapshot.delete)
+
+ def test_publish(self):
+ """Snapshots can be published."""
+ self.add_rule({
+ 'text': json.dumps({
+ 'type': 'sync',
+ 'metadata': {
+ 'id': 'operation-abc',
+ 'metadata': {
+ 'fingerprint': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' # NOQA
+ }
+ }
+ }),
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/operations/operation-abc$',
+ })
+
+ snapshot = models.Snapshot(
+ self.client, container=self.container,
+ name='an-snapshot')
+
+ image = snapshot.publish(wait=True)
+
+ self.assertEqual(
+ 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
+ image.fingerprint)
+
+
+class TestFiles(testing.PyLXDTestCase):
+ """Tests for pylxd.models.Container.files."""
+
+ def setUp(self):
+ super(TestFiles, self).setUp()
+ self.container = models.Container.get(self.client, 'an-container')
+
+ def test_put(self):
+ """A file is put on the container."""
+ data = 'The quick brown fox'
+
+ self.container.files.put('/tmp/putted', data)
+
+ # TODO: Add an assertion here
+
+ def test_get(self):
+ """A file is retrieved from the container."""
+ data = self.container.files.get('/tmp/getted')
+
+ self.assertEqual(b'This is a getted file', data)
+
+ def test_get_not_found(self):
+ """LXDAPIException is raised on bogus filenames."""
+ def not_found(request, context):
+ context.status_code = 500
+ rule = {
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/containers/an-container/files\?path=%2Ftmp%2Fgetted$', # NOQA
+ }
+ self.add_rule(rule)
+
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ self.container.files.get, '/tmp/getted')
+
+ def test_get_error(self):
+ """LXDAPIException is raised on error."""
+ def not_found(request, context):
+ context.status_code = 503
+ rule = {
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/containers/an-container/files\?path=%2Ftmp%2Fgetted$', # NOQA
+ }
+ self.add_rule(rule)
+
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ self.container.files.get, '/tmp/getted')
diff --git a/pylxd/tests/models/test_image.py b/pylxd/tests/models/test_image.py
new file mode 100644
index 0000000..26e7b21
--- /dev/null
+++ b/pylxd/tests/models/test_image.py
@@ -0,0 +1,328 @@
+import hashlib
+import json
+
+from pylxd import exceptions, models
+from pylxd.tests import testing
+
+
+class TestImage(testing.PyLXDTestCase):
+ """Tests for pylxd.models.Image."""
+
+ def test_get(self):
+ """An image is fetched."""
+ fingerprint = hashlib.sha256(b'').hexdigest()
+ a_image = models.Image.get(self.client, fingerprint)
+
+ self.assertEqual(fingerprint, a_image.fingerprint)
+
+ def test_get_not_found(self):
+ """LXDAPIException is raised when the image isn't found."""
+ def not_found(request, context):
+ context.status_code = 404
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 404})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
+ })
+
+ fingerprint = hashlib.sha256(b'').hexdigest()
+
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ models.Image.get, self.client, fingerprint)
+
+ def test_get_error(self):
+ """LXDAPIException is raised on error."""
+ def error(request, context):
+ context.status_code = 500
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 500})
+ self.add_rule({
+ 'text': error,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
+ })
+
+ fingerprint = hashlib.sha256(b'').hexdigest()
+
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ models.Image.get, self.client, fingerprint)
+
+ def test_get_by_alias(self):
+ fingerprint = hashlib.sha256(b'').hexdigest()
+
+ a_image = models.Image.get_by_alias(self.client, 'an-alias')
+
+ self.assertEqual(fingerprint, a_image.fingerprint)
+
+ def test_all(self):
+ """A list of all images is returned."""
+ images = models.Image.all(self.client)
+
+ self.assertEqual(1, len(images))
+
+ def test_create(self):
+ """An image is created."""
+ fingerprint = hashlib.sha256(b'').hexdigest()
+ a_image = models.Image.create(self.client, b'', public=True, wait=True)
+
+ self.assertIsInstance(a_image, models.Image)
+ self.assertEqual(fingerprint, a_image.fingerprint)
+
+ def test_update(self):
+ """An image is updated."""
+ a_image = self.client.images.all()[0]
+ a_image.sync()
+
+ a_image.save()
+
+ def test_fetch(self):
+ """A partial object is fetched and populated."""
+ a_image = self.client.images.all()[0]
+
+ a_image.sync()
+
+ self.assertEqual(1, a_image.size)
+
+ def test_fetch_notfound(self):
+ """A bogus image fetch raises LXDAPIException."""
+ def not_found(request, context):
+ context.status_code = 404
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 404})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
+ })
+ fingerprint = hashlib.sha256(b'').hexdigest()
+
+ a_image = models.Image(self.client, fingerprint=fingerprint)
+
+ self.assertRaises(exceptions.LXDAPIException, a_image.sync)
+
+ def test_fetch_error(self):
+ """A 500 error raises LXDAPIException."""
+ def not_found(request, context):
+ context.status_code = 500
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 500})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
+ })
+ fingerprint = hashlib.sha256(b'').hexdigest()
+
+ a_image = models.Image(self.client, fingerprint=fingerprint)
+
+ self.assertRaises(exceptions.LXDAPIException, a_image.sync)
+
+ def test_delete(self):
+ """An image is deleted."""
+ # XXX: rockstar (03 Jun 2016) - This just executes
+ # a code path. There should be an assertion here, but
+ # it's not clear how to assert that, just yet.
+ a_image = self.client.images.all()[0]
+
+ a_image.delete(wait=True)
+
+ def test_export(self):
+ """An image is exported."""
+ a_image = self.client.images.all()[0]
+
+ data = a_image.export()
+ data_sha = hashlib.sha256(data).hexdigest()
+
+ self.assertEqual(a_image.fingerprint, data_sha)
+
+ def test_export_not_found(self):
+ """LXDAPIException is raised on export of bogus image."""
+ def not_found(request, context):
+ context.status_code = 404
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 404})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855/export$', # NOQA
+ })
+ a_image = self.client.images.all()[0]
+
+ self.assertRaises(exceptions.LXDAPIException, a_image.export)
+
+ def test_export_error(self):
+ """LXDAPIException is raised on API error."""
+ def error(request, context):
+ context.status_code = 500
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'LOLOLOLOL',
+ 'error_code': 500})
+ self.add_rule({
+ 'text': error,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855/export$', # NOQA
+ })
+ a_image = self.client.images.all()[0]
+
+ self.assertRaises(exceptions.LXDAPIException, a_image.export)
+
+ def test_add_alias(self):
+ """Try to add an alias."""
+ a_image = self.client.images.all()[0]
+ a_image.add_alias('lol', 'Just LOL')
+
+ aliases = [a['name'] for a in a_image.aliases]
+ self.assertTrue('lol' in aliases, "Image didn't get updated.")
+
+ def test_add_alias_duplicate(self):
+ """Adding an alias twice should raise an LXDAPIException."""
+ def error(request, context):
+ context.status_code = 409
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'already exists',
+ 'error_code': 409})
+ self.add_rule({
+ 'text': error,
+ 'method': 'POST',
+ 'url': r'^http://pylxd.test/1.0/images/aliases$', # NOQA
+ })
+
+ a_image = self.client.images.all()[0]
+
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ a_image.add_alias,
+ 'lol', 'Just LOL'
+ )
+
+ def test_remove_alias(self):
+ """Try to remove an-alias."""
+ a_image = self.client.images.all()[0]
+ a_image.delete_alias('an-alias')
+
+ self.assertEqual(0, len(a_image.aliases), "Alias didn't get deleted.")
+
+ def test_remove_alias_error(self):
+ """Try to remove a nonexistent alias."""
+ def error(request, context):
+ context.status_code = 404
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'not found',
+ 'error_code': 404})
+ self.add_rule({
+ 'text': error,
+ 'method': 'DELETE',
+ 'url': r'^http://pylxd.test/1.0/images/aliases/lol$', # NOQA
+ })
+
+ a_image = self.client.images.all()[0]
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ a_image.delete_alias,
+ 'lol'
+ )
+
+ def test_remove_alias_not_in_image(self):
+ """Try to remove an alias which is not in the current image."""
+ a_image = self.client.images.all()[0]
+ a_image.delete_alias('b-alias')
+
+ def test_copy(self):
+ """Try to copy an image to another LXD instance."""
+ from pylxd.client import Client
+
+ a_image = self.client.images.all()[0]
+
+ client2 = Client(endpoint='http://pylxd2.test')
+ copied_image = a_image.copy(client2, wait=True)
+ self.assertEqual(a_image.fingerprint, copied_image.fingerprint)
+
+ def test_copy_public(self):
+ """Try to copy a public image."""
+ from pylxd.client import Client
+
+ def image_get(request, context):
+ context.status_code = 200
+ return json.dumps({
+ 'type': 'sync',
+ 'metadata': {
+ 'aliases': [
+ {
+ 'name': 'an-alias', # NOQA
+ 'fingerprint': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', # NOQA
+ }
+ ],
+ 'architecture': 'x86_64',
+ 'cached': False,
+ 'filename': 'a_image.tar.bz2',
+ 'fingerprint': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', # NOQA
+ 'public': True,
+ 'properties': {},
+ 'size': 1,
+ 'auto_update': False,
+ 'created_at': '1983-06-16T02:42:00Z',
+ 'expires_at': '1983-06-16T02:42:00Z',
+ 'last_used_at': '1983-06-16T02:42:00Z',
+ 'uploaded_at': '1983-06-16T02:42:00Z',
+
+ },
+ })
+ self.add_rule({
+ 'text': image_get,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
+ })
+
+ a_image = self.client.images.all()[0]
+ self.assertTrue(a_image.public)
+
+ client2 = Client(endpoint='http://pylxd2.test')
+ copied_image = a_image.copy(client2, wait=True)
+ self.assertEqual(a_image.fingerprint, copied_image.fingerprint)
+
+ def test_copy_no_wait(self):
+ """Try to copy and don't wait."""
+ from pylxd.client import Client
+
+ a_image = self.client.images.all()[0]
+
+ client2 = Client(endpoint='http://pylxd2.test')
+ a_image.copy(client2, public=False, auto_update=False)
+
+ def test_create_from_simplestreams(self):
+ """Try to create an image from simplestreams."""
+ image = self.client.images.create_from_simplestreams(
+ 'https://cloud-images.ubuntu.com/releases',
+ 'trusty/amd64'
+ )
+ self.assertEqual(
+ 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
+ image.fingerprint
+ )
+
+ def test_create_from_url(self):
+ """Try to create an image from a URL."""
+ image = self.client.images.create_from_url(
+ 'https://dl.stgraber.org/lxd'
+ )
+ self.assertEqual(
+ 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
+ image.fingerprint
+ )
diff --git a/pylxd/tests/models/test_model.py b/pylxd/tests/models/test_model.py
new file mode 100644
index 0000000..ca5f1b7
--- /dev/null
+++ b/pylxd/tests/models/test_model.py
@@ -0,0 +1,181 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from pylxd.models import _model as model
+from pylxd.tests import testing
+
+
+class Item(model.Model):
+ """A fake model."""
+ name = model.Attribute(readonly=True)
+ age = model.Attribute(int)
+ data = model.Attribute()
+
+ @property
+ def api(self):
+ return self.client.api.items[self.name]
+
+
+class TestModel(testing.PyLXDTestCase):
+ """Tests for pylxd.models._model.Model."""
+
+ def setUp(self):
+ super(TestModel, self).setUp()
+
+ self.add_rule({
+ 'json': {
+ 'type': 'sync',
+ 'metadata': {
+ 'name': 'an-item',
+ 'age': 1000,
+ 'data': {'key': 'val'},
+ }
+ },
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/items/an-item',
+ })
+ self.add_rule({
+ 'json': {
+ 'type': 'sync',
+ 'metadata': {}
+ },
+ 'method': 'PUT',
+ 'url': r'^http://pylxd.test/1.0/items/an-item',
+ })
+ self.add_rule({
+ 'json': {
+ 'type': 'sync',
+ 'metadata': {}
+ },
+ 'method': 'DELETE',
+ 'url': r'^http://pylxd.test/1.0/items/an-item',
+ })
+
+ def test_init(self):
+ """Initial attributes are set."""
+ item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
+
+ self.assertEqual(self.client, item.client)
+ self.assertEqual('an-item', item.name)
+
+ def test_init_unknown_attribute(self):
+ """Unknown attributes aren't set."""
+ item = Item(self.client, name='an-item', nonexistent='SRSLY')
+
+ try:
+ item.nonexistent
+ self.fail('item.nonexistent did not raise AttributeError')
+ except AttributeError:
+ pass
+
+ def test_unknown_attribute(self):
+ """Setting unknown attributes raises an exception."""
+ def set_unknown_attribute():
+ item = Item(self.client, name='an-item')
+ item.nonexistent = 'SRSLY'
+ self.assertRaises(AttributeError, set_unknown_attribute)
+
+ def test_get_unknown_attribute(self):
+ """Getting unknown attributes raises an exception."""
+ def get_unknown_attribute():
+ item = Item(self.client, name='an-item')
+ return item.nonexistent
+ self.assertRaises(AttributeError, get_unknown_attribute)
+
+ def test_unset_attribute_sync(self):
+ """Reading unavailable attributes calls sync."""
+ item = Item(self.client, name='an-item')
+
+ self.assertEqual(1000, item.age)
+
+ def test_sync(self):
+ """A sync will update attributes from the server."""
+ item = Item(self.client, name='an-item')
+
+ item.sync()
+
+ self.assertEqual(1000, item.age)
+
+ def test_sync_dirty(self):
+ """Sync will not overwrite local attribute changes."""
+ item = Item(self.client, name='an-item')
+
+ item.age = 250
+ item.sync()
+
+ self.assertEqual(250, item.age)
+
+ def test_rollback(self):
+ """Rollback resets the object from the server."""
+ item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
+
+ item.age = 50
+ item.rollback()
+
+ self.assertEqual(1000, item.age)
+ self.assertFalse(item.dirty)
+
+ def test_int_attribute_validator(self):
+ """Age is set properly to be an int."""
+ item = Item(self.client)
+
+ item.age = '100'
+
+ self.assertEqual(100, item.age)
+
+ def test_int_attribute_invalid(self):
+ """ValueError is raised when data can't be converted to type."""
+ def set_string():
+ item = Item(self.client)
+ item.age = 'abc'
+
+ self.assertRaises(ValueError, set_string)
+
+ def test_dirty(self):
+ """Changes mark the object as dirty."""
+ item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
+
+ item.age = 100
+
+ self.assertTrue(item.dirty)
+
+ def test_not_dirty(self):
+ """An unmodified object is not marked as dirty."""
+ item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
+
+ self.assertFalse(item.dirty)
+
+ def test_marshall(self):
+ """The object is marshalled into a dict."""
+ item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
+
+ result = item.marshall()
+
+ self.assertEqual({'age': 15, 'data': {'key': 'val'}}, result)
+
+ def test_delete(self):
+ """The object is deleted, and client is unset."""
+ item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
+
+ item.delete()
+
+ self.assertIsNone(item.client)
+
+ def test_save(self):
+ """Attributes are written to the server; object is marked clean."""
+ item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
+
+ item.age = 69
+ item.save()
+
+ self.assertFalse(item.dirty)
diff --git a/pylxd/tests/models/test_network.py b/pylxd/tests/models/test_network.py
new file mode 100644
index 0000000..c815438
--- /dev/null
+++ b/pylxd/tests/models/test_network.py
@@ -0,0 +1,53 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from pylxd import models
+from pylxd.tests import testing
+
+
+class TestNetwork(testing.PyLXDTestCase):
+ """Tests for pylxd.models.Network."""
+
+ def test_all(self):
+ """A list of all networks is returned."""
+ networks = models.Network.all(self.client)
+
+ self.assertEqual(1, len(networks))
+
+ def test_get(self):
+ """Return a network."""
+ name = 'lo'
+
+ an_network = models.Network.get(self.client, name)
+
+ self.assertEqual(name, an_network.name)
+
+ def test_partial(self):
+ """A partial network is synced."""
+ an_network = models.Network(self.client, name='lo')
+
+ self.assertEqual('loopback', an_network.type)
+
+ def test_delete(self):
+ """delete is not implemented in networks."""
+ an_network = models.Network(self.client, name='lo')
+
+ with self.assertRaises(NotImplementedError):
+ an_network.delete()
+
+ def test_save(self):
+ """save is not implemented in networks."""
+ an_network = models.Network(self.client, name='lo')
+
+ with self.assertRaises(NotImplementedError):
+ an_network.save()
diff --git a/pylxd/tests/models/test_operation.py b/pylxd/tests/models/test_operation.py
new file mode 100644
index 0000000..8c74cc9
--- /dev/null
+++ b/pylxd/tests/models/test_operation.py
@@ -0,0 +1,58 @@
+# Copyright (c) 2016 Canonical Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from pylxd import exceptions, models
+from pylxd.tests import testing
+
+
+class TestOperation(testing.PyLXDTestCase):
+ """Tests for pylxd.models.Operation."""
+
+ def test_get(self):
+ """Return an operation."""
+ name = 'operation-abc'
+
+ an_operation = models.Operation.get(self.client, name)
+
+ self.assertEqual(name, an_operation.id)
+
+ def test_get_full_path(self):
+ """Return an operation even if the full path is specified."""
+ name = '/1.0/operations/operation-abc'
+
+ an_operation = models.Operation.get(self.client, name)
+
+ self.assertEqual('operation-abc', an_operation.id)
+
+ def test_wait_with_error(self):
+ """If the operation errors, wait raises an exception."""
+ def error(request, context):
+ context.status_code = 200
+ return {
+ 'type': 'sync',
+ 'metadata': {
+ 'status': 'Failure',
+ 'err': 'Keep your foot off the blasted samoflange.',
+ }}
+ self.add_rule({
+ 'json': error,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/operations/operation-abc/wait$', # NOQA
+ })
+
+ name = '/1.0/operations/operation-abc'
+
+ an_operation = models.Operation.get(self.client, name)
+
+ self.assertRaises(exceptions.LXDAPIException, an_operation.wait)
diff --git a/pylxd/tests/models/test_profile.py b/pylxd/tests/models/test_profile.py
new file mode 100644
index 0000000..6a357aa
--- /dev/null
+++ b/pylxd/tests/models/test_profile.py
@@ -0,0 +1,137 @@
+import json
+
+from pylxd import exceptions, models
+from pylxd.tests import testing
+
+
+class TestProfile(testing.PyLXDTestCase):
+ """Tests for pylxd.models.Profile."""
+
+ def test_get(self):
+ """A profile is fetched."""
+ name = 'an-profile'
+ an_profile = models.Profile.get(self.client, name)
+
+ self.assertEqual(name, an_profile.name)
+
+ def test_get_not_found(self):
+ """LXDAPIException is raised on unknown profiles."""
+ def not_found(request, context):
+ context.status_code = 404
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 404})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
+ })
+
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ models.Profile.get, self.client, 'an-profile')
+
+ def test_get_error(self):
+ """LXDAPIException is raised when the API responds with a 500."""
+ def error(request, context):
+ context.status_code = 500
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 500})
+ self.add_rule({
+ 'text': error,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
+ })
+
+ self.assertRaises(
+ exceptions.LXDAPIException,
+ models.Profile.get, self.client, 'an-profile')
+
+ def test_all(self):
+ """A list of all profiles is returned."""
+ profiles = models.Profile.all(self.client)
+
+ self.assertEqual(1, len(profiles))
+
+ def test_create(self):
+ """A new profile is created."""
+ an_profile = models.Profile.create(
+ self.client, name='an-new-profile', config={}, devices={})
+
+ self.assertIsInstance(an_profile, models.Profile)
+ self.assertEqual('an-new-profile', an_profile.name)
+
+ def test_rename(self):
+ """rename() returns a profile carrying the new name."""
+ an_profile = models.Profile.get(self.client, 'an-profile')
+
+ an_renamed_profile = an_profile.rename('an-renamed-profile')
+
+ self.assertEqual('an-renamed-profile', an_renamed_profile.name)
+
+ def test_update(self):
+ """A profile is updated."""
+ # XXX: rockstar (03 Jun 2016) - This mostly executes
+ # a code path. The assertion below only checks that config is
+ # still {} after save(); it does not check what was sent.
+ an_profile = models.Profile.get(self.client, 'an-profile')
+
+ an_profile.save()
+
+ self.assertEqual({}, an_profile.config)
+
+ def test_fetch(self):
+ """A partially fetched profile is made complete."""
+ an_profile = self.client.profiles.all()[0]
+
+ an_profile.sync()
+
+ self.assertEqual('An description', an_profile.description)
+
+ def test_fetch_notfound(self):
+ """LXDAPIException is raised when sync() gets a 404."""
+ def not_found(request, context):
+ context.status_code = 404
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 404})
+ self.add_rule({
+ 'text': not_found,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
+ })
+
+ an_profile = models.Profile(self.client, name='an-profile')
+
+ self.assertRaises(exceptions.LXDAPIException, an_profile.sync)
+
+ def test_fetch_error(self):
+ """LXDAPIException is raised when sync() gets a 500."""
+ def error(request, context):
+ context.status_code = 500
+ return json.dumps({
+ 'type': 'error',
+ 'error': 'Not found',
+ 'error_code': 500})
+ self.add_rule({
+ 'text': error,
+ 'method': 'GET',
+ 'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
+ })
+
+ an_profile = models.Profile(self.client, name='an-profile')
+
+ self.assertRaises(exceptions.LXDAPIException, an_profile.sync)
+
+ def test_delete(self):
+ """A profile is deleted."""
+ # XXX: rockstar (03 Jun 2016) - This just executes
+ # a code path. There should be an assertion here, but
+ # it's not clear how to assert that, just yet.
+ an_profile = self.client.profiles.all()[0]
+
+ an_profile.delete()
diff --git a/pylxd/tests/test_certificate.py b/pylxd/tests/test_certificate.py
deleted file mode 100644
index c9b0479..0000000
--- a/pylxd/tests/test_certificate.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import os
-
-from pylxd import models
-from pylxd.tests import testing
-
-
-class TestCertificate(testing.PyLXDTestCase):
- """Tests for pylxd.models.Certificate."""
-
- def test_get(self):
- """A certificate is retrieved."""
- cert = self.client.certificates.get('an-certificate')
-
- self.assertEqual('certificate-content', cert.certificate)
-
- def test_all(self):
- """A certificates are returned."""
- certs = self.client.certificates.all()
-
- self.assertIn('an-certificate', [c.fingerprint for c in certs])
-
- def test_create(self):
- """A certificate is created."""
- cert_data = open(os.path.join(
- os.path.dirname(__file__), 'lxd.crt')).read().encode('utf-8')
- an_certificate = self.client.certificates.create(
- 'test-password', cert_data)
-
- self.assertEqual(
- 'eaf55b72fc23aa516d709271df9b0116064bf8cfa009cf34c67c33ad32c2320c',
- an_certificate.fingerprint)
-
- def test_fetch(self):
- """A partial object is fully fetched."""
- an_certificate = models.Certificate(
- self.client, fingerprint='an-certificate')
-
- an_certificate.sync()
-
- self.assertEqual('certificate-content', an_certificate.certificate)
-
- def test_delete(self):
- """A certificate is deleted."""
- # XXX: rockstar (08 Jun 2016) - This just executes a code path. An
- # assertion should be added.
- an_certificate = models.Certificate(
- self.client, fingerprint='an-certificate')
-
- an_certificate.delete()
diff --git a/pylxd/tests/test_container.py b/pylxd/tests/test_container.py
deleted file mode 100644
index e2044a5..0000000
--- a/pylxd/tests/test_container.py
+++ /dev/null
@@ -1,417 +0,0 @@
-import json
-
-import mock
-
-from pylxd import exceptions, models
-from pylxd.tests import testing
-
-
-class TestContainer(testing.PyLXDTestCase):
- """Tests for pylxd.models.Container."""
-
- def test_all(self):
- """A list of all containers are returned."""
- containers = models.Container.all(self.client)
-
- self.assertEqual(1, len(containers))
-
- def test_get(self):
- """Return a container."""
- name = 'an-container'
-
- an_container = models.Container.get(self.client, name)
-
- self.assertEqual(name, an_container.name)
-
- def test_get_not_found(self):
- """LXDAPIException is raised when the container doesn't exist."""
- def not_found(request, context):
- context.status_code = 404
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 404})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
- })
-
- name = 'an-missing-container'
-
- self.assertRaises(
- exceptions.LXDAPIException,
- models.Container.get, self.client, name)
-
- def test_get_error(self):
- """LXDAPIException is raised when the LXD API errors."""
- def not_found(request, context):
- context.status_code = 500
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 500})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
- })
-
- name = 'an-missing-container'
-
- self.assertRaises(
- exceptions.LXDAPIException,
- models.Container.get, self.client, name)
-
- def test_create(self):
- """A new container is created."""
- config = {'name': 'an-new-container'}
-
- an_new_container = models.Container.create(
- self.client, config, wait=True)
-
- self.assertEqual(config['name'], an_new_container.name)
-
- def test_fetch(self):
- """A sync updates the properties of a container."""
- an_container = models.Container(
- self.client, name='an-container')
-
- an_container.sync()
-
- self.assertTrue(an_container.ephemeral)
-
- def test_fetch_not_found(self):
- """LXDAPIException is raised on a 404 for updating container."""
- def not_found(request, context):
- context.status_code = 404
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 404})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
- })
-
- an_container = models.Container(
- self.client, name='an-missing-container')
-
- self.assertRaises(exceptions.LXDAPIException, an_container.sync)
-
- def test_fetch_error(self):
- """LXDAPIException is raised on error."""
- def not_found(request, context):
- context.status_code = 500
- return json.dumps({
- 'type': 'error',
- 'error': 'An bad error',
- 'error_code': 500})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/containers/an-missing-container$', # NOQA
- })
-
- an_container = models.Container(
- self.client, name='an-missing-container')
-
- self.assertRaises(exceptions.LXDAPIException, an_container.sync)
-
- def test_update(self):
- """A container is updated."""
- an_container = models.Container(
- self.client, name='an-container')
- an_container.architecture = 1
- an_container.config = {}
- an_container.created_at = 1
- an_container.devices = {}
- an_container.ephemeral = 1
- an_container.expanded_config = {}
- an_container.expanded_devices = {}
- an_container.profiles = 1
- an_container.status = 1
-
- an_container.save(wait=True)
-
- self.assertTrue(an_container.ephemeral)
-
- def test_rename(self):
- an_container = models.Container(
- self.client, name='an-container')
-
- an_container.rename('an-renamed-container', wait=True)
-
- self.assertEqual('an-renamed-container', an_container.name)
-
- def test_delete(self):
- """A container is deleted."""
- # XXX: rockstar (21 May 2016) - This just executes
- # a code path. There should be an assertion here, but
- # it's not clear how to assert that, just yet.
- an_container = models.Container(
- self.client, name='an-container')
-
- an_container.delete(wait=True)
-
- @mock.patch('pylxd.models.container._StdinWebsocket')
- @mock.patch('pylxd.models.container._CommandWebsocketClient')
- def test_execute(self, _CommandWebsocketClient, _StdinWebsocket):
- """A command is executed on a container."""
- fake_websocket = mock.Mock()
- fake_websocket.data = 'test\n'
- _StdinWebsocket.return_value = fake_websocket
- _CommandWebsocketClient.return_value = fake_websocket
-
- an_container = models.Container(
- self.client, name='an-container')
-
- stdout, _ = an_container.execute(['echo', 'test'])
-
- self.assertEqual('test\n', stdout)
-
- def test_execute_string(self):
- """A command passed as string raises a TypeError."""
- an_container = models.Container(
- self.client, name='an-container')
-
- self.assertRaises(TypeError, an_container.execute, 'apt-get update')
-
- def test_migrate(self):
- """A container is migrated."""
- from pylxd.client import Client
-
- client2 = Client(endpoint='http://pylxd2.test')
- an_container = models.Container(
- self.client, name='an-container')
-
- an_migrated_container = an_container.migrate(client2)
-
- self.assertEqual('an-container', an_migrated_container.name)
- self.assertEqual(client2, an_migrated_container.client)
-
- def test_publish(self):
- """Containers can be published."""
- self.add_rule({
- 'text': json.dumps({
- 'type': 'sync',
- 'metadata': {
- 'id': 'operation-abc',
- 'metadata': {
- 'fingerprint': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' # NOQA
- }
- }
- }),
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/operations/operation-abc$',
- })
-
- an_container = models.Container(
- self.client, name='an-container')
-
- image = an_container.publish(wait=True)
-
- self.assertEqual(
- 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
- image.fingerprint)
-
-
-class TestContainerState(testing.PyLXDTestCase):
- """Tests for pylxd.models.ContainerState."""
-
- def test_get(self):
- """Return a container."""
- name = 'an-container'
-
- an_container = models.Container.get(self.client, name)
- state = an_container.state()
-
- self.assertEqual('Running', state.status)
- self.assertEqual(103, state.status_code)
-
- def test_start(self):
- """A container is started."""
- an_container = models.Container.get(self.client, 'an-container')
-
- an_container.start(wait=True)
-
- def test_stop(self):
- """A container is stopped."""
- an_container = models.Container.get(self.client, 'an-container')
-
- an_container.stop()
-
- def test_restart(self):
- """A container is restarted."""
- an_container = models.Container.get(self.client, 'an-container')
-
- an_container.restart()
-
- def test_freeze(self):
- """A container is suspended."""
- an_container = models.Container.get(self.client, 'an-container')
-
- an_container.freeze()
-
- def test_unfreeze(self):
- """A container is resumed."""
- an_container = models.Container.get(self.client, 'an-container')
-
- an_container.unfreeze()
-
-
-class TestContainerSnapshots(testing.PyLXDTestCase):
- """Tests for pylxd.models.Container.snapshots."""
-
- def setUp(self):
- super(TestContainerSnapshots, self).setUp()
- self.container = models.Container.get(self.client, 'an-container')
-
- def test_get(self):
- """Return a specific snapshot."""
- snapshot = self.container.snapshots.get('an-snapshot')
-
- self.assertEqual('an-snapshot', snapshot.name)
-
- def test_all(self):
- """Return all snapshots."""
- snapshots = self.container.snapshots.all()
-
- self.assertEqual(1, len(snapshots))
- self.assertEqual('an-snapshot', snapshots[0].name)
- self.assertEqual(self.client, snapshots[0].client)
- self.assertEqual(self.container, snapshots[0].container)
-
- def test_create(self):
- """Create a snapshot."""
- snapshot = self.container.snapshots.create(
- 'an-snapshot', stateful=True, wait=True)
-
- self.assertEqual('an-snapshot', snapshot.name)
-
-
-class TestSnapshot(testing.PyLXDTestCase):
- """Tests for pylxd.models.Snapshot."""
-
- def setUp(self):
- super(TestSnapshot, self).setUp()
- self.container = models.Container.get(self.client, 'an-container')
-
- def test_rename(self):
- """A snapshot is renamed."""
- snapshot = models.Snapshot(
- self.client, container=self.container,
- name='an-snapshot')
-
- snapshot.rename('an-renamed-snapshot', wait=True)
-
- self.assertEqual('an-renamed-snapshot', snapshot.name)
-
- def test_delete(self):
- """A snapshot is deleted."""
- snapshot = models.Snapshot(
- self.client, container=self.container,
- name='an-snapshot')
-
- snapshot.delete(wait=True)
-
- # TODO: add an assertion here
-
- def test_delete_failure(self):
- """If the response indicates delete failure, raise an exception."""
- def not_found(request, context):
- context.status_code = 404
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 404})
- self.add_rule({
- 'text': not_found,
- 'method': 'DELETE',
- 'url': r'^http://pylxd.test/1.0/containers/an-container/snapshots/an-snapshot$', # NOQA
- })
-
- snapshot = models.Snapshot(
- self.client, container=self.container,
- name='an-snapshot')
-
- self.assertRaises(exceptions.LXDAPIException, snapshot.delete)
-
- def test_publish(self):
- """Snapshots can be published."""
- self.add_rule({
- 'text': json.dumps({
- 'type': 'sync',
- 'metadata': {
- 'id': 'operation-abc',
- 'metadata': {
- 'fingerprint': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' # NOQA
- }
- }
- }),
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/operations/operation-abc$',
- })
-
- snapshot = models.Snapshot(
- self.client, container=self.container,
- name='an-snapshot')
-
- image = snapshot.publish(wait=True)
-
- self.assertEqual(
- 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
- image.fingerprint)
-
-
-class TestFiles(testing.PyLXDTestCase):
- """Tests for pylxd.models.Container.files."""
-
- def setUp(self):
- super(TestFiles, self).setUp()
- self.container = models.Container.get(self.client, 'an-container')
-
- def test_put(self):
- """A file is put on the container."""
- data = 'The quick brown fox'
-
- self.container.files.put('/tmp/putted', data)
-
- # TODO: Add an assertion here
-
- def test_get(self):
- """A file is retrieved from the container."""
- data = self.container.files.get('/tmp/getted')
-
- self.assertEqual(b'This is a getted file', data)
-
- def test_get_not_found(self):
- """LXDAPIException is raised on bogus filenames."""
- def not_found(request, context):
- context.status_code = 500
- rule = {
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/containers/an-container/files\?path=%2Ftmp%2Fgetted$', # NOQA
- }
- self.add_rule(rule)
-
- self.assertRaises(
- exceptions.LXDAPIException,
- self.container.files.get, '/tmp/getted')
-
- def test_get_error(self):
- """LXDAPIException is raised on error."""
- def not_found(request, context):
- context.status_code = 503
- rule = {
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/containers/an-container/files\?path=%2Ftmp%2Fgetted$', # NOQA
- }
- self.add_rule(rule)
-
- self.assertRaises(
- exceptions.LXDAPIException,
- self.container.files.get, '/tmp/getted')
diff --git a/pylxd/tests/test_image.py b/pylxd/tests/test_image.py
deleted file mode 100644
index 26e7b21..0000000
--- a/pylxd/tests/test_image.py
+++ /dev/null
@@ -1,328 +0,0 @@
-import hashlib
-import json
-
-from pylxd import exceptions, models
-from pylxd.tests import testing
-
-
-class TestImage(testing.PyLXDTestCase):
- """Tests for pylxd.models.Image."""
-
- def test_get(self):
- """An image is fetched."""
- fingerprint = hashlib.sha256(b'').hexdigest()
- a_image = models.Image.get(self.client, fingerprint)
-
- self.assertEqual(fingerprint, a_image.fingerprint)
-
- def test_get_not_found(self):
- """LXDAPIException is raised when the image isn't found."""
- def not_found(request, context):
- context.status_code = 404
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 404})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
- })
-
- fingerprint = hashlib.sha256(b'').hexdigest()
-
- self.assertRaises(
- exceptions.LXDAPIException,
- models.Image.get, self.client, fingerprint)
-
- def test_get_error(self):
- """LXDAPIException is raised on error."""
- def error(request, context):
- context.status_code = 500
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 500})
- self.add_rule({
- 'text': error,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
- })
-
- fingerprint = hashlib.sha256(b'').hexdigest()
-
- self.assertRaises(
- exceptions.LXDAPIException,
- models.Image.get, self.client, fingerprint)
-
- def test_get_by_alias(self):
- fingerprint = hashlib.sha256(b'').hexdigest()
-
- a_image = models.Image.get_by_alias(self.client, 'an-alias')
-
- self.assertEqual(fingerprint, a_image.fingerprint)
-
- def test_all(self):
- """A list of all images is returned."""
- images = models.Image.all(self.client)
-
- self.assertEqual(1, len(images))
-
- def test_create(self):
- """An image is created."""
- fingerprint = hashlib.sha256(b'').hexdigest()
- a_image = models.Image.create(self.client, b'', public=True, wait=True)
-
- self.assertIsInstance(a_image, models.Image)
- self.assertEqual(fingerprint, a_image.fingerprint)
-
- def test_update(self):
- """An image is updated."""
- a_image = self.client.images.all()[0]
- a_image.sync()
-
- a_image.save()
-
- def test_fetch(self):
- """A partial object is fetched and populated."""
- a_image = self.client.images.all()[0]
-
- a_image.sync()
-
- self.assertEqual(1, a_image.size)
-
- def test_fetch_notfound(self):
- """A bogus image fetch raises LXDAPIException."""
- def not_found(request, context):
- context.status_code = 404
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 404})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
- })
- fingerprint = hashlib.sha256(b'').hexdigest()
-
- a_image = models.Image(self.client, fingerprint=fingerprint)
-
- self.assertRaises(exceptions.LXDAPIException, a_image.sync)
-
- def test_fetch_error(self):
- """A 500 error raises LXDAPIException."""
- def not_found(request, context):
- context.status_code = 500
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 500})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
- })
- fingerprint = hashlib.sha256(b'').hexdigest()
-
- a_image = models.Image(self.client, fingerprint=fingerprint)
-
- self.assertRaises(exceptions.LXDAPIException, a_image.sync)
-
- def test_delete(self):
- """An image is deleted."""
- # XXX: rockstar (03 Jun 2016) - This just executes
- # a code path. There should be an assertion here, but
- # it's not clear how to assert that, just yet.
- a_image = self.client.images.all()[0]
-
- a_image.delete(wait=True)
-
- def test_export(self):
- """An image is exported."""
- a_image = self.client.images.all()[0]
-
- data = a_image.export()
- data_sha = hashlib.sha256(data).hexdigest()
-
- self.assertEqual(a_image.fingerprint, data_sha)
-
- def test_export_not_found(self):
- """LXDAPIException is raised on export of bogus image."""
- def not_found(request, context):
- context.status_code = 404
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 404})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855/export$', # NOQA
- })
- a_image = self.client.images.all()[0]
-
- self.assertRaises(exceptions.LXDAPIException, a_image.export)
-
- def test_export_error(self):
- """LXDAPIException is raised on API error."""
- def error(request, context):
- context.status_code = 500
- return json.dumps({
- 'type': 'error',
- 'error': 'LOLOLOLOL',
- 'error_code': 500})
- self.add_rule({
- 'text': error,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855/export$', # NOQA
- })
- a_image = self.client.images.all()[0]
-
- self.assertRaises(exceptions.LXDAPIException, a_image.export)
-
- def test_add_alias(self):
- """Try to add an alias."""
- a_image = self.client.images.all()[0]
- a_image.add_alias('lol', 'Just LOL')
-
- aliases = [a['name'] for a in a_image.aliases]
- self.assertTrue('lol' in aliases, "Image didn't get updated.")
-
- def test_add_alias_duplicate(self):
- """Adding a alias twice should raise an LXDAPIException."""
- def error(request, context):
- context.status_code = 409
- return json.dumps({
- 'type': 'error',
- 'error': 'already exists',
- 'error_code': 409})
- self.add_rule({
- 'text': error,
- 'method': 'POST',
- 'url': r'^http://pylxd.test/1.0/images/aliases$', # NOQA
- })
-
- a_image = self.client.images.all()[0]
-
- self.assertRaises(
- exceptions.LXDAPIException,
- a_image.add_alias,
- 'lol', 'Just LOL'
- )
-
- def test_remove_alias(self):
- """Try to remove an-alias."""
- a_image = self.client.images.all()[0]
- a_image.delete_alias('an-alias')
-
- self.assertEqual(0, len(a_image.aliases), "Alias didn't get deleted.")
-
- def test_remove_alias_error(self):
- """Try to remove an non existant alias."""
- def error(request, context):
- context.status_code = 404
- return json.dumps({
- 'type': 'error',
- 'error': 'not found',
- 'error_code': 404})
- self.add_rule({
- 'text': error,
- 'method': 'DELETE',
- 'url': r'^http://pylxd.test/1.0/images/aliases/lol$', # NOQA
- })
-
- a_image = self.client.images.all()[0]
- self.assertRaises(
- exceptions.LXDAPIException,
- a_image.delete_alias,
- 'lol'
- )
-
- def test_remove_alias_not_in_image(self):
- """Try to remove an alias which is not in the current image."""
- a_image = self.client.images.all()[0]
- a_image.delete_alias('b-alias')
-
- def test_copy(self):
- """Try to copy an image to another LXD instance."""
- from pylxd.client import Client
-
- a_image = self.client.images.all()[0]
-
- client2 = Client(endpoint='http://pylxd2.test')
- copied_image = a_image.copy(client2, wait=True)
- self.assertEqual(a_image.fingerprint, copied_image.fingerprint)
-
- def test_copy_public(self):
- """Try to copy a public image."""
- from pylxd.client import Client
-
- def image_get(request, context):
- context.status_code = 200
- return json.dumps({
- 'type': 'sync',
- 'metadata': {
- 'aliases': [
- {
- 'name': 'an-alias', # NOQA
- 'fingerprint': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', # NOQA
- }
- ],
- 'architecture': 'x86_64',
- 'cached': False,
- 'filename': 'a_image.tar.bz2',
- 'fingerprint': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', # NOQA
- 'public': True,
- 'properties': {},
- 'size': 1,
- 'auto_update': False,
- 'created_at': '1983-06-16T02:42:00Z',
- 'expires_at': '1983-06-16T02:42:00Z',
- 'last_used_at': '1983-06-16T02:42:00Z',
- 'uploaded_at': '1983-06-16T02:42:00Z',
-
- },
- })
- self.add_rule({
- 'text': image_get,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/images/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855$', # NOQA
- })
-
- a_image = self.client.images.all()[0]
- self.assertTrue(a_image.public)
-
- client2 = Client(endpoint='http://pylxd2.test')
- copied_image = a_image.copy(client2, wait=True)
- self.assertEqual(a_image.fingerprint, copied_image.fingerprint)
-
- def test_copy_no_wait(self):
- """Try to copy and don't wait."""
- from pylxd.client import Client
-
- a_image = self.client.images.all()[0]
-
- client2 = Client(endpoint='http://pylxd2.test')
- a_image.copy(client2, public=False, auto_update=False)
-
- def test_create_from_simplestreams(self):
- """Try to create an image from simplestreams."""
- image = self.client.images.create_from_simplestreams(
- 'https://cloud-images.ubuntu.com/releases',
- 'trusty/amd64'
- )
- self.assertEqual(
- 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
- image.fingerprint
- )
-
- def test_create_from_url(self):
- """Try to create an image from an URL."""
- image = self.client.images.create_from_url(
- 'https://dl.stgraber.org/lxd'
- )
- self.assertEqual(
- 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
- image.fingerprint
- )
diff --git a/pylxd/tests/test_model.py b/pylxd/tests/test_model.py
deleted file mode 100644
index ca5f1b7..0000000
--- a/pylxd/tests/test_model.py
+++ /dev/null
@@ -1,181 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from pylxd.models import _model as model
-from pylxd.tests import testing
-
-
-class Item(model.Model):
- """A fake model."""
- name = model.Attribute(readonly=True)
- age = model.Attribute(int)
- data = model.Attribute()
-
- @property
- def api(self):
- return self.client.api.items[self.name]
-
-
-class TestModel(testing.PyLXDTestCase):
- """Tests for pylxd.model.Model."""
-
- def setUp(self):
- super(TestModel, self).setUp()
-
- self.add_rule({
- 'json': {
- 'type': 'sync',
- 'metadata': {
- 'name': 'an-item',
- 'age': 1000,
- 'data': {'key': 'val'},
- }
- },
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/items/an-item',
- })
- self.add_rule({
- 'json': {
- 'type': 'sync',
- 'metadata': {}
- },
- 'method': 'PUT',
- 'url': r'^http://pylxd.test/1.0/items/an-item',
- })
- self.add_rule({
- 'json': {
- 'type': 'sync',
- 'metadata': {}
- },
- 'method': 'DELETE',
- 'url': r'^http://pylxd.test/1.0/items/an-item',
- })
-
- def test_init(self):
- """Initial attributes are set."""
- item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
-
- self.assertEqual(self.client, item.client)
- self.assertEqual('an-item', item.name)
-
- def test_init_unknown_attribute(self):
- """Unknown attributes aren't set."""
- item = Item(self.client, name='an-item', nonexistent='SRSLY')
-
- try:
- item.nonexistent
- self.fail('item.nonexistent did not raise AttributeError')
- except AttributeError:
- pass
-
- def test_unknown_attribute(self):
- """Setting unknown attributes raise an exception."""
- def set_unknown_attribute():
- item = Item(self.client, name='an-item')
- item.nonexistent = 'SRSLY'
- self.assertRaises(AttributeError, set_unknown_attribute)
-
- def test_get_unknown_attribute(self):
- """Setting unknown attributes raise an exception."""
- def get_unknown_attribute():
- item = Item(self.client, name='an-item')
- return item.nonexistent
- self.assertRaises(AttributeError, get_unknown_attribute)
-
- def test_unset_attribute_sync(self):
- """Reading unavailable attributes calls sync."""
- item = Item(self.client, name='an-item')
-
- self.assertEqual(1000, item.age)
-
- def test_sync(self):
- """A sync will update attributes from the server."""
- item = Item(self.client, name='an-item')
-
- item.sync()
-
- self.assertEqual(1000, item.age)
-
- def test_sync_dirty(self):
- """Sync will not overwrite local attribute changes."""
- item = Item(self.client, name='an-item')
-
- item.age = 250
- item.sync()
-
- self.assertEqual(250, item.age)
-
- def test_rollback(self):
- """Rollback resets the object from the server."""
- item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
-
- item.age = 50
- item.rollback()
-
- self.assertEqual(1000, item.age)
- self.assertFalse(item.dirty)
-
- def test_int_attribute_validator(self):
- """Age is set properly to be an int."""
- item = Item(self.client)
-
- item.age = '100'
-
- self.assertEqual(100, item.age)
-
- def test_int_attribute_invalid(self):
- """TypeError is raised when data can't be converted to type."""
- def set_string():
- item = Item(self.client)
- item.age = 'abc'
-
- self.assertRaises(ValueError, set_string)
-
- def test_dirty(self):
- """Changes mark the object as dirty."""
- item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
-
- item.age = 100
-
- self.assertTrue(item.dirty)
-
- def test_not_dirty(self):
- """Changes mark the object as dirty."""
- item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
-
- self.assertFalse(item.dirty)
-
- def test_marshall(self):
- """The object is marshalled into a dict."""
- item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
-
- result = item.marshall()
-
- self.assertEqual({'age': 15, 'data': {'key': 'val'}}, result)
-
- def test_delete(self):
- """The object is deleted, and client is unset."""
- item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
-
- item.delete()
-
- self.assertIsNone(item.client)
-
- def test_save(self):
- """Attributes are written to the server; object is marked clean."""
- item = Item(self.client, name='an-item', age=15, data={'key': 'val'})
-
- item.age = 69
- item.save()
-
- self.assertFalse(item.dirty)
diff --git a/pylxd/tests/test_network.py b/pylxd/tests/test_network.py
deleted file mode 100644
index c815438..0000000
--- a/pylxd/tests/test_network.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from pylxd import models
-from pylxd.tests import testing
-
-
-class TestNetwork(testing.PyLXDTestCase):
- """Tests for pylxd.models.Network."""
-
- def test_all(self):
- """A list of all networks are returned."""
- networks = models.Network.all(self.client)
-
- self.assertEqual(1, len(networks))
-
- def test_get(self):
- """Return a container."""
- name = 'lo'
-
- an_network = models.Network.get(self.client, name)
-
- self.assertEqual(name, an_network.name)
-
- def test_partial(self):
- """A partial network is synced."""
- an_network = models.Network(self.client, name='lo')
-
- self.assertEqual('loopback', an_network.type)
-
- def test_delete(self):
- """delete is not implemented in networks."""
- an_network = models.Network(self.client, name='lo')
-
- with self.assertRaises(NotImplementedError):
- an_network.delete()
-
- def test_save(self):
- """save is not implemented in networks."""
- an_network = models.Network(self.client, name='lo')
-
- with self.assertRaises(NotImplementedError):
- an_network.save()
diff --git a/pylxd/tests/test_operation.py b/pylxd/tests/test_operation.py
deleted file mode 100644
index 8c74cc9..0000000
--- a/pylxd/tests/test_operation.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright (c) 2016 Canonical Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from pylxd import exceptions, models
-from pylxd.tests import testing
-
-
-class TestOperation(testing.PyLXDTestCase):
- """Tests for pylxd.models.Operation."""
-
- def test_get(self):
- """Return an operation."""
- name = 'operation-abc'
-
- an_operation = models.Operation.get(self.client, name)
-
- self.assertEqual(name, an_operation.id)
-
- def test_get_full_path(self):
- """Return an operation even if the full path is specified."""
- name = '/1.0/operations/operation-abc'
-
- an_operation = models.Operation.get(self.client, name)
-
- self.assertEqual('operation-abc', an_operation.id)
-
- def test_wait_with_error(self):
- """If the operation errors, wait raises an exception."""
- def error(request, context):
- context.status_code = 200
- return {
- 'type': 'sync',
- 'metadata': {
- 'status': 'Failure',
- 'err': 'Keep your foot off the blasted samoflange.',
- }}
- self.add_rule({
- 'json': error,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/operations/operation-abc/wait$', # NOQA
- })
-
- name = '/1.0/operations/operation-abc'
-
- an_operation = models.Operation.get(self.client, name)
-
- self.assertRaises(exceptions.LXDAPIException, an_operation.wait)
diff --git a/pylxd/tests/test_profile.py b/pylxd/tests/test_profile.py
deleted file mode 100644
index 6a357aa..0000000
--- a/pylxd/tests/test_profile.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import json
-
-from pylxd import exceptions, models
-from pylxd.tests import testing
-
-
-class TestProfile(testing.PyLXDTestCase):
- """Tests for pylxd.models.Profile."""
-
- def test_get(self):
- """A profile is fetched."""
- name = 'an-profile'
- an_profile = models.Profile.get(self.client, name)
-
- self.assertEqual(name, an_profile.name)
-
- def test_get_not_found(self):
- """LXDAPIException is raised on unknown profiles."""
- def not_found(request, context):
- context.status_code = 404
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 404})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
- })
-
- self.assertRaises(
- exceptions.LXDAPIException,
- models.Profile.get, self.client, 'an-profile')
-
- def test_get_error(self):
- """LXDAPIException is raised on get error."""
- def error(request, context):
- context.status_code = 500
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 500})
- self.add_rule({
- 'text': error,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
- })
-
- self.assertRaises(
- exceptions.LXDAPIException,
- models.Profile.get, self.client, 'an-profile')
-
- def test_all(self):
- """A list of all profiles is returned."""
- profiles = models.Profile.all(self.client)
-
- self.assertEqual(1, len(profiles))
-
- def test_create(self):
- """A new profile is created."""
- an_profile = models.Profile.create(
- self.client, name='an-new-profile', config={}, devices={})
-
- self.assertIsInstance(an_profile, models.Profile)
- self.assertEqual('an-new-profile', an_profile.name)
-
- def test_rename(self):
- """A profile is renamed."""
- an_profile = models.Profile.get(self.client, 'an-profile')
-
- an_renamed_profile = an_profile.rename('an-renamed-profile')
-
- self.assertEqual('an-renamed-profile', an_renamed_profile.name)
-
- def test_update(self):
- """A profile is updated."""
- # XXX: rockstar (03 Jun 2016) - This just executes
- # a code path. There should be an assertion here, but
- # it's not clear how to assert that, just yet.
- an_profile = models.Profile.get(self.client, 'an-profile')
-
- an_profile.save()
-
- self.assertEqual({}, an_profile.config)
-
- def test_fetch(self):
- """A partially fetched profile is made complete."""
- an_profile = self.client.profiles.all()[0]
-
- an_profile.sync()
-
- self.assertEqual('An description', an_profile.description)
-
- def test_fetch_notfound(self):
- """LXDAPIException is raised on bogus profile fetches."""
- def not_found(request, context):
- context.status_code = 404
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 404})
- self.add_rule({
- 'text': not_found,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
- })
-
- an_profile = models.Profile(self.client, name='an-profile')
-
- self.assertRaises(exceptions.LXDAPIException, an_profile.sync)
-
- def test_fetch_error(self):
- """LXDAPIException is raised on fetch error."""
- def error(request, context):
- context.status_code = 500
- return json.dumps({
- 'type': 'error',
- 'error': 'Not found',
- 'error_code': 500})
- self.add_rule({
- 'text': error,
- 'method': 'GET',
- 'url': r'^http://pylxd.test/1.0/profiles/an-profile$',
- })
-
- an_profile = models.Profile(self.client, name='an-profile')
-
- self.assertRaises(exceptions.LXDAPIException, an_profile.sync)
-
- def test_delete(self):
- """A profile is deleted."""
- # XXX: rockstar (03 Jun 2016) - This just executes
- # a code path. There should be an assertion here, but
- # it's not clear how to assert that, just yet.
- an_profile = self.client.profiles.all()[0]
-
- an_profile.delete()
More information about the lxc-devel
mailing list