[lxc-devel] [lxd/master] Image sync for joined node
adglkh on Github
lxc-bot at linuxcontainers.org
Mon Feb 18 12:47:17 UTC 2019
A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 582 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20190218/18b130b4/attachment.bin>
-------------- next part --------------
From e753054bfef8620af8c784657bdff290eb6abbcf Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 15 Feb 2019 10:39:29 +0800
Subject: [PATCH 1/8] Do not iterate over all available nodes across the cluster for
image synchronization.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/images.go | 97 ++++++++++++++++++++++++---------------------------
1 file changed, 45 insertions(+), 52 deletions(-)
diff --git a/lxd/images.go b/lxd/images.go
index 062c04ca59..203862f360 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -776,7 +776,7 @@ func imagesPost(d *Daemon, r *http.Request) Response {
}
// Sync the images between each node in the cluster on demand
- err = imageSyncBetweenNodes(d, req, info.Fingerprint)
+ err = imageSyncBetweenNodes(d, info.Fingerprint)
if err != nil {
return errors.Wrapf(err, "Image sync between nodes")
}
@@ -1984,7 +1984,7 @@ func imageRefresh(d *Daemon, r *http.Request) Response {
return OperationResponse(op)
}
-func imageSyncBetweenNodes(d *Daemon, req api.ImagesPost, fingerprint string) error {
+func imageSyncBetweenNodes(d *Daemon, fingerprint string) error {
var desiredSyncNodeCount int64
err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
@@ -2025,70 +2025,63 @@ func imageSyncBetweenNodes(d *Daemon, req api.ImagesPost, fingerprint string) er
if err != nil {
return errors.Wrap(err, "Failed to get nodes for the image synchronization")
}
+ if len(addresses) <= 0 {
+ return nil
+ }
- min := func(x, y int64) int64 {
- if x > y {
- return y
- }
+ // We spread the image for the nodes inside of cluster and we need to double
+ // check if the image already exists via DB since when one certain node is
+ // going to create an image it will invoke the same routine. Hence we only
+ // take the first node here as the target node for the image synchronization.
+ // In case the operation fails, the daily image synchronization task will check
+ // whether an image was synced successfully across the cluster and perform the
+ // same job if not.
+ targetNodeAddress := addresses[0]
+ syncNodeAddresses, err = d.cluster.ImageGetNodesWithImage(fingerprint)
+ if err != nil {
+ return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+ }
- return x
+ if shared.StringInSlice(targetNodeAddress, syncNodeAddresses) {
+ return nil
}
- for idx := int64(0); idx < min(int64(len(addresses)), nodeCount); idx++ {
- // We spread the image for the nodes inside of cluster and we need to double
- // check if the image already exists via DB since when one certain node is
- // going to create an image it will invoke the same routine.
- syncNodeAddresses, err = d.cluster.ImageGetNodesWithImage(fingerprint)
- if err != nil {
- return errors.Wrap(err, "Failed to get nodes for the image synchronization")
- }
+ client, err := cluster.Connect(targetNodeAddress, d.endpoints.NetworkCert(), true)
+ if err != nil {
+ return errors.Wrap(err, "Failed to connect node for image synchronization")
+ }
- if shared.StringInSlice(addresses[idx], syncNodeAddresses) {
- continue
- }
+ createArgs := &lxd.ImageCreateArgs{}
+ imageMetaPath := shared.VarPath("images", fingerprint)
+ imageRootfsPath := shared.VarPath("images", fingerprint+".rootfs")
- client, err := cluster.Connect(addresses[idx], d.endpoints.NetworkCert(), true)
- if err != nil {
- return errors.Wrap(err, "Failed to connect node for image synchronization")
- }
+ metaFile, err := os.Open(imageMetaPath)
+ if err != nil {
+ return err
+ }
+ defer metaFile.Close()
- createArgs := &lxd.ImageCreateArgs{}
- imageMetaPath := shared.VarPath("images", fingerprint)
- imageRootfsPath := shared.VarPath("images", fingerprint+".rootfs")
+ createArgs.MetaFile = metaFile
+ createArgs.MetaName = filepath.Base(imageMetaPath)
- metaFile, err := os.Open(imageMetaPath)
+ if shared.PathExists(imageRootfsPath) {
+ rootfsFile, err := os.Open(imageRootfsPath)
if err != nil {
return err
}
- defer metaFile.Close()
-
- createArgs.MetaFile = metaFile
- createArgs.MetaName = filepath.Base(imageMetaPath)
+ defer rootfsFile.Close()
- if shared.PathExists(imageRootfsPath) {
- rootfsFile, err := os.Open(imageRootfsPath)
- if err != nil {
- return err
- }
- defer rootfsFile.Close()
-
- createArgs.RootfsFile = rootfsFile
- createArgs.RootfsName = filepath.Base(imageRootfsPath)
- }
-
- image := api.ImagesPost{}
- image.Filename = createArgs.MetaName
+ createArgs.RootfsFile = rootfsFile
+ createArgs.RootfsName = filepath.Base(imageRootfsPath)
+ }
- op, err := client.CreateImage(image, createArgs)
- if err != nil {
- return err
- }
+ image := api.ImagesPost{}
+ image.Filename = createArgs.MetaName
- err = op.Wait()
- if err != nil {
- return err
- }
+ op, err := client.CreateImage(image, createArgs)
+ if err != nil {
+ return err
}
- return nil
+ return op.Wait()
}
From 0ad338d182fa03674b16f398a1a8c049cfff0040 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 15 Feb 2019 12:17:08 +0800
Subject: [PATCH 2/8] Use proper function names for querying the image
nodes.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/db/images.go | 8 ++++----
lxd/images.go | 6 +++---
2 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/lxd/db/images.go b/lxd/db/images.go
index b6ec9a3db1..ea7ee4eabf 100644
--- a/lxd/db/images.go
+++ b/lxd/db/images.go
@@ -890,8 +890,8 @@ func (c *Cluster) ImageUploadedAt(id int, uploadedAt time.Time) error {
return err
}
-// ImageGetNodesWithImage returns the addresses of online nodes which already have the image.
-func (c *Cluster) ImageGetNodesWithImage(fingerprint string) ([]string, error) {
+// ImageGetNodesHaveImage returns the addresses of online nodes which already have the image.
+func (c *Cluster) ImageGetNodesHaveImage(fingerprint string) ([]string, error) {
q := `
SELECT DISTINCT nodes.address FROM nodes
LEFT JOIN images_nodes ON images_nodes.node_id = nodes.id
@@ -901,8 +901,8 @@ WHERE images.fingerprint = ?
return c.getNodesByImageFingerprint(q, fingerprint)
}
-// ImageGetNodesHasNoImage returns the addresses of online nodes which don't have the image.
-func (c *Cluster) ImageGetNodesHasNoImage(fingerprint string) ([]string, error) {
+// ImageGetNodesHaveNoImage returns the addresses of online nodes which don't have the image.
+func (c *Cluster) ImageGetNodesHaveNoImage(fingerprint string) ([]string, error) {
q := `
SELECT DISTINCT nodes.address FROM nodes WHERE nodes.address NOT IN (
SELECT DISTINCT nodes.address FROM nodes
diff --git a/lxd/images.go b/lxd/images.go
index 203862f360..cdfba9efaa 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -2011,7 +2011,7 @@ func imageSyncBetweenNodes(d *Daemon, fingerprint string) error {
}
// Check how many nodes already have this image
- syncNodeAddresses, err := d.cluster.ImageGetNodesWithImage(fingerprint)
+ syncNodeAddresses, err := d.cluster.ImageGetNodesHaveImage(fingerprint)
if err != nil {
return errors.Wrap(err, "Failed to get nodes for the image synchronization")
}
@@ -2021,7 +2021,7 @@ func imageSyncBetweenNodes(d *Daemon, fingerprint string) error {
return nil
}
- addresses, err := d.cluster.ImageGetNodesHasNoImage(fingerprint)
+ addresses, err := d.cluster.ImageGetNodesHaveNoImage(fingerprint)
if err != nil {
return errors.Wrap(err, "Failed to get nodes for the image synchronization")
}
@@ -2037,7 +2037,7 @@ func imageSyncBetweenNodes(d *Daemon, fingerprint string) error {
// whether an image was synced successfully across the cluster and perform the
// same job if not.
targetNodeAddress := addresses[0]
- syncNodeAddresses, err = d.cluster.ImageGetNodesWithImage(fingerprint)
+ syncNodeAddresses, err = d.cluster.ImageGetNodesHaveImage(fingerprint)
if err != nil {
return errors.Wrap(err, "Failed to get nodes for the image synchronization")
}
From f07a61d3a6b3492e17b4e0f1d14dd99b1ecfb221 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 15 Feb 2019 14:02:04 +0800
Subject: [PATCH 3/8] Fetch the image fingerprints of the current online node.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/db/node.go | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git a/lxd/db/node.go b/lxd/db/node.go
index c28c5ad199..0a7d44074e 100644
--- a/lxd/db/node.go
+++ b/lxd/db/node.go
@@ -504,6 +504,23 @@ func (c *ClusterTx) NodeUpdateVersion(id int64, version [2]int) error {
return nil
}
+// NodeGetImages returns all images that the current LXD node instance has.
+func (c *ClusterTx) NodeGetImages() ([]string, error) {
+ return c.NodeGetImagesByID(c.nodeID)
+}
+
+// NodeGetImagesByID returns all images that the LXD node instance has with the given node id.
+func (c *ClusterTx) NodeGetImagesByID(id int64) ([]string, error) {
+ stmt := `
+ SELECT DISTINCT images.fingerprint FROM images
+ LEFT JOIN images_nodes ON images.id = images_nodes.image_id
+ LEFT JOIN nodes ON images_nodes.node_id = nodes.id
+ WHERE nodes.id = ?
+ `
+
+ return query.SelectStrings(c.tx, stmt, id)
+}
+
func nodeIsOffline(threshold time.Duration, heartbeat time.Time) bool {
return heartbeat.Before(time.Now().Add(-threshold))
}
From 331348dd02f019b7ba4822452a0f2882d21f04c9 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 15 Feb 2019 16:36:27 +0800
Subject: [PATCH 4/8] Add a task that automatically synchronizes images across the
cluster and runs it in the background.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/daemon.go | 3 +++
lxd/db/operations.go | 3 +++
lxd/images.go | 63 ++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 69 insertions(+)
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 5ea6bd84cb..a244b0c113 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -824,6 +824,9 @@ func (d *Daemon) Ready() error {
// Remove expired container snapshots (minutely)
d.tasks.Add(pruneExpiredContainerSnapshotsTask(d))
+
+ // Auto-sync images across the cluster (daily)
+ d.tasks.Add(autoSyncImagesTask(d))
}
// Start all background tasks
diff --git a/lxd/db/operations.go b/lxd/db/operations.go
index 048b934514..c29eed3213 100644
--- a/lxd/db/operations.go
+++ b/lxd/db/operations.go
@@ -57,6 +57,7 @@ const (
OperationImagesExpire
OperationImagesPruneLeftover
OperationImagesUpdate
+ OperationImagesSynchronize
OperationLogsExpire
OperationInstanceTypesUpdate
OperationBackupsExpire
@@ -146,6 +147,8 @@ func (t OperationType) Description() string {
return "Pruning leftover image files"
case OperationImagesUpdate:
return "Updating images"
+ case OperationImagesSynchronize:
+ return "Synchronizing images"
case OperationLogsExpire:
return "Expiring log files"
case OperationInstanceTypesUpdate:
diff --git a/lxd/images.go b/lxd/images.go
index cdfba9efaa..2b00d6ccaf 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -1984,6 +1984,69 @@ func imageRefresh(d *Daemon, r *http.Request) Response {
return OperationResponse(op)
}
+func autoSyncImagesTask(d *Daemon) (task.Func, task.Schedule) {
+ f := func(ctx context.Context) {
+ logger.Infof("Synchronizing images across the cluster")
+ opRun := func(op *operation) error {
+ return autoSyncImages(ctx, d)
+ }
+
+ op, err := operationCreate(d.cluster, "", operationClassTask, db.OperationImagesSynchronize, nil, nil, opRun, nil, nil)
+ if err != nil {
+ logger.Error("Failed to start image synchronization operation", log.Ctx{"err": err})
+ return
+ }
+
+ _, err = op.Run()
+ if err != nil {
+ logger.Error("Failed to synchronize images", log.Ctx{"err": err})
+ return
+ }
+
+ logger.Infof("Done synchronizing images across the cluster")
+ }
+
+ return f, task.Daily()
+}
+
+func autoSyncImages(ctx context.Context, d *Daemon) error {
+ // Check how many images the current node owns and automatically sync all
+ // available images to other nodes which don't have yet.
+ // NOTE: we ignore errors and only log the message in this function in case an error occurs
+ var fingerprints []string
+ var err error
+ err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ fingerprints, err = tx.NodeGetImages()
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+ if err != nil {
+ logger.Error("Failed to query image fingerprints of the node", log.Ctx{"err": err})
+ return nil
+ }
+
+ for _, fingerprint := range fingerprints {
+ ch := make(chan error)
+ go func() {
+ err := imageSyncBetweenNodes(d, fingerprint)
+ if err != nil {
+ logger.Error("Failed to synchronize images", log.Ctx{"err": err, "image_fingerprint": fingerprint})
+ }
+ ch <- nil
+ }()
+
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-ch:
+ }
+ }
+
+ return nil
+}
+
func imageSyncBetweenNodes(d *Daemon, fingerprint string) error {
var desiredSyncNodeCount int64
From 5c3ba9194134838639cb265994dd21f188e847e1 Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 15 Feb 2019 19:30:19 +0800
Subject: [PATCH 5/8] Restrict the image synchronization task operation so that
only the leader node can launch it.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/images.go | 26 ++++++++++++++++++--------
1 file changed, 18 insertions(+), 8 deletions(-)
diff --git a/lxd/images.go b/lxd/images.go
index 2b00d6ccaf..eaa40554dd 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -27,6 +27,7 @@ import (
lxd "github.com/lxc/lxd/client"
"github.com/lxc/lxd/lxd/cluster"
"github.com/lxc/lxd/lxd/db"
+ "github.com/lxc/lxd/lxd/node"
"github.com/lxc/lxd/lxd/state"
"github.com/lxc/lxd/lxd/task"
"github.com/lxc/lxd/lxd/util"
@@ -2010,21 +2011,30 @@ func autoSyncImagesTask(d *Daemon) (task.Func, task.Schedule) {
}
func autoSyncImages(ctx context.Context, d *Daemon) error {
+ // In order to only have one task operation executed per image when syncing the images
+ // across the cluster, only the leader node can launch the task.
+ localAddress, err := node.ClusterAddress(d.db)
+ if err != nil {
+ return err
+ }
+ leader, err := d.gateway.LeaderAddress()
+ if err != nil {
+ return err
+ }
+ if localAddress != leader {
+ logger.Debug("Skipping image synchronization since we're not leader")
+ return nil
+ }
+
// Check how many images the current node owns and automatically sync all
// available images to other nodes which don't have yet.
- // NOTE: we ignore errors and only log the message in this function in case an error occurs
var fingerprints []string
- var err error
err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
fingerprints, err = tx.NodeGetImages()
- if err != nil {
- return err
- }
- return nil
+ return err
})
if err != nil {
- logger.Error("Failed to query image fingerprints of the node", log.Ctx{"err": err})
- return nil
+ return errors.Wrap(err, "Failed to query image fingerprints of the node")
}
for _, fingerprint := range fingerprints {
From f078771e09f3489d3ea3a5a3ee4ea592c415aceb Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Fri, 15 Feb 2019 20:47:39 +0800
Subject: [PATCH 6/8] Sync the image to the leader node first, since it has
the highest priority.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/daemon.go | 2 +-
lxd/images.go | 19 ++++++++++---------
2 files changed, 11 insertions(+), 10 deletions(-)
diff --git a/lxd/daemon.go b/lxd/daemon.go
index a244b0c113..5a31349e7c 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -247,7 +247,7 @@ func isJSONRequest(r *http.Request) bool {
return false
}
-// State creates a new State instance liked to our internal db and os.
+// State creates a new State instance linked to our internal db and os.
func (d *Daemon) State() *state.State {
return state.NewState(d.db, d.cluster, d.maas, d.os, d.endpoints)
}
diff --git a/lxd/images.go b/lxd/images.go
index eaa40554dd..38fcd68f5b 100644
--- a/lxd/images.go
+++ b/lxd/images.go
@@ -2104,21 +2104,22 @@ func imageSyncBetweenNodes(d *Daemon, fingerprint string) error {
// We spread the image for the nodes inside of cluster and we need to double
// check if the image already exists via DB since when one certain node is
- // going to create an image it will invoke the same routine. Hence we only
- // take the first node here as the target node for the image synchronization.
+ // going to create an image it will invoke the same routine.
+ // Also, since the daily image synchronization task can only be launched by the leader node,
+ // the leader node gets the image synced first, with higher priority.
// In case the operation fails, the daily image synchronization task will check
// whether an image was synced successfully across the cluster and perform the
// same job if not.
- targetNodeAddress := addresses[0]
- syncNodeAddresses, err = d.cluster.ImageGetNodesHaveImage(fingerprint)
+ leader, err := d.gateway.LeaderAddress()
if err != nil {
- return errors.Wrap(err, "Failed to get nodes for the image synchronization")
+ return errors.Wrap(err, "Failed to fetch the leader node address")
}
-
- if shared.StringInSlice(targetNodeAddress, syncNodeAddresses) {
- return nil
+ var targetNodeAddress string
+ if shared.StringInSlice(leader, addresses) {
+ targetNodeAddress = leader
+ } else {
+ targetNodeAddress = addresses[0]
}
-
client, err := cluster.Connect(targetNodeAddress, d.endpoints.NetworkCert(), true)
if err != nil {
return errors.Wrap(err, "Failed to connect node for image synchronization")
From 43a25e76f91c662cb3ab8af69d2049bc5f2d8cde Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Sat, 16 Feb 2019 22:23:55 +0800
Subject: [PATCH 7/8] Import all images from the leader node to the new node
after it has joined.
Signed-off-by: gary-wzl77 <gary.wang at canonical.com>
---
lxd/api_cluster.go | 43 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 99aa60da53..97f2753881 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -523,6 +523,49 @@ func clusterPutJoin(d *Daemon, req api.ClusterPut) Response {
return err
}
+ // Re-use the client handler and import the images from the leader node which
+ // owns all available images to the joined node
+ go func() {
+ var fingerprints []string
+ err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
+ nodeInfo, err := tx.NodeByAddress(req.ClusterAddress)
+ if err != nil {
+ return err
+ }
+
+ fingerprints, err = tx.NodeGetImagesByID(nodeInfo.ID)
+ return err
+ })
+ if err != nil {
+ logger.Errorf("Failed to retrieve image fingerprint %s: %v", req.ClusterAddress, err)
+ return
+ }
+
+ imageImport := func(client lxd.ContainerServer, fingerprint string) error {
+ err := imageImportFromNode(filepath.Join(d.os.VarDir, "images"), client, fingerprint)
+ if err != nil {
+ return err
+ }
+
+ project := "default"
+ return d.cluster.ImageAssociateNode(project, fingerprint)
+ }
+
+ for _, fingerprint := range fingerprints {
+ ch := make(chan error)
+ go func() {
+ err := imageImport(client, fingerprint)
+ if err != nil {
+ logger.Errorf("Failed to import an image %s from %s: %v", fingerprint, req.ClusterAddress, err)
+ }
+ ch <- nil
+ }()
+ select {
+ case <-ch:
+ }
+ }
+ }()
+
// Add the cluster flag from the agent
version.UserAgentFeatures([]string{"cluster"})
From 011c8a2b6cfb4033fd8658ab36e375b0f566c29b Mon Sep 17 00:00:00 2001
From: gary-wzl77 <gary.wang at canonical.com>
Date: Mon, 18 Feb 2019 19:24:05 +0800
Subject: [PATCH 8/8] Update the test case to cover the image sync scenario for
a joined node.
---
test/suites/clustering.sh | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index 66fd9541d6..c33e416848 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -1166,11 +1166,6 @@ test_clustering_image_replication() {
[ -f "${LXD_ONE_DIR}/images/${fingerprint}" ] || false
[ -f "${LXD_TWO_DIR}/images/${fingerprint}" ] || false
- # Delete the imported image
- LXD_DIR="${LXD_ONE_DIR}" lxc image delete testimage
- [ ! -f "${LXD_ONE_DIR}/images/${fingerprint}" ] || false
- [ ! -f "${LXD_TWO_DIR}/images/${fingerprint}" ] || false
-
# Spawn a third node
setup_clustering_netns 3
LXD_THREE_DIR=$(mktemp -d -p "${TEST_DIR}" XXX)
@@ -1178,6 +1173,16 @@ test_clustering_image_replication() {
ns3="${prefix}3"
spawn_lxd_and_join_cluster "${ns3}" "${bridge}" "${cert}" 3 1 "${LXD_THREE_DIR}"
+ # Wait for the test image to be synced to the joined node in the background
+ sleep 5
+ [ -f "${LXD_THREE_DIR}/images/${fingerprint}" ] || false
+
+ # Delete the imported image
+ LXD_DIR="${LXD_ONE_DIR}" lxc image delete testimage
+ [ ! -f "${LXD_ONE_DIR}/images/${fingerprint}" ] || false
+ [ ! -f "${LXD_TWO_DIR}/images/${fingerprint}" ] || false
+ [ ! -f "${LXD_THREE_DIR}/images/${fingerprint}" ] || false
+
# Import the test image on node3
LXD_DIR="${LXD_THREE_DIR}" ensure_import_testimage
More information about the lxc-devel
mailing list