[lxc-devel] [lxd/master] Propagate profile changes to cluster nodes

freeekanayaka on Github lxc-bot at linuxcontainers.org
Tue May 29 10:45:46 UTC 2018


From 7a27e85a0ad3fe07c53966752b7764473e60997e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 29 May 2018 09:10:11 +0000
Subject: [PATCH 1/2] Extract expandConfigFromProfiles from expandConfig to
 avoid db interaction

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/container_lxc.go | 67 +++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 48 insertions(+), 19 deletions(-)

diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index 7310ba142..6c008abae 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -464,7 +464,20 @@ func containerLXCCreate(s *state.State, args db.ContainerArgs) (container, error
 
 func containerLXCLoad(s *state.State, args db.ContainerArgs) (container, error) {
 	// Create the container struct
-	c := &containerLXC{
+	c := containerLXCInstantiate(s, args)
+
+	// Load the config.
+	err := c.init()
+	if err != nil {
+		return nil, err
+	}
+
+	return c, nil
+}
+
+// Create a container struct without initializing it.
+func containerLXCInstantiate(s *state.State, args db.ContainerArgs) *containerLXC {
+	return &containerLXC{
 		state:        s,
 		id:           args.ID,
 		name:         args.Name,
@@ -480,14 +493,6 @@ func containerLXCLoad(s *state.State, args db.ContainerArgs) (container, error)
 		stateful:     args.Stateful,
 		node:         args.Node,
 	}
-
-	// Load the config.
-	err := c.init()
-	if err != nil {
-		return nil, err
-	}
-
-	return c, nil
 }
 
 // The LXC container driver
@@ -1700,16 +1705,30 @@ func (c *containerLXC) initStorage() error {
 
 // Config handling
 func (c *containerLXC) expandConfig() error {
-	config := map[string]string{}
+	// Fetch profile configs
+	profileConfigs := make([]map[string]string, len(c.profiles))
 
 	// Apply all the profiles
-	for _, name := range c.profiles {
+	for i, name := range c.profiles {
 		profileConfig, err := c.state.Cluster.ProfileConfig(name)
 		if err != nil {
 			return err
 		}
+		profileConfigs[i] = profileConfig
+	}
 
-		for k, v := range profileConfig {
+	c.expandConfigFromProfiles(profileConfigs)
+
+	return nil
+}
+
+// Expand the container config using the given profile configs.
+func (c *containerLXC) expandConfigFromProfiles(profileConfigs []map[string]string) {
+	config := map[string]string{}
+
+	// Apply all the profiles
+	for i := range profileConfigs {
+		for k, v := range profileConfigs[i] {
 			config[k] = v
 		}
 	}
@@ -1720,20 +1739,31 @@ func (c *containerLXC) expandConfig() error {
 	}
 
 	c.expandedConfig = config
-	return nil
 }
 
 func (c *containerLXC) expandDevices() error {
-	devices := types.Devices{}
-
-	// Apply all the profiles
+	// Fetch profile devices
+	profileDevices := make([]types.Devices, 0, len(c.profiles))
 	for _, p := range c.profiles {
-		profileDevices, err := c.state.Cluster.Devices(p, true)
+		devices, err := c.state.Cluster.Devices(p, true)
 		if err != nil {
 			return err
 		}
+		profileDevices = append(profileDevices, devices)
+	}
+
+	c.expandDevicesFromProfiles(profileDevices)
+
+	return nil
+}
+
+// Expand the container devices using the given profile devices.
+func (c *containerLXC) expandDevicesFromProfiles(profileDevices []types.Devices) {
+	devices := types.Devices{}
 
-		for k, v := range profileDevices {
+	// Apply all the profiles
+	for i := range profileDevices {
+		for k, v := range profileDevices[i] {
 			devices[k] = v
 		}
 	}
@@ -1744,7 +1774,6 @@ func (c *containerLXC) expandDevices() error {
 	}
 
 	c.expandedDevices = devices
-	return nil
 }
 
 // setupUnixDevice() creates the unix device and sets up the necessary low-level

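The two extracted helpers are pure map merges: profiles apply in order, later profiles overriding earlier ones, and the container's local config is applied on top at the end. A minimal standalone sketch of those merge semantics (illustrative only, not part of the patch; all names below are made up):

    package main

    import "fmt"

    // mergeProfiles mirrors expandConfigFromProfiles: apply each profile's
    // config in order (later profiles win), then overlay the container's
    // local config.
    func mergeProfiles(profileConfigs []map[string]string, local map[string]string) map[string]string {
        config := map[string]string{}
        for _, pc := range profileConfigs {
            for k, v := range pc {
                config[k] = v
            }
        }
        for k, v := range local {
            config[k] = v
        }
        return config
    }

    func main() {
        profiles := []map[string]string{
            {"limits.memory": "1GB"}, // e.g. "default"
            {"limits.memory": "2GB"}, // e.g. "web", overrides "default"
        }
        local := map[string]string{"security.nesting": "true"}
        fmt.Println(mergeProfiles(profiles, local))
        // map[limits.memory:2GB security.nesting:true]
    }
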
From b2ce6ebe9a75d74c676aa7e54aad30e63c752fe7 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Tue, 29 May 2018 06:49:49 +0000
Subject: [PATCH 2/2] Broadcast profile changes to other cluster nodes

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/profiles.go           |  35 ++++++++++-
 lxd/profiles_utils.go     | 145 +++++++++++++++++++++++++++++++++++++++++-----
 test/main.sh              |   1 +
 test/suites/clustering.sh |  68 ++++++++++++++++++++++
 4 files changed, 233 insertions(+), 16 deletions(-)

diff --git a/lxd/profiles.go b/lxd/profiles.go
index 87e6175cc..fee793eb2 100644
--- a/lxd/profiles.go
+++ b/lxd/profiles.go
@@ -10,6 +10,8 @@ import (
 
 	"github.com/gorilla/mux"
 
+	lxd "github.com/lxc/lxd/client"
+	"github.com/lxc/lxd/lxd/cluster"
 	"github.com/lxc/lxd/lxd/state"
 	"github.com/lxc/lxd/lxd/util"
 	"github.com/lxc/lxd/shared"
@@ -158,6 +160,21 @@ func getContainersWithProfile(s *state.State, profile string) []container {
 func profilePut(d *Daemon, r *http.Request) Response {
 	// Get the profile
 	name := mux.Vars(r)["name"]
+
+	if isClusterNotification(r) {
+		// In this case the ProfilePut request payload contains
+		// information about the old profile, since the new one has
+		// already been saved in the database.
+		old := api.ProfilePut{}
+		err := json.NewDecoder(r.Body).Decode(&old)
+		if err != nil {
+			return BadRequest(err)
+		}
+		err = doProfileUpdateCluster(d, name, old)
+		return SmartError(err)
+	}
+
 	id, profile, err := d.cluster.ProfileGet(name)
 	if err != nil {
 		return SmartError(fmt.Errorf("Failed to retrieve profile='%s'", name))
@@ -175,7 +192,23 @@ func profilePut(d *Daemon, r *http.Request) Response {
 		return BadRequest(err)
 	}
 
-	return SmartError(doProfileUpdate(d, name, id, profile, req))
+	err = doProfileUpdate(d, name, id, profile, req)
+
+	if err == nil && !isClusterNotification(r) {
+		// Notify all other nodes. If a node is down, it will be ignored.
+		notifier, err := cluster.NewNotifier(d.State(), d.endpoints.NetworkCert(), cluster.NotifyAlive)
+		if err != nil {
+			return SmartError(err)
+		}
+		err = notifier(func(client lxd.ContainerServer) error {
+			return client.UpdateProfile(name, profile.ProfilePut, "")
+		})
+		if err != nil {
+			return SmartError(err)
+		}
+	}
+
+	return SmartError(err)
 }
 
 func profilePatch(d *Daemon, r *http.Request) Response {
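On the receiving side, a node has to tell an original client request apart from a notification fanned out by a peer, and in the latter case treat the request body as the old profile (the database already holds the new one). A rough standalone sketch of that dispatch, assuming a header-based marker; the header value and payload type below are illustrative stand-ins, not LXD's actual helpers:

    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
    )

    // oldProfile stands in for api.ProfilePut: the payload a notified
    // node receives, describing the profile as it was before the change.
    type oldProfile struct {
        Config  map[string]string            `json:"config"`
        Devices map[string]map[string]string `json:"devices"`
    }

    // isClusterNotification reports whether the request was fanned out
    // by a peer; the user-agent value is a hypothetical placeholder.
    func isClusterNotification(r *http.Request) bool {
        return r.Header.Get("User-Agent") == "cluster-notifier"
    }

    func profilePut(w http.ResponseWriter, r *http.Request) {
        if isClusterNotification(r) {
            // The db was already updated by the notifying node; only
            // local containers need re-expanding against the old profile.
            old := oldProfile{}
            if err := json.NewDecoder(r.Body).Decode(&old); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
            }
            fmt.Fprintf(w, "re-expanding local containers, old config: %v\n", old.Config)
            return
        }
        // Original client request: update the db, then broadcast the
        // old profile to the other nodes.
        fmt.Fprintln(w, "db updated, notifying peers")
    }

    func main() {
        http.HandleFunc("/1.0/profiles/", profilePut)
        http.ListenAndServe("127.0.0.1:8080", nil)
    }
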
diff --git a/lxd/profiles_utils.go b/lxd/profiles_utils.go
index f6be6f4e6..c3c332769 100644
--- a/lxd/profiles_utils.go
+++ b/lxd/profiles_utils.go
@@ -6,8 +6,10 @@ import (
 
 	"github.com/lxc/lxd/lxd/db"
 	"github.com/lxc/lxd/lxd/db/query"
+	"github.com/lxc/lxd/lxd/types"
 	"github.com/lxc/lxd/shared"
 	"github.com/lxc/lxd/shared/api"
+	"github.com/pkg/errors"
 )
 
 func doProfileUpdate(d *Daemon, name string, id int64, profile *api.Profile, req api.ProfilePut) error {
@@ -22,7 +24,10 @@ func doProfileUpdate(d *Daemon, name string, id int64, profile *api.Profile, req
 		return err
 	}
 
-	containers := getContainersWithProfile(d.State(), name)
+	containers, err := getProfileContainersInfo(d.cluster, name)
+	if err != nil {
+		return errors.Wrapf(err, "failed to query containers associated with profile '%s'", name)
+	}
 
 	// Check if the root device is supposed to be changed or removed.
 	oldProfileRootDiskDeviceKey, oldProfileRootDiskDevice, _ := shared.GetRootDiskDevice(profile.Devices)
@@ -31,14 +36,13 @@ func doProfileUpdate(d *Daemon, name string, id int64, profile *api.Profile, req
 		// Check for containers using the device
 		for _, container := range containers {
 			// Check if the device is locally overridden
-			localDevices := container.LocalDevices()
-			k, v, _ := shared.GetRootDiskDevice(localDevices)
+			k, v, _ := shared.GetRootDiskDevice(container.Devices)
 			if k != "" && v["pool"] != "" {
 				continue
 			}
 
 			// Check what profile the device comes from
-			profiles := container.Profiles()
+			profiles := container.Profiles
 			for i := len(profiles) - 1; i >= 0; i-- {
 				_, profile, err := d.cluster.ProfileGet(profiles[i])
 				if err != nil {
@@ -114,20 +118,59 @@ func doProfileUpdate(d *Daemon, name string, id int64, profile *api.Profile, req
 		return err
 	}
 
-	// Update all the containers using the profile. Must be done after db.TxCommit due to DB lock.
+	// Update all the containers on this node using the profile. Must be
+	// done after db.TxCommit due to DB lock.
+	nodeName := ""
+	err = d.cluster.Transaction(func(tx *db.ClusterTx) error {
+		var err error
+		nodeName, err = tx.NodeName()
+		return err
+	})
+	if err != nil {
+		return errors.Wrap(err, "failed to query local node name")
+	}
 	failures := map[string]error{}
-	for _, c := range containers {
-		err = c.Update(db.ContainerArgs{
-			Architecture: c.Architecture(),
-			Ephemeral:    c.IsEphemeral(),
-			Config:       c.LocalConfig(),
-			Devices:      c.LocalDevices(),
-			Profiles:     c.Profiles(),
-			Description:  c.Description(),
-		}, true)
+	for _, args := range containers {
+		err := doProfileUpdateContainer(d, name, profile.ProfilePut, nodeName, args)
+		if err != nil {
+			failures[args.Name] = err
+		}
+	}
 
+	if len(failures) != 0 {
+		msg := "The following containers failed to update (profile change still saved):\n"
+		for cname, err := range failures {
+			msg += fmt.Sprintf(" - %s: %s\n", cname, err)
+		}
+		return fmt.Errorf("%s", msg)
+	}
+
+	return nil
+}
+
+// Like doProfileUpdate, but skips the database update, which was already
+// performed by the notifying node when it ran doProfileUpdate.
+func doProfileUpdateCluster(d *Daemon, name string, old api.ProfilePut) error {
+	nodeName := ""
+	err := d.cluster.Transaction(func(tx *db.ClusterTx) error {
+		var err error
+		nodeName, err = tx.NodeName()
+		return err
+	})
+	if err != nil {
+		return errors.Wrap(err, "failed to query local node name")
+	}
+
+	containers, err := getProfileContainersInfo(d.cluster, name)
+	if err != nil {
+		return errors.Wrapf(err, "failed to query containers associated with profile '%s'", name)
+	}
+
+	failures := map[string]error{}
+	for _, args := range containers {
+		err := doProfileUpdateContainer(d, name, old, nodeName, args)
 		if err != nil {
-			failures[c.Name()] = err
+			failures[args.Name] = err
 		}
 	}
 
@@ -141,3 +184,75 @@ func doProfileUpdate(d *Daemon, name string, id int64, profile *api.Profile, req
 
 	return nil
 }
+
+// Profile update of a single container.
+func doProfileUpdateContainer(d *Daemon, name string, old api.ProfilePut, nodeName string, args db.ContainerArgs) error {
+	if args.Node != "" && args.Node != nodeName {
+		// No-op, this container does not belong to this node.
+		return nil
+	}
+
+	profileConfigs := make([]map[string]string, len(args.Profiles))
+	for i, profileName := range args.Profiles {
+		if profileName == name {
+			// Use the old config.
+			profileConfigs[i] = old.Config
+			continue
+		}
+		// Use the config currently in the database.
+		profileConfig, err := d.cluster.ProfileConfig(profileName)
+		if err != nil {
+			return errors.Wrapf(err, "failed to load profile config for '%s'", profileName)
+		}
+		profileConfigs[i] = profileConfig
+	}
+
+	profileDevices := make([]types.Devices, len(args.Profiles))
+	for i, profileName := range args.Profiles {
+		if profileName == name {
+			// Use the old devices.
+			profileDevices[i] = old.Devices
+			continue
+		}
+		// Use the devices currently in the database.
+		devices, err := d.cluster.Devices(profileName, true)
+		if err != nil {
+			return errors.Wrapf(err, "failed to load profile devices for '%s'", profileName)
+		}
+		profileDevices[i] = devices
+	}
+
+	c := containerLXCInstantiate(d.State(), args)
+	c.expandConfigFromProfiles(profileConfigs)
+	c.expandDevicesFromProfiles(profileDevices)
+
+	return c.Update(db.ContainerArgs{
+		Architecture: c.Architecture(),
+		Ephemeral:    c.IsEphemeral(),
+		Config:       c.LocalConfig(),
+		Devices:      c.LocalDevices(),
+		Profiles:     c.Profiles(),
+		Description:  c.Description(),
+	}, true)
+}
+
+// Query the db for information about containers associated with the given
+// profile.
+func getProfileContainersInfo(cluster *db.Cluster, profile string) ([]db.ContainerArgs, error) {
+	names, err := cluster.ProfileContainersGet(profile)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to query containers with profile '%s'", profile)
+	}
+	containers := make([]db.ContainerArgs, len(names))
+	for i, name := range names {
+		container, err := cluster.ContainerGet(name)
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to query container '%s'", name)
+		}
+		containers[i] = container
+	}
+
+	return containers, nil
+}
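The heart of doProfileUpdateContainer is choosing, per profile, between the pre-change payload (for the profile being edited) and the current database state (for every other profile), keeping profile order intact. A minimal sketch of that selection step, with the database lookup stubbed out (all names here are illustrative):

    package main

    import "fmt"

    // fetchProfileConfig stubs the d.cluster.ProfileConfig db lookup.
    func fetchProfileConfig(name string) map[string]string {
        return map[string]string{"source": "db:" + name}
    }

    // assembleConfigs picks the old payload for the edited profile and
    // the current db state for every other profile, in profile order.
    func assembleConfigs(profiles []string, edited string, old map[string]string) []map[string]string {
        configs := make([]map[string]string, len(profiles))
        for i, name := range profiles {
            if name == edited {
                configs[i] = old // pre-change config from the notification payload
                continue
            }
            configs[i] = fetchProfileConfig(name)
        }
        return configs
    }

    func main() {
        configs := assembleConfigs(
            []string{"default", "web"},
            "web",
            map[string]string{"source": "old-payload"},
        )
        fmt.Println(configs)
        // [map[source:db:default] map[source:old-payload]]
    }
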
diff --git a/test/main.sh b/test/main.sh
index dc13c3738..22b9cc1f9 100755
--- a/test/main.sh
+++ b/test/main.sh
@@ -209,6 +209,7 @@ run_test test_clustering_containers "clustering containers"
 run_test test_clustering_storage "clustering storage"
 run_test test_clustering_network "clustering network"
 run_test test_clustering_publish "clustering publish"
+run_test test_clustering_profiles "clustering profiles"
 #run_test test_clustering_upgrade "clustering upgrade"
 
 # shellcheck disable=SC2034
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index bd318845b..70b770afd 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -797,3 +797,71 @@ test_clustering_publish() {
   kill_lxd "${LXD_TWO_DIR}"
 }
 
+test_clustering_profiles() {
+  setup_clustering_bridge
+  prefix="lxd$$"
+  bridge="${prefix}"
+
+  setup_clustering_netns 1
+  LXD_ONE_DIR=$(mktemp -d -p "${TEST_DIR}" XXX)
+  chmod +x "${LXD_ONE_DIR}"
+  ns1="${prefix}1"
+  spawn_lxd_and_bootstrap_cluster "${ns1}" "${bridge}" "${LXD_ONE_DIR}"
+
+  # Add a newline at the end of each line. YAML has weird rules.
+  cert=$(sed ':a;N;$!ba;s/\n/\n\n/g' "${LXD_ONE_DIR}/server.crt")
+
+  # Spawn a second node
+  setup_clustering_netns 2
+  LXD_TWO_DIR=$(mktemp -d -p "${TEST_DIR}" XXX)
+  chmod +x "${LXD_TWO_DIR}"
+  ns2="${prefix}2"
+  spawn_lxd_and_join_cluster "${ns2}" "${bridge}" "${cert}" 2 1 "${LXD_TWO_DIR}"
+
+  # Create an empty profile.
+  LXD_DIR="${LXD_TWO_DIR}" lxc profile create web
+
+  # Launch two containers on the two nodes, using the above profile.
+  LXD_DIR="${LXD_TWO_DIR}" ensure_import_testimage
+  LXD_DIR="${LXD_ONE_DIR}" lxc launch --target node1 -p default -p web testimage c1
+  LXD_DIR="${LXD_ONE_DIR}" lxc launch --target node2 -p default -p web testimage c2
+
+  # Edit the profile.
+  source=$(mktemp -d -p "${TEST_DIR}" XXX)
+  touch "${source}/hello"
+  chmod 755 "${source}"
+  chmod 644 "${source}/hello"
+  (
+    cat <<EOF
+config: {}
+description: ""
+devices:
+  web:
+    path: /mnt
+    source: "${source}"
+    type: disk
+name: web
+used_by:
+- /1.0/containers/c1
+- /1.0/containers/c2
+EOF
+  ) | LXD_DIR="${LXD_TWO_DIR}" lxc profile edit web
+
+  LXD_DIR="${LXD_TWO_DIR}" lxc exec c1 ls /mnt | grep -q hello
+  LXD_DIR="${LXD_TWO_DIR}" lxc exec c2 ls /mnt | grep -q hello
+
+  LXD_DIR="${LXD_TWO_DIR}" lxc stop c1 --force
+  LXD_DIR="${LXD_ONE_DIR}" lxc stop c2 --force
+
+  LXD_DIR="${LXD_TWO_DIR}" lxd shutdown
+  LXD_DIR="${LXD_ONE_DIR}" lxd shutdown
+  sleep 2
+  rm -f "${LXD_TWO_DIR}/unix.socket"
+  rm -f "${LXD_ONE_DIR}/unix.socket"
+
+  teardown_clustering_netns
+  teardown_clustering_bridge
+
+  kill_lxd "${LXD_ONE_DIR}"
+  kill_lxd "${LXD_TWO_DIR}"
+}

