[lxc-devel] [lxd/master] Image sync

adglkh on Github lxc-bot at linuxcontainers.org
Mon Oct 29 07:27:40 UTC 2018


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 386 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20181029/70e39ab5/attachment.bin>
-------------- next part --------------
From c89e57ddbb8bb119ae81a9089ed3b9ebf941cd5c Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 15:36:54 +0800
Subject: [PATCH 1/7] Add a new config item to specify the number of nodes for
 image synchronization.

Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
 lxd/cluster/config.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lxd/cluster/config.go b/lxd/cluster/config.go
index a7930d71de..c16c9671ce 100644
--- a/lxd/cluster/config.go
+++ b/lxd/cluster/config.go
@@ -218,6 +218,7 @@ func configGet(cluster *db.Cluster) (*Config, error) {
 var ConfigSchema = config.Schema{
 	"backups.compression_algorithm":  {Default: "gzip", Validator: validateCompression},
 	"cluster.offline_threshold":      {Type: config.Int64, Default: offlineThresholdDefault(), Validator: offlineThresholdValidator},
+	"cluster.image_sync_nodes":       {Type: config.Int64, Default: "5"},
 	"core.https_allowed_headers":     {},
 	"core.https_allowed_methods":     {},
 	"core.https_allowed_origin":      {},

From dfafc12cd4da4962905d27c295742ca2745932b3 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 15:56:18 +0800
Subject: [PATCH 2/7] Add a cluster func to list the addresses of all online
 nodes that have the given image in the db.

Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
 lxd/db/images.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/lxd/db/images.go b/lxd/db/images.go
index 56bd993fde..eb4e04a74e 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -889,3 +889,49 @@ func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
 	err := exec(c.db, "UPDATE images SET upload_date=? WHERE id=?", uploadedAt, id)
 	return err
 }
+
+// ImageGetNodesWithImage returns the list of the address of online nodes
+// which already have the image.
+// Note: the local address is not included in the returned address list.
+func (c *Cluster) ImageGetNodesWithImage(fingerprint string) ([]string, error) {
+	stmt := `
+	SELECT nodes.address FROM nodes
+	  LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
+	  LEFT JOIN images ON images_nodes.image_id = images.id
+	WHERE images.fingerprint = ?
+	`
+	var localAddress string // Address of this node
+	var addresses []string  // Addresses of online nodes with the image
+
+	err := c.Transaction(func(tx *ClusterTx) error {
+		offlineThreshold, err := tx.NodeOfflineThreshold()
+		if err != nil {
+			return err
+		}
+
+		localAddress, err = tx.NodeAddress()
+		if err != nil {
+			return err
+		}
+		allAddresses, err := query.SelectStrings(tx.tx, stmt, fingerprint)
+		if err != nil {
+			return err
+		}
+		for _, address := range allAddresses {
+			node, err := tx.NodeByAddress(address)
+			if err != nil {
+				return err
+			}
+			if address == localAddress || node.IsOffline(offlineThreshold) {
+				continue
+			}
+			addresses = append(addresses, address)
+		}
+		return err
+	})
+	if err != nil {
+		return "", err
+	}
+
+	return addresses, nil
+}

From c76e7dd22f155200390165036f6530a4efda1b55 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 18:34:20 +0800
Subject: [PATCH 3/7] Choose the proper nodes for image synchronization.

Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
 lxd/cluster/config.go | 11 +++++++-
 lxd/db/db.go          |  1 -
 lxd/db/images.go      |  4 +--
 lxd/images.go         | 66 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/lxd/cluster/config.go b/lxd/cluster/config.go
index c16c9671ce..036844eb75 100644
--- a/lxd/cluster/config.go
+++ b/lxd/cluster/config.go
@@ -127,6 +127,11 @@ func (c *Config) OfflineThreshold() time.Duration {
 	return time.Duration(n) * time.Second
 }
 
+// ImageSyncNodes returns the numbers of nodes for images synchronization inside of cluster.
+func (c *Config) ImageSyncNodes() int64 {
+	return c.m.GetInt64("cluster.image_sync_nodes")
+}
+
 // Dump current configuration keys and their values. Keys with values matching
 // their defaults are omitted.
 func (c *Config) Dump() map[string]interface{} {
@@ -218,7 +223,7 @@ func configGet(cluster *db.Cluster) (*Config, error) {
 var ConfigSchema = config.Schema{
 	"backups.compression_algorithm":  {Default: "gzip", Validator: validateCompression},
 	"cluster.offline_threshold":      {Type: config.Int64, Default: offlineThresholdDefault(), Validator: offlineThresholdValidator},
-	"cluster.image_sync_nodes":       {Type: config.Int64, Default: "5"},
+	"cluster.image_sync_nodes":       {Type: config.Int64, Default: imageSyncNodesDefault()},
 	"core.https_allowed_headers":     {},
 	"core.https_allowed_methods":     {},
 	"core.https_allowed_origin":      {},
@@ -253,6 +258,10 @@ func offlineThresholdDefault() string {
 	return strconv.Itoa(db.DefaultOfflineThreshold)
 }
 
+func imageSyncNodesDefault() int64 {
+	return DefaultImageSyncNodes
+}
+
 func offlineThresholdValidator(value string) error {
 	// Ensure that the given value is greater than the heartbeat interval,
 	// which is the lower bound granularity of the offline check.
diff --git a/lxd/db/db.go b/lxd/db/db.go
index 02ab33ff11..1b84267432 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -6,7 +6,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/pkg/errors"
 
 	"github.com/lxc/lxd/lxd/db/cluster"
diff --git a/lxd/db/images.go b/lxd/db/images.go
index eb4e04a74e..3bf4806031 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -890,10 +890,10 @@ func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
 	return err
 }
 
-// ImageGetNodesWithImage returns the list of the address of online nodes
+// ImageGetNodesHasImage returns the list of the address of online nodes
 // which already have the image.
 // Note: the local address is not included in the returned address list.
-func (c *Cluster) ImageGetNodesWithImage(fingerprint string) ([]string, error) {
+func (c *Cluster) ImageGetNodesHasImage(fingerprint string) ([]string, error) {
 	stmt := `
 	SELECT nodes.address FROM nodes
 	  LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
diff --git a/lxd/images.go b/lxd/images.go
index 7dd70da63f..b907944ae5 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -712,6 +712,30 @@ func imagesPost(d *Daemon, r *http.Request) Response {
 			return err
 		}
 
+		// Check how many nodes already have this image
+		nodeAddresses := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
+		var syncNodes int64
+		err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+			config, err := cluster.ConfigLoad(tx)
+			if err != nil {
+				return errors.Wrap(err, "Failed to load cluster configuration")
+			}
+			syncNodes = config.ImageSyncNodes()
+		})
+		if len(nodeAddresses) < syncNodes {
+			nodesInfo, err := chooseNodesForImageSync(nodeAddresses);
+			if err != nil {
+				return errors.Wrap(err, "Failed to choose node for image synchronization")
+			}
+			for _, node := range nodesInfo {
+				client, err := cluster.Connect(node.Address, d.endpoints.NetworkCert(), false)
+				if err != nil {
+					return errors.Wrap(err, "Failed to connect node for image synchronization")
+				}
+				client.CreateImage()
+			}
+		}
+
 		// Apply any provided alias
 		for _, alias := range req.Aliases {
 			_, _, err := d.cluster.ImageAliasGet(project, alias.Name, true)
@@ -1909,3 +1933,45 @@ func imageRefresh(d *Daemon, r *http.Request) Response {
 	return OperationResponse(op)
 
 }
+
+func chooseNodesForImageSync(hasImageNodes []NodeInfo) ([]NodeInfo, error) {
+	nodesInfo, err = d.cluster.Transaction(func(tx *ClusterTx) ([]NodeInfo, error) {
+		nodes, err := tx.Nodes()
+		if err != nil {
+			return errors.Wrap(err, "Failed to fetch nodes")
+		}
+		if len(nodes) == 1 && nodes[0].Address == "0.0.0.0" {
+			// We're not clustered
+			return []NodeInfo{}, nil
+		}
+
+		return nodes
+	})
+
+	if err != nil {
+		return errors.Wrap(err, "Failed to fetch nodes")
+	}
+
+	// Only sync the image with the nodes which don't have the image stored before
+	var nodesForImageSync NodeInfo[]
+	for _, node := range nodesInfo {
+		stored := false
+		for _, imageNode := range hasImageNodes {
+			if imageNode.Address == node.Address {
+				stored = true
+			}
+		}
+
+		if !stored {
+			nodesForImageSync := append(nodesForImageSync, node)
+		}
+	}
+
+	// Sort the nodes according to the hearbeat
+	// The more responsive the node is, the higher priority the node comes
+	sort.Slice(nodesForImageSync, func(i, j int) bool {
+		return nodesForImageSync[i].Heartbeat > nodesInfo[j].Heartbeat
+	})
+
+	return nodesForImageSync, err
+}

From 279516f5db99aae231e113f9046690bdb5c2beb1 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 20:13:23 +0800
Subject: [PATCH 4/7] Add the validator for cluster.image_sync_node_count and
 store the value in db.

Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
 lxd/cluster/config.go | 23 +++++++++++++++++------
 lxd/db/images.go      | 22 ++++++++++++++++++++++
 lxd/images.go         |  3 ++-
 3 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/lxd/cluster/config.go b/lxd/cluster/config.go
index 036844eb75..72ae39130d 100644
--- a/lxd/cluster/config.go
+++ b/lxd/cluster/config.go
@@ -127,9 +127,9 @@ func (c *Config) OfflineThreshold() time.Duration {
 	return time.Duration(n) * time.Second
 }
 
-// ImageSyncNodes returns the numbers of nodes for images synchronization inside of cluster.
-func (c *Config) ImageSyncNodes() int64 {
-	return c.m.GetInt64("cluster.image_sync_nodes")
+// ImageSyncNodeCount returns the numbers of nodes for images synchronization inside of cluster.
+func (c *Config) ImageSyncNodeCount() int64 {
+	return c.m.GetInt64("cluster.image_sync_node_count")
 }
 
 // Dump current configuration keys and their values. Keys with values matching
@@ -223,7 +223,7 @@ func configGet(cluster *db.Cluster) (*Config, error) {
 var ConfigSchema = config.Schema{
 	"backups.compression_algorithm":  {Default: "gzip", Validator: validateCompression},
 	"cluster.offline_threshold":      {Type: config.Int64, Default: offlineThresholdDefault(), Validator: offlineThresholdValidator},
-	"cluster.image_sync_nodes":       {Type: config.Int64, Default: imageSyncNodesDefault()},
+	"cluster.image_sync_node_count":  {Type: config.Int64, Default: imageSyncNodeCountDefault(), Validator: imageSyncNodeCountValidator},
 	"core.https_allowed_headers":     {},
 	"core.https_allowed_methods":     {},
 	"core.https_allowed_origin":      {},
@@ -258,8 +258,8 @@ func offlineThresholdDefault() string {
 	return strconv.Itoa(db.DefaultOfflineThreshold)
 }
 
-func imageSyncNodesDefault() int64 {
-	return DefaultImageSyncNodes
+func imageSyncNodeCountDefault() string {
+	return strconv.Itoa(db.DefaultImageSyncNodeCount)
 }
 
 func offlineThresholdValidator(value string) error {
@@ -275,6 +275,17 @@ func offlineThresholdValidator(value string) error {
 	return nil
 }
 
+func imageSyncNodeCountValidator(value string) error {
+	imageSyncNodeCount, err := strconv.Atoi(value)
+	if err != nil {
+		return fmt.Errorf("image sync node count is not a number")
+	}
+	if imageSyncNodeCount < 0 {
+		return fmt.Errorf("value must be greater than or equal to 0")
+	}
+	return nil
+}
+
 func passwordSetter(value string) (string, error) {
 	// Nothing to do on unset
 	if value == "" {
diff --git a/lxd/db/images.go b/lxd/db/images.go
index 3bf4806031..6277939367 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -3,6 +3,7 @@ package db
 import (
 	"database/sql"
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/lxc/lxd/lxd/db/query"
@@ -890,6 +891,23 @@ func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
 	return err
 }
 
+// ImageSyncNodeCount returns the number of nodes that needs to used for image synchronization.
+func (c *Cluster) ImageSyncNodeCount() error {
+	nodeCount := DefaultImageSyncNodeCount
+	values, err := query.SelectStrings(
+		c.tx, "SELECT value FROM config WHERE key='cluster.image_sync_node_count'")
+	if err != nil {
+		return 0, err
+	}
+	if len(values) > 0 {
+		nodeCount, err := strconv.Atoi(values[0])
+		if err != nil {
+			return 0, err
+		}
+	}
+	return nodeCount, nil
+}
+
 // ImageGetNodesHasImage returns the list of the address of online nodes
 // which already have the image.
 // Note: the local address is not included in the returned address list.
@@ -935,3 +953,7 @@ func (c *Cluster) ImageGetNodesHasImage(fingerprint string) ([]string, error) {
 
 	return addresses, nil
 }
+
+// DefaultImageSyncNodeCount is the default value for the numbers of nodes
+// for image synchronization
+const DefaultImageSyncNodeCount = 3
diff --git a/lxd/images.go b/lxd/images.go
index b907944ae5..a8e297d481 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -720,7 +720,8 @@ func imagesPost(d *Daemon, r *http.Request) Response {
 			if err != nil {
 				return errors.Wrap(err, "Failed to load cluster configuration")
 			}
-			syncNodes = config.ImageSyncNodes()
+			// TODO: which one should I use for the offline storage of image sync node count
+			syncNodes = tx.ImageSyncNodeCount()
 		})
 		if len(nodeAddresses) < syncNodes {
 			nodesInfo, err := chooseNodesForImageSync(nodeAddresses);

From e350f6e3a2bdc28c88de6b97c2d1fe73405fefbb Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 21 Sep 2018 21:16:32 +0800
Subject: [PATCH 5/7] Get the nodes which don't store the image for the
 synchronization.

Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
 lxd/db/images.go |  45 +++++++++-----------
 lxd/images.go    | 108 ++++++++++++++++++-----------------------------
 2 files changed, 61 insertions(+), 92 deletions(-)

diff --git a/lxd/db/images.go b/lxd/db/images.go
index 6277939367..4a8b866e4b 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -3,7 +3,6 @@ package db
 import (
 	"database/sql"
 	"fmt"
-	"strconv"
 	"time"
 
 	"github.com/lxc/lxd/lxd/db/query"
@@ -891,33 +890,34 @@ func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
 	return err
 }
 
-// ImageSyncNodeCount returns the number of nodes that needs to used for image synchronization.
-func (c *Cluster) ImageSyncNodeCount() error {
-	nodeCount := DefaultImageSyncNodeCount
-	values, err := query.SelectStrings(
-		c.tx, "SELECT value FROM config WHERE key='cluster.image_sync_node_count'")
-	if err != nil {
-		return 0, err
-	}
-	if len(values) > 0 {
-		nodeCount, err := strconv.Atoi(values[0])
-		if err != nil {
-			return 0, err
-		}
-	}
-	return nodeCount, nil
-}
-
 // ImageGetNodesHasImage returns the list of the address of online nodes
 // which already have the image.
 // Note: the local address is not included in the returned address list.
 func (c *Cluster) ImageGetNodesHasImage(fingerprint string) ([]string, error) {
-	stmt := `
+	return c.getNodesByImageFingerprint(fingerprint, true)
+}
+
+// ImageGetNodesHasNoImage returns the list of the address of online nodes
+// which don't have the image.
+// Note: the local address is not included in the returned address list.
+func (c *Cluster) ImageGetNodesHasNoImage(fingerprint string) ([]string, error) {
+	return c.getNodesByImageFingerprint(fingerprint, false)
+}
+
+func (c *Cluster) getNodesByImageFingerprint(fingerprint string, bool hasImage) ([]string, error) {
+	q := `
 	SELECT nodes.address FROM nodes
 	  LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
-	  LEFT JOIN images ON images_nodes.image_id = images.id
+	  LEFT JOIN images ON images_nodes.image_id %s images.id
 	WHERE images.fingerprint = ?
 	`
+	var stmt string
+	if hasImage {
+		stmt = fmt.Sprintf(q, "=")
+	} else {
+		stmt = fmt.Sprintf(q, "!=")
+	}
+
 	var localAddress string // Address of this node
 	var addresses []string  // Addresses of online nodes with the image
 
@@ -947,11 +947,8 @@ func (c *Cluster) ImageGetNodesHasImage(fingerprint string) ([]string, error) {
 		}
 		return err
 	})
-	if err != nil {
-		return "", err
-	}
 
-	return addresses, nil
+	return addresses, err
 }
 
 // DefaultImageSyncNodeCount is the default value for the numbers of nodes
diff --git a/lxd/images.go b/lxd/images.go
index a8e297d481..c571109e91 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -712,31 +712,6 @@ func imagesPost(d *Daemon, r *http.Request) Response {
 			return err
 		}
 
-		// Check how many nodes already have this image
-		nodeAddresses := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
-		var syncNodes int64
-		err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
-			config, err := cluster.ConfigLoad(tx)
-			if err != nil {
-				return errors.Wrap(err, "Failed to load cluster configuration")
-			}
-			// TODO: which one should I use for the offline storage of image sync node count
-			syncNodes = tx.ImageSyncNodeCount()
-		})
-		if len(nodeAddresses) < syncNodes {
-			nodesInfo, err := chooseNodesForImageSync(nodeAddresses);
-			if err != nil {
-				return errors.Wrap(err, "Failed to choose node for image synchronization")
-			}
-			for _, node := range nodesInfo {
-				client, err := cluster.Connect(node.Address, d.endpoints.NetworkCert(), false)
-				if err != nil {
-					return errors.Wrap(err, "Failed to connect node for image synchronization")
-				}
-				client.CreateImage()
-			}
-		}
-
 		// Apply any provided alias
 		for _, alias := range req.Aliases {
 			_, _, err := d.cluster.ImageAliasGet(project, alias.Name, true)
@@ -764,6 +739,46 @@ func imagesPost(d *Daemon, r *http.Request) Response {
 		metadata["fingerprint"] = info.Fingerprint
 		metadata["size"] = strconv.FormatInt(info.Size, 10)
 		op.UpdateMetadata(metadata)
+
+		// Check how many nodes already have this image
+		nodeAddresses, err := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
+		if err != nil {
+			return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+		}
+
+		var syncNodeCount int64
+		err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+			config, err := cluster.ConfigLoad(tx)
+			if err != nil {
+				return errors.Wrap(err, "Failed to load cluster configuration")
+			}
+			syncNodeCount = config.ImageSyncNodeCount()
+		})
+		if len(nodeAddresses) < syncNodeCount {
+			nodeAddresses, err := d.cluster.ImageGetNodesHasNoImage()
+			if err != nil {
+				return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+			}
+
+			for _, address := range nodeAddresses {
+				client, err := cluster.Connect(address, d.endpoints.NetworkCert(), false)
+				if err != nil {
+					return errors.Wrap(err, "Failed to connect node for image synchronization")
+				}
+				op, err := client.CreateImage(req, nil)
+				if err != nil {
+					return err
+				}
+
+				err = op.Wait()
+				if err != nil {
+					return err
+				}
+				// opAPI := op.Get()
+				// Check the publish.go line:216
+			}
+		}
+
 		return nil
 	}
 
@@ -1932,47 +1947,4 @@ func imageRefresh(d *Daemon, r *http.Request) Response {
 	}
 
 	return OperationResponse(op)
-
-}
-
-func chooseNodesForImageSync(hasImageNodes []NodeInfo) ([]NodeInfo, error) {
-	nodesInfo, err = d.cluster.Transaction(func(tx *ClusterTx) ([]NodeInfo, error) {
-		nodes, err := tx.Nodes()
-		if err != nil {
-			return errors.Wrap(err, "Failed to fetch nodes")
-		}
-		if len(nodes) == 1 && nodes[0].Address == "0.0.0.0" {
-			// We're not clustered
-			return []NodeInfo{}, nil
-		}
-
-		return nodes
-	})
-
-	if err != nil {
-		return errors.Wrap(err, "Failed to fetch nodes")
-	}
-
-	// Only sync the image with the nodes which don't have the image stored before
-	var nodesForImageSync NodeInfo[]
-	for _, node := range nodesInfo {
-		stored := false
-		for _, imageNode := range hasImageNodes {
-			if imageNode.Address == node.Address {
-				stored = true
-			}
-		}
-
-		if !stored {
-			nodesForImageSync := append(nodesForImageSync, node)
-		}
-	}
-
-	// Sort the nodes according to the hearbeat
-	// The more responsive the node is, the higher priority the node comes
-	sort.Slice(nodesForImageSync, func(i, j int) bool {
-		return nodesForImageSync[i].Heartbeat > nodesInfo[j].Heartbeat
-	})
-
-	return nodesForImageSync, err
 }

From 1fe28460f9525b7678b4664cd0fec68b0c8afd93 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Sat, 22 Sep 2018 22:59:03 +0800
Subject: [PATCH 6/7] Do not create an image if the image already exists on
 the container.

Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
 lxd/db/db.go     |  1 +
 lxd/db/images.go | 13 +++++------
 lxd/images.go    | 57 +++++++++++++++++++++++++++++++++---------------
 3 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/lxd/db/db.go b/lxd/db/db.go
index 1b84267432..02ab33ff11 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/CanonicalLtd/go-dqlite"
 	"github.com/pkg/errors"
 
 	"github.com/lxc/lxd/lxd/db/cluster"
diff --git a/lxd/db/images.go b/lxd/db/images.go
index 4a8b866e4b..e7295050fc 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -904,13 +904,13 @@ func (c *Cluster) ImageGetNodesHasNoImage(fingerprint string) ([]string, error)
 	return c.getNodesByImageFingerprint(fingerprint, false)
 }
 
-func (c *Cluster) getNodesByImageFingerprint(fingerprint string, bool hasImage) ([]string, error) {
+func (c *Cluster) getNodesByImageFingerprint(fingerprint string, hasImage bool) ([]string, error) {
 	q := `
-	SELECT nodes.address FROM nodes
-	  LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
-	  LEFT JOIN images ON images_nodes.image_id %s images.id
-	WHERE images.fingerprint = ?
-	`
+SELECT nodes.address FROM nodes
+  LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
+  LEFT JOIN images ON images_nodes.image_id %s images.id
+WHERE images.fingerprint = ?
+`
 	var stmt string
 	if hasImage {
 		stmt = fmt.Sprintf(q, "=")
@@ -947,7 +947,6 @@ func (c *Cluster) getNodesByImageFingerprint(fingerprint string, bool hasImage)
 		}
 		return err
 	})
-
 	return addresses, err
 }
 
diff --git a/lxd/images.go b/lxd/images.go
index c571109e91..d13ae5732c 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -734,51 +734,72 @@ func imagesPost(d *Daemon, r *http.Request) Response {
 			}
 		}
 
-		// Set the metadata
-		metadata := make(map[string]string)
-		metadata["fingerprint"] = info.Fingerprint
-		metadata["size"] = strconv.FormatInt(info.Size, 10)
-		op.UpdateMetadata(metadata)
-
 		// Check how many nodes already have this image
-		nodeAddresses, err := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
+		syncNodeAddresses, err := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
 		if err != nil {
 			return errors.Wrap(err, "Failed to get nodes for the image synchronization")
 		}
 
-		var syncNodeCount int64
-		err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+		var desiredSyncNodeCount int64
+		err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
 			config, err := cluster.ConfigLoad(tx)
 			if err != nil {
 				return errors.Wrap(err, "Failed to load cluster configuration")
 			}
-			syncNodeCount = config.ImageSyncNodeCount()
+			desiredSyncNodeCount = config.ImageSyncNodeCount()
+			return nil
 		})
-		if len(nodeAddresses) < syncNodeCount {
-			nodeAddresses, err := d.cluster.ImageGetNodesHasNoImage()
+		nodeCount := desiredSyncNodeCount - int64(len(syncNodeAddresses))
+		if nodeCount > 0 {
+			addresses, err := d.cluster.ImageGetNodesHasNoImage(info.Fingerprint)
 			if err != nil {
 				return errors.Wrap(err, "Failed to get nodes for the image synchronization")
 			}
 
-			for _, address := range nodeAddresses {
-				client, err := cluster.Connect(address, d.endpoints.NetworkCert(), false)
+			min := func(x, y int64) int64 {
+				if x > y {
+					return y
+				}
+				return x
+			}
+
+			for idx := int64(0); idx < min(int64(len(addresses)), nodeCount); idx++ {
+				client, err := cluster.Connect(addresses[idx], d.endpoints.NetworkCert(), false)
 				if err != nil {
 					return errors.Wrap(err, "Failed to connect node for image synchronization")
 				}
-				op, err := client.CreateImage(req, nil)
+
+				// We spread the image for the nodes inside of cluster and we need to double
+				// check if the image already exists since when one certain node is going to
+				// create an image it will goes into the same routine.
+				fingerprints, err := client.GetImageFingerprints()
+				if err != nil {
+					return errors.Wrap(err, "Failed to fetch container's all image finger prints")
+				}
+
+				for _, fingerprint := range fingerprints {
+					if info.Fingerprint == fingerprint {
+						continue
+					}
+				}
+
+				opImgCreation, err := client.CreateImage(req, nil)
 				if err != nil {
 					return err
 				}
 
-				err = op.Wait()
+				err = opImgCreation.Wait()
 				if err != nil {
 					return err
 				}
-				// opAPI := op.Get()
-				// Check the publish.go line:216
 			}
 		}
 
+		// Set the metadata
+		metadata := make(map[string]string)
+		metadata["fingerprint"] = info.Fingerprint
+		metadata["size"] = strconv.FormatInt(info.Size, 10)
+		op.UpdateMetadata(metadata)
 		return nil
 	}
 

From 16e23c30e682f1de9966bc90d9338ad07853ff45 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Mon, 29 Oct 2018 14:52:42 +0800
Subject: [PATCH 7/7] Compose ImageCreateArgs and ImagesPost structures before
 creating an image on each node.

Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
 lxd/images.go | 139 +++++++++++++++++++++++++++++---------------------
 1 file changed, 80 insertions(+), 59 deletions(-)

diff --git a/lxd/images.go b/lxd/images.go
index d13ae5732c..d7ec00acdc 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -734,65 +734,9 @@ func imagesPost(d *Daemon, r *http.Request) Response {
 			}
 		}
 
-		// Check how many nodes already have this image
-		syncNodeAddresses, err := d.cluster.ImageGetNodesHasImage(info.Fingerprint)
-		if err != nil {
-			return errors.Wrap(err, "Failed to get nodes for the image synchronization")
-		}
-
-		var desiredSyncNodeCount int64
-		err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
-			config, err := cluster.ConfigLoad(tx)
-			if err != nil {
-				return errors.Wrap(err, "Failed to load cluster configuration")
-			}
-			desiredSyncNodeCount = config.ImageSyncNodeCount()
-			return nil
-		})
-		nodeCount := desiredSyncNodeCount - int64(len(syncNodeAddresses))
-		if nodeCount > 0 {
-			addresses, err := d.cluster.ImageGetNodesHasNoImage(info.Fingerprint)
-			if err != nil {
-				return errors.Wrap(err, "Failed to get nodes for the image synchronization")
-			}
-
-			min := func(x, y int64) int64 {
-				if x > y {
-					return y
-				}
-				return x
-			}
-
-			for idx := int64(0); idx < min(int64(len(addresses)), nodeCount); idx++ {
-				client, err := cluster.Connect(addresses[idx], d.endpoints.NetworkCert(), false)
-				if err != nil {
-					return errors.Wrap(err, "Failed to connect node for image synchronization")
-				}
-
-				// We spread the image for the nodes inside of cluster and we need to double
-				// check if the image already exists since when one certain node is going to
-				// create an image it will goes into the same routine.
-				fingerprints, err := client.GetImageFingerprints()
-				if err != nil {
-					return errors.Wrap(err, "Failed to fetch container's all image finger prints")
-				}
-
-				for _, fingerprint := range fingerprints {
-					if info.Fingerprint == fingerprint {
-						continue
-					}
-				}
-
-				opImgCreation, err := client.CreateImage(req, nil)
-				if err != nil {
-					return err
-				}
-
-				err = opImgCreation.Wait()
-				if err != nil {
-					return err
-				}
-			}
+		// Sync the images between each node in the cluster on demand
+		if err := imageSyncBetweenNodes(d, req, info.Fingerprint, imageUpload, post); err != nil {
+			return errors.Wrapf(err, "Image sync bwtween nodes")
 		}
 
 		// Set the metadata
@@ -1969,3 +1913,80 @@ func imageRefresh(d *Daemon, r *http.Request) Response {
 
 	return OperationResponse(op)
 }
+
+func imageSyncBetweenNodes(d *Daemon, req api.ImagesPost, fingerprint string, imageUpload bool, file *os.File) error {
+	// Check how many nodes already have this image
+	syncNodeAddresses, err := d.cluster.ImageGetNodesHasImage(fingerprint)
+	if err != nil {
+		return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+	}
+
+	var desiredSyncNodeCount int64
+	err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
+		config, err := cluster.ConfigLoad(tx)
+		if err != nil {
+			return errors.Wrap(err, "Failed to load cluster configuration")
+		}
+		desiredSyncNodeCount = config.ImageSyncNodeCount()
+		return nil
+	})
+
+	nodeCount := desiredSyncNodeCount - int64(len(syncNodeAddresses))
+	if nodeCount > 0 {
+		addresses, err := d.cluster.ImageGetNodesHasNoImage(fingerprint)
+		if err != nil {
+			return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+		}
+
+		min := func(x, y int64) int64 {
+			if x > y {
+				return y
+			}
+			return x
+		}
+
+		for idx := int64(0); idx < min(int64(len(addresses)), nodeCount); idx++ {
+			client, err := cluster.Connect(addresses[idx], d.endpoints.NetworkCert(), false)
+			if err != nil {
+				return errors.Wrap(err, "Failed to connect node for image synchronization")
+			}
+
+			// We spread the image for the nodes inside of cluster and we need to double
+			// check if the image already exists since when one certain node is going to
+			// create an image it will goes into the same routine.
+			fingerprints, err := client.GetImageFingerprints()
+			if err != nil {
+				return errors.Wrap(err, "Failed to fetch container's all image finger prints")
+			}
+
+			for _, imgFingerprint := range fingerprints {
+				if fingerprint == imgFingerprint {
+					continue
+				}
+			}
+
+			createArgs := &lxd.ImageCreateArgs{}
+			image := api.ImagesPost{}
+			if imageUpload {
+				createArgs.MetaFile = file
+				createArgs.MetaName = filepath.Base(file.Name())
+				image.Filename = createArgs.MetaName
+			} else {
+				createArgs = nil
+				image = req
+			}
+
+			op, err := client.CreateImage(image, createArgs)
+			if err != nil {
+				return err
+			}
+
+			err = op.Wait()
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}


More information about the lxc-devel mailing list