[lxc-devel] [lxd/master] Image sync
adglkh on Github
lxc-bot at linuxcontainers.org
Mon Oct 29 07:27:40 UTC 2018
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 386 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20181029/70e39ab5/attachment.bin>
-------------- next part --------------
From c89e57ddbb8bb119ae81a9089ed3b9ebf941cd5c Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 15:36:54 +0800
Subject: [PATCH 1/7] Add a new config item to specify the number of nodes for
image synchronization.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/cluster/config.go | 1 +
1 file changed, 1 insertion(+)
diff --git a/lxd/cluster/config.go b/lxd/cluster/config.go
index a7930d71de..c16c9671ce 100644
--- a/lxd/cluster/config.go
+++ b/lxd/cluster/config.go
@@ -218,6 +218,7 @@ func configGet(cluster *db.Cluster) (*Config, error) {
var ConfigSchema = config.Schema{
"backups.compression_algorithm": {Default: "gzip", Validator: validateCompression},
"cluster.offline_threshold": {Type: config.Int64, Default: offlineThresholdDefault(), Validator: offlineThresholdValidator},
+ "cluster.image_sync_nodes": {Type: config.Int64, Default: "5"},
"core.https_allowed_headers": {},
"core.https_allowed_methods": {},
"core.https_allowed_origin": {},
From dfafc12cd4da4962905d27c295742ca2745932b3 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 15:56:18 +0800
Subject: [PATCH 2/7] Add a func for cluster to check all online node addresses
with the given image in db.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/db/images.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 46 insertions(+)
diff --git a/lxd/db/images.go b/lxd/db/images.go
index 56bd993fde..eb4e04a74e 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -889,3 +889,49 @@ func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
err := exec(c.db, "UPDATE images SET upload_date=? WHERE id=?", uploadedAt, id)
return err
}
+
+// ImageGetNodesWithImage returns the list of the address of online nodes
+// which already have the image.
+// Note: the local address is not included in the returned address list.
+func (c *Cluster) ImageGetNodesWithImage(fingerprint string) ([]string, error) {
+ stmt := `
+ SELECT nodes.address FROM nodes
+ LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
+ LEFT JOIN images ON images_nodes.image_id = images.id
+ WHERE images.fingerprint = ?
+ `
+ var localAddress string // Address of this node
+ var addresses []string // Addresses of online nodes with the image
+
+ err := c.Transaction(func(tx *ClusterTx) error {
+ offlineThreshold, err := tx.NodeOfflineThreshold()
+ if err != nil {
+ return err
+ }
+
+ localAddress, err = tx.NodeAddress()
+ if err != nil {
+ return err
+ }
+ allAddresses, err := query.SelectStrings(tx.tx, stmt, fingerprint)
+ if err != nil {
+ return err
+ }
+ for _, address := range allAddresses {
+ node, err := tx.NodeByAddress(address)
+ if err != nil {
+ return err
+ }
+ if address == localAddress || node.IsOffline(offlineThreshold) {
+ continue
+ }
+ addresses = append(addresses, address)
+ }
+ return err
+ })
+ if err != nil {
+ return "", err
+ }
+
+ return addresses, nil
+}
From c76e7dd22f155200390165036f6530a4efda1b55 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 18:34:20 +0800
Subject: [PATCH 3/7] Choose the proper nodes for image synchronization.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/cluster/config.go | 11 +++++++-
lxd/db/db.go | 1 -
lxd/db/images.go | 4 +--
lxd/images.go | 66 +++++++++++++++++++++++++++++++++++++++++++
4 files changed, 78 insertions(+), 4 deletions(-)
diff --git a/lxd/cluster/config.go b/lxd/cluster/config.go
index c16c9671ce..036844eb75 100644
--- a/lxd/cluster/config.go
+++ b/lxd/cluster/config.go
@@ -127,6 +127,11 @@ func (c *Config) OfflineThreshold() time.Duration {
return time.Duration(n) * time.Second
}
+// ImageSyncNodes returns the numbers of nodes for images synchronization inside of cluster.
+func (c *Config) ImageSyncNodes() int64 {
+ return c.m.GetInt64("cluster.image_sync_nodes")
+}
+
// Dump current configuration keys and their values. Keys with values matching
// their defaults are omitted.
func (c *Config) Dump() map[string]interface{} {
@@ -218,7 +223,7 @@ func configGet(cluster *db.Cluster) (*Config, error) {
var ConfigSchema = config.Schema{
"backups.compression_algorithm": {Default: "gzip", Validator: validateCompression},
"cluster.offline_threshold": {Type: config.Int64, Default: offlineThresholdDefault(), Validator: offlineThresholdValidator},
- "cluster.image_sync_nodes": {Type: config.Int64, Default: "5"},
+ "cluster.image_sync_nodes": {Type: config.Int64, Default: imageSyncNodesDefault()},
"core.https_allowed_headers": {},
"core.https_allowed_methods": {},
"core.https_allowed_origin": {},
@@ -253,6 +258,10 @@ func offlineThresholdDefault() string {
return strconv.Itoa(db.DefaultOfflineThreshold)
}
+func imageSyncNodesDefault() int64 {
+ return DefaultImageSyncNodes
+}
+
func offlineThresholdValidator(value string) error {
// Ensure that the given value is greater than the heartbeat interval,
// which is the lower bound granularity of the offline check.
diff --git a/lxd/db/db.go b/lxd/db/db.go
index 02ab33ff11..1b84267432 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -6,7 +6,6 @@ import (
"sync"
"time"
- "github.com/CanonicalLtd/go-dqlite"
"github.com/pkg/errors"
"github.com/lxc/lxd/lxd/db/cluster"
diff --git a/lxd/db/images.go b/lxd/db/images.go
index eb4e04a74e..3bf4806031 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -890,10 +890,10 @@ func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
return err
}
-// ImageGetNodesWithImage returns the list of the address of online nodes
+// ImageGetNodesHasImage returns the list of the address of online nodes
// which already have the image.
// Note: the local address is not included in the returned address list.
-func (c *Cluster) ImageGetNodesWithImage(fingerprint string) ([]string, error) {
+func (c *Cluster) ImageGetNodesHasImage(fingerprint string) ([]string, error) {
stmt := `
SELECT nodes.address FROM nodes
LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
diff --git a/lxd/images.go b/lxd/images.go
index 7dd70da63f..b907944ae5 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -712,6 +712,30 @@ func imagesPost(d *Daemon, r *http.Request) Response {
return err
}
+ // Check how many nodes already have this image
+ nodeAddresses := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
+ var syncNodes int64
+ err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ config, err := cluster.ConfigLoad(tx)
+ if err != nil {
+ return errors.Wrap(err, "Failed to load cluster configuration")
+ }
+ syncNodes = config.ImageSyncNodes()
+ })
+ if len(nodeAddresses) < syncNodes {
+ nodesInfo, err := chooseNodesForImageSync(nodeAddresses);
+ if err != nil {
+ return errors.Wrap(err, "Failed to choose node for image synchronization")
+ }
+ for _, node := range nodesInfo {
+ client, err := cluster.Connect(node.Address, d.endpoints.NetworkCert(), false)
+ if err != nil {
+ return errors.Wrap(err, "Failed to connect node for image synchronization")
+ }
+ client.CreateImage()
+ }
+ }
+
// Apply any provided alias
for _, alias := range req.Aliases {
_, _, err := d.cluster.ImageAliasGet(project, alias.Name, true)
@@ -1909,3 +1933,45 @@ func imageRefresh(d *Daemon, r *http.Request) Response {
return OperationResponse(op)
}
+
+func chooseNodesForImageSync(hasImageNodes []NodeInfo) ([]NodeInfo, error) {
+ nodesInfo, err = d.cluster.Transaction(func(tx *ClusterTx) ([]NodeInfo, error) {
+ nodes, err := tx.Nodes()
+ if err != nil {
+ return errors.Wrap(err, "Failed to fetch nodes")
+ }
+ if len(nodes) == 1 && nodes[0].Address == "0.0.0.0" {
+ // We're not clustered
+ return []NodeInfo{}, nil
+ }
+
+ return nodes
+ })
+
+ if err != nil {
+ return errors.Wrap(err, "Failed to fetch nodes")
+ }
+
+ // Only sync the image with the nodes which don't have the image stored before
+ var nodesForImageSync NodeInfo[]
+ for _, node := range nodesInfo {
+ stored := false
+ for _, imageNode := range hasImageNodes {
+ if imageNode.Address == node.Address {
+ stored = true
+ }
+ }
+
+ if !stored {
+ nodesForImageSync := append(nodesForImageSync, node)
+ }
+ }
+
+ // Sort the nodes according to the hearbeat
+ // The more responsive the node is, the higher priority the node comes
+ sort.Slice(nodesForImageSync, func(i, j int) bool {
+ return nodesForImageSync[i].Heartbeat > nodesInfo[j].Heartbeat
+ })
+
+ return nodesForImageSync, err
+}
From 279516f5db99aae231e113f9046690bdb5c2beb1 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 20:13:23 +0800
Subject: [PATCH 4/7] Add the validator for cluster.image_sync_node_count and
store the value in db.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/cluster/config.go | 23 +++++++++++++++++------
lxd/db/images.go | 22 ++++++++++++++++++++++
lxd/images.go | 3 ++-
3 files changed, 41 insertions(+), 7 deletions(-)
diff --git a/lxd/cluster/config.go b/lxd/cluster/config.go
index 036844eb75..72ae39130d 100644
--- a/lxd/cluster/config.go
+++ b/lxd/cluster/config.go
@@ -127,9 +127,9 @@ func (c *Config) OfflineThreshold() time.Duration {
return time.Duration(n) * time.Second
}
-// ImageSyncNodes returns the numbers of nodes for images synchronization inside of cluster.
-func (c *Config) ImageSyncNodes() int64 {
- return c.m.GetInt64("cluster.image_sync_nodes")
+// ImageSyncNodeCount returns the number of nodes for image synchronization inside of the cluster.
+func (c *Config) ImageSyncNodeCount() int64 {
+ return c.m.GetInt64("cluster.image_sync_node_count")
}
// Dump current configuration keys and their values. Keys with values matching
@@ -223,7 +223,7 @@ func configGet(cluster *db.Cluster) (*Config, error) {
var ConfigSchema = config.Schema{
"backups.compression_algorithm": {Default: "gzip", Validator: validateCompression},
"cluster.offline_threshold": {Type: config.Int64, Default: offlineThresholdDefault(), Validator: offlineThresholdValidator},
- "cluster.image_sync_nodes": {Type: config.Int64, Default: imageSyncNodesDefault()},
+ "cluster.image_sync_node_count": {Type: config.Int64, Default: imageSyncNodeCountDefault(), Validator: imageSyncNodeCountValidator},
"core.https_allowed_headers": {},
"core.https_allowed_methods": {},
"core.https_allowed_origin": {},
@@ -258,8 +258,8 @@ func offlineThresholdDefault() string {
return strconv.Itoa(db.DefaultOfflineThreshold)
}
-func imageSyncNodesDefault() int64 {
- return DefaultImageSyncNodes
+func imageSyncNodeCountDefault() string {
+ return strconv.Itoa(db.DefaultImageSyncNodeCount)
}
func offlineThresholdValidator(value string) error {
@@ -275,6 +275,17 @@ func offlineThresholdValidator(value string) error {
return nil
}
+func imageSyncNodeCountValidator(value string) error {
+ imageSyncNodeCount, err := strconv.Atoi(value)
+ if err != nil {
+ return fmt.Errorf("image sync node count is not a number")
+ }
+ if imageSyncNodeCount < 0 {
+ return fmt.Errorf("value must be greater than or equal to 0")
+ }
+ return nil
+}
+
func passwordSetter(value string) (string, error) {
// Nothing to do on unset
if value == "" {
diff --git a/lxd/db/images.go b/lxd/db/images.go
index 3bf4806031..6277939367 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -3,6 +3,7 @@ package db
import (
"database/sql"
"fmt"
+ "strconv"
"time"
"github.com/lxc/lxd/lxd/db/query"
@@ -890,6 +891,23 @@ func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
return err
}
+// ImageSyncNodeCount returns the number of nodes that needs to used for image synchronization.
+func (c *Cluster) ImageSyncNodeCount() error {
+ nodeCount := DefaultImageSyncNodeCount
+ values, err := query.SelectStrings(
+ c.tx, "SELECT value FROM config WHERE key='cluster.image_sync_node_count'")
+ if err != nil {
+ return 0, err
+ }
+ if len(values) > 0 {
+ nodeCount, err := strconv.Atoi(values[0])
+ if err != nil {
+ return 0, err
+ }
+ }
+ return nodeCount, nil
+}
+
// ImageGetNodesHasImage returns the list of the address of online nodes
// which already have the image.
// Note: the local address is not included in the returned address list.
@@ -935,3 +953,7 @@ func (c *Cluster) ImageGetNodesHasImage(fingerprint string) ([]string, error) {
return addresses, nil
}
+
+// DefaultImageSyncNodeCount is the default value for the numbers of nodes
+// for image synchronization
+const DefaultImageSyncNodeCount = 3
diff --git a/lxd/images.go b/lxd/images.go
index b907944ae5..a8e297d481 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -720,7 +720,8 @@ func imagesPost(d *Daemon, r *http.Request) Response {
if err != nil {
return errors.Wrap(err, "Failed to load cluster configuration")
}
- syncNodes = config.ImageSyncNodes()
+ // TODO: which one should I use for the offline storage of image sync node count
+ syncNodes = tx.ImageSyncNodeCount()
})
if len(nodeAddresses) < syncNodes {
nodesInfo, err := chooseNodesForImageSync(nodeAddresses);
From e350f6e3a2bdc28c88de6b97c2d1fe73405fefbb Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 21:16:32 +0800
Subject: [PATCH 5/7] Get the nodes which don't store the image for the
synchronization.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/db/images.go | 45 +++++++++-----------
lxd/images.go | 108 ++++++++++++++++++-----------------------------
2 files changed, 61 insertions(+), 92 deletions(-)
diff --git a/lxd/db/images.go b/lxd/db/images.go
index 6277939367..4a8b866e4b 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -3,7 +3,6 @@ package db
import (
"database/sql"
"fmt"
- "strconv"
"time"
"github.com/lxc/lxd/lxd/db/query"
@@ -891,33 +890,34 @@ func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
return err
}
-// ImageSyncNodeCount returns the number of nodes that needs to used for image synchronization.
-func (c *Cluster) ImageSyncNodeCount() error {
- nodeCount := DefaultImageSyncNodeCount
- values, err := query.SelectStrings(
- c.tx, "SELECT value FROM config WHERE key='cluster.image_sync_node_count'")
- if err != nil {
- return 0, err
- }
- if len(values) > 0 {
- nodeCount, err := strconv.Atoi(values[0])
- if err != nil {
- return 0, err
- }
- }
- return nodeCount, nil
-}
-
// ImageGetNodesHasImage returns the list of the address of online nodes
// which already have the image.
// Note: the local address is not included in the returned address list.
func (c *Cluster) ImageGetNodesHasImage(fingerprint string) ([]string, error) {
- stmt := `
+ return c.getNodesByImageFingerprint(fingerprint, true)
+}
+
+// ImageGetNodesHasNoImage returns the list of the address of online nodes
+// which don't have the image.
+// Note: the local address is not included in the returned address list.
+func (c *Cluster) ImageGetNodesHasNoImage(fingerprint string) ([]string, error) {
+ return c.getNodesByImageFingerprint(fingerprint, false)
+}
+
+func (c *Cluster) getNodesByImageFingerprint(fingerprint string, bool hasImage) ([]string, error) {
+ q := `
SELECT nodes.address FROM nodes
LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
- LEFT JOIN images ON images_nodes.image_id = images.id
+ LEFT JOIN images ON images_nodes.image_id %s images.id
WHERE images.fingerprint = ?
`
+ var stmt string
+ if hasImage {
+ stmt = fmt.Sprintf(q, "=")
+ } else {
+ stmt = fmt.Sprintf(q, "!=")
+ }
+
var localAddress string // Address of this node
var addresses []string // Addresses of online nodes with the image
@@ -947,11 +947,8 @@ func (c *Cluster) ImageGetNodesHasImage(fingerprint string) ([]string, error) {
}
return err
})
- if err != nil {
- return "", err
- }
- return addresses, nil
+ return addresses, err
}
// DefaultImageSyncNodeCount is the default value for the numbers of nodes
diff --git a/lxd/images.go b/lxd/images.go
index a8e297d481..c571109e91 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -712,31 +712,6 @@ func imagesPost(d *Daemon, r *http.Request) Response {
return err
}
- // Check how many nodes already have this image
- nodeAddresses := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
- var syncNodes int64
- err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
- config, err := cluster.ConfigLoad(tx)
- if err != nil {
- return errors.Wrap(err, "Failed to load cluster configuration")
- }
- // TODO: which one should I use for the offline storage of image sync node count
- syncNodes = tx.ImageSyncNodeCount()
- })
- if len(nodeAddresses) < syncNodes {
- nodesInfo, err := chooseNodesForImageSync(nodeAddresses);
- if err != nil {
- return errors.Wrap(err, "Failed to choose node for image synchronization")
- }
- for _, node := range nodesInfo {
- client, err := cluster.Connect(node.Address, d.endpoints.NetworkCert(), false)
- if err != nil {
- return errors.Wrap(err, "Failed to connect node for image synchronization")
- }
- client.CreateImage()
- }
- }
-
// Apply any provided alias
for _, alias := range req.Aliases {
_, _, err := d.cluster.ImageAliasGet(project, alias.Name, true)
@@ -764,6 +739,46 @@ func imagesPost(d *Daemon, r *http.Request) Response {
metadata["fingerprint"] = info.Fingerprint
metadata["size"] = strconv.FormatInt(info.Size, 10)
op.UpdateMetadata(metadata)
+
+ // Check how many nodes already have this image
+ nodeAddresses, err := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
+ if err != nil {
+ return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+ }
+
+ var syncNodeCount int64
+ err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ config, err := cluster.ConfigLoad(tx)
+ if err != nil {
+ return errors.Wrap(err, "Failed to load cluster configuration")
+ }
+ syncNodeCount = config.ImageSyncNodeCount()
+ })
+ if len(nodeAddresses) < syncNodeCount {
+ nodeAddresses, err := d.cluster.ImageGetNodesHasNoImage()
+ if err != nil {
+ return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+ }
+
+ for _, address := range nodeAddresses {
+ client, err := cluster.Connect(address, d.endpoints.NetworkCert(), false)
+ if err != nil {
+ return errors.Wrap(err, "Failed to connect node for image synchronization")
+ }
+ op, err := client.CreateImage(req, nil)
+ if err != nil {
+ return err
+ }
+
+ err = op.Wait()
+ if err != nil {
+ return err
+ }
+ // opAPI := op.Get()
+ // Check the publish.go line:216
+ }
+ }
+
return nil
}
@@ -1932,47 +1947,4 @@ func imageRefresh(d *Daemon, r *http.Request) Response {
}
return OperationResponse(op)
-
-}
-
-func chooseNodesForImageSync(hasImageNodes []NodeInfo) ([]NodeInfo, error) {
- nodesInfo, err = d.cluster.Transaction(func(tx *ClusterTx) ([]NodeInfo, error) {
- nodes, err := tx.Nodes()
- if err != nil {
- return errors.Wrap(err, "Failed to fetch nodes")
- }
- if len(nodes) == 1 && nodes[0].Address == "0.0.0.0" {
- // We're not clustered
- return []NodeInfo{}, nil
- }
-
- return nodes
- })
-
- if err != nil {
- return errors.Wrap(err, "Failed to fetch nodes")
- }
-
- // Only sync the image with the nodes which don't have the image stored before
- var nodesForImageSync NodeInfo[]
- for _, node := range nodesInfo {
- stored := false
- for _, imageNode := range hasImageNodes {
- if imageNode.Address == node.Address {
- stored = true
- }
- }
-
- if !stored {
- nodesForImageSync := append(nodesForImageSync, node)
- }
- }
-
- // Sort the nodes according to the hearbeat
- // The more responsive the node is, the higher priority the node comes
- sort.Slice(nodesForImageSync, func(i, j int) bool {
- return nodesForImageSync[i].Heartbeat > nodesInfo[j].Heartbeat
- })
-
- return nodesForImageSync, err
}
From 1fe28460f9525b7678b4664cd0fec68b0c8afd93 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Sat, 22 Sep 2018 22:59:03 +0800
Subject: [PATCH 6/7] Do not create an image if the image already exists on
the container.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/db/db.go | 1 +
lxd/db/images.go | 13 +++++------
lxd/images.go | 57 +++++++++++++++++++++++++++++++++---------------
3 files changed, 46 insertions(+), 25 deletions(-)
diff --git a/lxd/db/db.go b/lxd/db/db.go
index 1b84267432..02ab33ff11 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -6,6 +6,7 @@ import (
"sync"
"time"
+ "github.com/CanonicalLtd/go-dqlite"
"github.com/pkg/errors"
"github.com/lxc/lxd/lxd/db/cluster"
diff --git a/lxd/db/images.go b/lxd/db/images.go
index 4a8b866e4b..e7295050fc 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -904,13 +904,13 @@ func (c *Cluster) ImageGetNodesHasNoImage(fingerprint string) ([]string, error)
return c.getNodesByImageFingerprint(fingerprint, false)
}
-func (c *Cluster) getNodesByImageFingerprint(fingerprint string, bool hasImage) ([]string, error) {
+func (c *Cluster) getNodesByImageFingerprint(fingerprint string, hasImage bool) ([]string, error) {
q := `
- SELECT nodes.address FROM nodes
- LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
- LEFT JOIN images ON images_nodes.image_id %s images.id
- WHERE images.fingerprint = ?
- `
+SELECT nodes.address FROM nodes
+ LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
+ LEFT JOIN images ON images_nodes.image_id %s images.id
+WHERE images.fingerprint = ?
+`
var stmt string
if hasImage {
stmt = fmt.Sprintf(q, "=")
@@ -947,7 +947,6 @@ func (c *Cluster) getNodesByImageFingerprint(fingerprint string, bool hasImage)
}
return err
})
-
return addresses, err
}
diff --git a/lxd/images.go b/lxd/images.go
index c571109e91..d13ae5732c 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -734,51 +734,72 @@ func imagesPost(d *Daemon, r *http.Request) Response {
}
}
- // Set the metadata
- metadata := make(map[string]string)
- metadata["fingerprint"] = info.Fingerprint
- metadata["size"] = strconv.FormatInt(info.Size, 10)
- op.UpdateMetadata(metadata)
-
// Check how many nodes already have this image
- nodeAddresses, err := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
+ syncNodeAddresses, err := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
if err != nil {
return errors.Wrap(err, "Failed to get nodes for the image synchronization")
}
- var syncNodeCount int64
- err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ var desiredSyncNodeCount int64
+ err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
config, err := cluster.ConfigLoad(tx)
if err != nil {
return errors.Wrap(err, "Failed to load cluster configuration")
}
- syncNodeCount = config.ImageSyncNodeCount()
+ desiredSyncNodeCount = config.ImageSyncNodeCount()
+ return nil
})
- if len(nodeAddresses) < syncNodeCount {
- nodeAddresses, err := d.cluster.ImageGetNodesHasNoImage()
+ nodeCount := desiredSyncNodeCount - int64(len(syncNodeAddresses))
+ if nodeCount > 0 {
+ addresses, err := d.cluster.ImageGetNodesHasNoImage(info.Fingerprint)
if err != nil {
return errors.Wrap(err, "Failed to get nodes for the image synchronization")
}
- for _, address := range nodeAddresses {
- client, err := cluster.Connect(address, d.endpoints.NetworkCert(), false)
+ min := func(x, y int64) int64 {
+ if x > y {
+ return y
+ }
+ return x
+ }
+
+ for idx := int64(0); idx < min(int64(len(addresses)), nodeCount); idx++ {
+ client, err := cluster.Connect(addresses[idx], d.endpoints.NetworkCert(), false)
if err != nil {
return errors.Wrap(err, "Failed to connect node for image synchronization")
}
- op, err := client.CreateImage(req, nil)
+
+ // We spread the image for the nodes inside of cluster and we need to double
+ // check if the image already exists since when one certain node is going to
+ // create an image it will goes into the same routine.
+ fingerprints, err := client.GetImageFingerprints()
+ if err != nil {
+ return errors.Wrap(err, "Failed to fetch container's all image finger prints")
+ }
+
+ for _, fingerprint := range fingerprints {
+ if info.Fingerprint == fingerprint {
+ continue
+ }
+ }
+
+ opImgCreation, err := client.CreateImage(req, nil)
if err != nil {
return err
}
- err = op.Wait()
+ err = opImgCreation.Wait()
if err != nil {
return err
}
- // opAPI := op.Get()
- // Check the publish.go line:216
}
}
+ // Set the metadata
+ metadata := make(map[string]string)
+ metadata["fingerprint"] = info.Fingerprint
+ metadata["size"] = strconv.FormatInt(info.Size, 10)
+ op.UpdateMetadata(metadata)
return nil
}
From 16e23c30e682f1de9966bc90d9338ad07853ff45 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Mon, 29 Oct 2018 14:52:42 +0800
Subject: [PATCH 7/7] Compose ImageCreateArgs and ImagesPost structure before
creating an image on each node.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/images.go | 139 +++++++++++++++++++++++++++++---------------------
1 file changed, 80 insertions(+), 59 deletions(-)
diff --git a/lxd/images.go b/lxd/images.go
index d13ae5732c..d7ec00acdc 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -734,65 +734,9 @@ func imagesPost(d *Daemon, r *http.Request) Response {
}
}
- // Check how many nodes already have this image
- syncNodeAddresses, err := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
- if err != nil {
- return errors.Wrap(err, "Failed to get nodes for the image synchronization")
- }
-
- var desiredSyncNodeCount int64
- err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
- config, err := cluster.ConfigLoad(tx)
- if err != nil {
- return errors.Wrap(err, "Failed to load cluster configuration")
- }
- desiredSyncNodeCount = config.ImageSyncNodeCount()
- return nil
- })
- nodeCount := desiredSyncNodeCount - int64(len(syncNodeAddresses))
- if nodeCount > 0 {
- addresses, err := d.cluster.ImageGetNodesHasNoImage(info.Fingerprint)
- if err != nil {
- return errors.Wrap(err, "Failed to get nodes for the image synchronization")
- }
-
- min := func(x, y int64) int64 {
- if x > y {
- return y
- }
- return x
- }
-
- for idx := int64(0); idx < min(int64(len(addresses)), nodeCount); idx++ {
- client, err := cluster.Connect(addresses[idx], d.endpoints.NetworkCert(), false)
- if err != nil {
- return errors.Wrap(err, "Failed to connect node for image synchronization")
- }
-
- // We spread the image for the nodes inside of cluster and we need to double
- // check if the image already exists since when one certain node is going to
- // create an image it will goes into the same routine.
- fingerprints, err := client.GetImageFingerprints()
- if err != nil {
- return errors.Wrap(err, "Failed to fetch container's all image finger prints")
- }
-
- for _, fingerprint := range fingerprints {
- if info.Fingerprint == fingerprint {
- continue
- }
- }
-
- opImgCreation, err := client.CreateImage(req, nil)
- if err != nil {
- return err
- }
-
- err = opImgCreation.Wait()
- if err != nil {
- return err
- }
- }
+ // Sync the images between each node in the cluster on demand
+ if err := imageSyncBetweenNodes(d, req, info.Fingerprint, imageUpload, post); err != nil {
+ return errors.Wrapf(err, "Image sync bwtween nodes")
}
// Set the metadata
@@ -1969,3 +1913,80 @@ func imageRefresh(d *Daemon, r *http.Request) Response {
return OperationResponse(op)
}
+
+func imageSyncBetweenNodes(d *Daemon, req api.ImagesPost, fingerprint string, imageUpload bool, file *os.File) error {
+ // Check how many nodes already have this image
+ syncNodeAddresses, err := d.cluster.ImageGetNodesHasImage(fingerprint)
+ if err != nil {
+ return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+ }
+
+ var desiredSyncNodeCount int64
+ err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ config, err := cluster.ConfigLoad(tx)
+ if err != nil {
+ return errors.Wrap(err, "Failed to load cluster configuration")
+ }
+ desiredSyncNodeCount = config.ImageSyncNodeCount()
+ return nil
+ })
+
+ nodeCount := desiredSyncNodeCount - int64(len(syncNodeAddresses))
+ if nodeCount > 0 {
+ addresses, err := d.cluster.ImageGetNodesHasNoImage(fingerprint)
+ if err != nil {
+ return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+ }
+
+ min := func(x, y int64) int64 {
+ if x > y {
+ return y
+ }
+ return x
+ }
+
+ for idx := int64(0); idx < min(int64(len(addresses)), nodeCount); idx++ {
+ client, err := cluster.Connect(addresses[idx], d.endpoints.NetworkCert(), false)
+ if err != nil {
+ return errors.Wrap(err, "Failed to connect node for image synchronization")
+ }
+
+ // We spread the image across the nodes in the cluster, so we need to double
+ // check whether the image already exists: when a node creates an image it
+ // goes through this same routine.
+ fingerprints, err := client.GetImageFingerprints()
+ if err != nil {
+ return errors.Wrap(err, "Failed to fetch container's all image finger prints")
+ }
+
+ for _, imgFingerprint := range fingerprints {
+ if fingerprint == imgFingerprint {
+ continue
+ }
+ }
+
+ createArgs := &lxd.ImageCreateArgs{}
+ image := api.ImagesPost{}
+ if imageUpload {
+ createArgs.MetaFile = file
+ createArgs.MetaName = filepath.Base(file.Name())
+ image.Filename = createArgs.MetaName
+ } else {
+ createArgs = nil
+ image = req
+ }
+
+ op, err := client.CreateImage(image, createArgs)
+ if err != nil {
+ return err
+ }
+
+ err = op.Wait()
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
More information about the lxc-devel
mailing list