[lxc-devel] [lxd/master] Clustering related ceph fixes

freeekanayaka on Github lxc-bot at linuxcontainers.org
Wed Mar 7 21:30:58 UTC 2018


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 350 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180307/1a1747e5/attachment.bin>
-------------- next part --------------
From 6aecb30af25fb729a7f294f730fb9dbe574e1bf8 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 7 Mar 2018 11:48:57 +0000
Subject: [PATCH 1/5] Fix ceph storage pool creation in a cluster

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/storage_pools.go       |  4 ++--
 lxd/storage_pools_utils.go | 14 ++++++++++++--
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/lxd/storage_pools.go b/lxd/storage_pools.go
index 510d899f4..7defab3af 100644
--- a/lxd/storage_pools.go
+++ b/lxd/storage_pools.go
@@ -98,7 +98,7 @@ func storagePoolsPost(d *Daemon, r *http.Request) Response {
 			return BadRequest(err)
 		}
 		err = doStoragePoolCreateInternal(
-			d.State(), req.Name, req.Description, req.Driver, req.Config)
+			d.State(), req.Name, req.Description, req.Driver, req.Config, true)
 		if err != nil {
 			return SmartError(err)
 		}
@@ -209,7 +209,7 @@ func storagePoolsPostCluster(d *Daemon, req api.StoragePoolsPost) error {
 		return err
 	}
 	err = doStoragePoolCreateInternal(
-		d.State(), req.Name, req.Description, req.Driver, req.Config)
+		d.State(), req.Name, req.Description, req.Driver, req.Config, false)
 	if err != nil {
 		return err
 	}
diff --git a/lxd/storage_pools_utils.go b/lxd/storage_pools_utils.go
index b969beb4a..6a0299b36 100644
--- a/lxd/storage_pools_utils.go
+++ b/lxd/storage_pools_utils.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"fmt"
+	"os"
 	"strings"
 
 	"github.com/lxc/lxd/lxd/db"
@@ -222,19 +223,28 @@ func storagePoolCreateInternal(state *state.State, poolName, poolDescription str
 		}
 		dbStoragePoolDeleteAndUpdateCache(state.Cluster, poolName)
 	}()
-	err = doStoragePoolCreateInternal(state, poolName, poolDescription, driver, config)
+	err = doStoragePoolCreateInternal(state, poolName, poolDescription, driver, config, false)
 	tryUndo = err != nil
 	return err
 }
 
 // This performs all non-db related work needed to create the pool.
-func doStoragePoolCreateInternal(state *state.State, poolName, poolDescription string, driver string, config map[string]string) error {
+func doStoragePoolCreateInternal(state *state.State, poolName, poolDescription string, driver string, config map[string]string, isNotification bool) error {
 	tryUndo := true
 	s, err := storagePoolInit(state, poolName)
 	if err != nil {
 		return err
 	}
 
+	// If this is a clustering notification for a ceph storage, we don't
+	// want this node to actually create the pool, as it's already been
+	// done by the node that triggered this notification. We just need to
+	// create the storage pool directory.
+	if s, ok := s.(*storageCeph); ok && isNotification {
+		volumeMntPoint := getStoragePoolVolumeMountPoint(s.pool.Name, s.volume.Name)
+		return os.MkdirAll(volumeMntPoint, 0711)
+
+	}
 	err = s.StoragePoolCreate()
 	if err != nil {
 		return err

From 5fbe2756d326d0a319962605f5ef09f2dcc6c9c2 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 7 Mar 2018 11:55:41 +0000
Subject: [PATCH 2/5] Fix storage pool deletion in a cluster

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/storage_pools.go | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/lxd/storage_pools.go b/lxd/storage_pools.go
index 7defab3af..c0a80a185 100644
--- a/lxd/storage_pools.go
+++ b/lxd/storage_pools.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"os"
 	"strings"
 	"sync"
 
@@ -530,6 +531,21 @@ func storagePoolDelete(d *Daemon, r *http.Request) Response {
 		return InternalError(err)
 	}
 
+	// If this is a notification for a ceph pool deletion, we don't want to
+	// actually delete the pool, since that will be done by the node that
+	// notified us. We just need to delete the local mountpoint.
+	if s, ok := s.(*storageCeph); ok && isClusterNotification(r) {
+		// Delete the mountpoint for the storage pool.
+		poolMntPoint := getStoragePoolMountPoint(s.pool.Name)
+		if shared.PathExists(poolMntPoint) {
+			err := os.RemoveAll(poolMntPoint)
+			if err != nil {
+				return SmartError(err)
+			}
+		}
+		return EmptySyncResponse
+	}
+
 	err = s.StoragePoolDelete()
 	if err != nil {
 		return InternalError(err)

From 339917b940573f86f11d3de847e68657574e8a58 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 7 Mar 2018 15:04:13 +0000
Subject: [PATCH 3/5] Add db patch to duplicate ceph volume rows across all
 nodes

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/db/cluster/schema.go      |   2 +-
 lxd/db/cluster/update.go      | 107 ++++++++++++++++++++++++++++++++++++++++++
 lxd/db/cluster/update_test.go |  71 ++++++++++++++++++++++++++++
 3 files changed, 179 insertions(+), 1 deletion(-)

diff --git a/lxd/db/cluster/schema.go b/lxd/db/cluster/schema.go
index 5626ea50c..9c77c2ad7 100644
--- a/lxd/db/cluster/schema.go
+++ b/lxd/db/cluster/schema.go
@@ -235,5 +235,5 @@ CREATE TABLE storage_volumes_config (
     FOREIGN KEY (storage_volume_id) REFERENCES storage_volumes (id) ON DELETE CASCADE
 );
 
-INSERT INTO schema (version, updated_at) VALUES (5, strftime("%s"))
+INSERT INTO schema (version, updated_at) VALUES (6, strftime("%s"))
 `
diff --git a/lxd/db/cluster/update.go b/lxd/db/cluster/update.go
index ee50f8e79..1630029df 100644
--- a/lxd/db/cluster/update.go
+++ b/lxd/db/cluster/update.go
@@ -3,7 +3,9 @@ package cluster
 import (
 	"database/sql"
 
+	"github.com/lxc/lxd/lxd/db/query"
 	"github.com/lxc/lxd/lxd/db/schema"
+	"github.com/pkg/errors"
 )
 
 // Schema for the cluster database.
@@ -28,6 +30,111 @@ var updates = map[int]schema.Update{
 	3: updateFromV2,
 	4: updateFromV3,
 	5: updateFromV4,
+	6: updateFromV5,
+}
+
+// For ceph volumes, add node-specific rows for all existing nodes, since any
+// node is able to access those volumes.
+func updateFromV5(tx *sql.Tx) error {
+	// Fetch the IDs of all existing nodes.
+	nodeIDs, err := query.SelectIntegers(tx, "SELECT id FROM nodes")
+	if err != nil {
+		return errors.Wrap(err, "failed to get IDs of current nodes")
+	}
+
+	// Fetch the IDs of all existing ceph volumes.
+	volumeIDs, err := query.SelectIntegers(tx, `
+SELECT storage_volumes.id FROM storage_volumes
+    JOIN storage_pools ON storage_volumes.storage_pool_id=storage_pools.id
+    WHERE storage_pools.driver='ceph'
+`)
+	if err != nil {
+		return errors.Wrap(err, "failed to get IDs of current volumes")
+	}
+
+	// Fetch all existing ceph volumes.
+	volumes := make([]struct {
+		ID            int
+		Name          string
+		StoragePoolID int
+		NodeID        int
+		Type          int
+		Description   string
+	}, len(volumeIDs))
+	stmt := `
+SELECT
+    storage_volumes.id,
+    storage_volumes.name,
+    storage_volumes.storage_pool_id,
+    storage_volumes.node_id,
+    storage_volumes.type,
+    storage_volumes.description
+FROM storage_volumes
+    JOIN storage_pools ON storage_volumes.storage_pool_id=storage_pools.id
+    WHERE storage_pools.driver='ceph'
+`
+	err = query.SelectObjects(tx, func(i int) []interface{} {
+		return []interface{}{
+			&volumes[i].ID,
+			&volumes[i].Name,
+			&volumes[i].StoragePoolID,
+			&volumes[i].NodeID,
+			&volumes[i].Type,
+			&volumes[i].Description,
+		}
+	}, stmt)
+	if err != nil {
+		return errors.Wrap(err, "failed to fetch current volumes")
+	}
+
+	// Duplicate each volume row across all nodes, and keep track of the
+	// new volume IDs that we've inserted.
+	created := make(map[int][]int64, 0) // Existing volume ID to new volumes IDs.
+	columns := []string{"name", "storage_pool_id", "node_id", "type", "description"}
+	for _, volume := range volumes {
+		for _, nodeID := range nodeIDs {
+			if volume.NodeID == nodeID {
+				// This node already has the volume row
+				continue
+			}
+			values := []interface{}{
+				volume.Name,
+				volume.StoragePoolID,
+				nodeID,
+				volume.Type,
+				volume.Description,
+			}
+			id, err := query.UpsertObject(tx, "storage_volumes", columns, values)
+			if err != nil {
+				return errors.Wrap(err, "failed to insert new volume")
+			}
+			_, ok := created[volume.ID]
+			if !ok {
+				created[volume.ID] = make([]int64, 0)
+			}
+			created[volume.ID] = append(created[volume.ID], id)
+		}
+	}
+
+	// Duplicate each volume config row across all nodes.
+	for id, newIDs := range created {
+		config, err := query.SelectConfig(tx, "storage_volumes_config", "storage_volume_id=?", id)
+		if err != nil {
+			errors.Wrap(err, "failed to fetch volume config")
+		}
+		for _, newID := range newIDs {
+			for key, value := range config {
+				_, err := tx.Exec(`
+INSERT INTO storage_volumes_config(storage_volume_id, key, value) VALUES(?, ?, ?)
+`, newID, key, value)
+				if err != nil {
+					return errors.Wrap(err, "failed to insert new volume config")
+				}
+			}
+		}
+	}
+
+	return nil
 }
 
 func updateFromV4(tx *sql.Tx) error {
diff --git a/lxd/db/cluster/update_test.go b/lxd/db/cluster/update_test.go
index ae29bf1fd..369f52864 100644
--- a/lxd/db/cluster/update_test.go
+++ b/lxd/db/cluster/update_test.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/lxc/lxd/lxd/db/cluster"
+	"github.com/lxc/lxd/lxd/db/query"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -218,3 +219,73 @@ func TestUpdateFromV3(t *testing.T) {
 	_, err = db.Exec("INSERT INTO storage_pools_nodes VALUES (1, 1, 1)")
 	require.Error(t, err)
 }
+
+func TestUpdateFromV5(t *testing.T) {
+	schema := cluster.Schema()
+	db, err := schema.ExerciseUpdate(6, func(db *sql.DB) {
+		// Create two nodes.
+		_, err := db.Exec(
+			"INSERT INTO nodes VALUES (1, 'n1', '', '1.2.3.4:666', 1, 32, ?, 0)",
+			time.Now())
+		require.NoError(t, err)
+		_, err = db.Exec(
+			"INSERT INTO nodes VALUES (2, 'n2', '', '5.6.7.8:666', 1, 32, ?, 0)",
+			time.Now())
+		require.NoError(t, err)
+
+		// Create a pool p1 of type zfs.
+		_, err = db.Exec("INSERT INTO storage_pools VALUES (1, 'p1', 'zfs', '', 0)")
+		require.NoError(t, err)
+
+		// Create a pool p2 of type ceph.
+		_, err = db.Exec("INSERT INTO storage_pools VALUES (2, 'p2', 'ceph', '', 0)")
+
+		// Create a volume v1 on pool p1, associated with n1 and a config.
+		require.NoError(t, err)
+		_, err = db.Exec("INSERT INTO storage_volumes VALUES (1, 'v1', 1, 1, 1, '')")
+		require.NoError(t, err)
+		_, err = db.Exec("INSERT INTO storage_volumes_config VALUES (1, 1, 'k', 'v')")
+		require.NoError(t, err)
+
+		// Create a volume v1 on pool p2, associated with n1 and a config.
+		require.NoError(t, err)
+		_, err = db.Exec("INSERT INTO storage_volumes VALUES (2, 'v1', 2, 1, 1, '')")
+		require.NoError(t, err)
+		_, err = db.Exec("INSERT INTO storage_volumes_config VALUES (2, 2, 'k', 'v')")
+		require.NoError(t, err)
+
+		// Create a volume v2 on pool p2, associated with n2 and no config.
+		require.NoError(t, err)
+		_, err = db.Exec("INSERT INTO storage_volumes VALUES (3, 'v2', 2, 2, 1, '')")
+		require.NoError(t, err)
+	})
+	require.NoError(t, err)
+
+	// Check that a volume row for n2 was added for v1 on p2.
+	tx, err := db.Begin()
+	defer tx.Rollback()
+	require.NoError(t, err)
+	nodeIDs, err := query.SelectIntegers(tx, `
+SELECT node_id FROM storage_volumes WHERE storage_pool_id=2 AND name='v1' ORDER BY node_id
+`)
+	require.NoError(t, err)
+	require.Equal(t, []int{1, 2}, nodeIDs)
+
+	// Check that a volume row for n1 was added for v2 on p2.
+	nodeIDs, err = query.SelectIntegers(tx, `
+SELECT node_id FROM storage_volumes WHERE storage_pool_id=2 AND name='v2' ORDER BY node_id
+`)
+	require.NoError(t, err)
+	require.Equal(t, []int{1, 2}, nodeIDs)
+
+	// Check that the config for volume v1 on p2 was duplicated.
+	volumeIDs, err := query.SelectIntegers(tx, `
+SELECT id FROM storage_volumes WHERE storage_pool_id=2 AND name='v1' ORDER BY id
+`)
+	require.NoError(t, err)
+	require.Equal(t, []int{2, 4}, volumeIDs)
+	config1, err := query.SelectConfig(tx, "storage_volumes_config", "storage_volume_id=?", volumeIDs[0])
+	require.NoError(t, err)
+	config2, err := query.SelectConfig(tx, "storage_volumes_config", "storage_volume_id=?", volumeIDs[1])
+	require.Equal(t, config1, config2)
+}

From 7b645898bcf07a6e9079d4e84c15a937ed581b20 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 7 Mar 2018 19:42:55 +0000
Subject: [PATCH 4/5] Apply ceph volume db changes to all nodes

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/db/storage_pools.go      | 134 ++++++++++++++++++++++++++++++++++---------
 lxd/db/storage_pools_test.go |  60 ++++++++++++++++---
 lxd/db/storage_volumes.go    |  16 ++++++
 3 files changed, 175 insertions(+), 35 deletions(-)

diff --git a/lxd/db/storage_pools.go b/lxd/db/storage_pools.go
index 43c683bbc..2166b8573 100644
--- a/lxd/db/storage_pools.go
+++ b/lxd/db/storage_pools.go
@@ -507,6 +507,23 @@ func storagePoolConfigAdd(tx *sql.Tx, poolID, nodeID int64, poolConfig map[strin
 	return nil
 }
 
+// StoragePoolDriver returns the driver of the pool with the given ID.
+func storagePoolDriverGet(tx *sql.Tx, id int64) (string, error) {
+	stmt := "SELECT driver FROM storage_pools WHERE id=?"
+	drivers, err := query.SelectStrings(tx, stmt, id)
+	if err != nil {
+		return "", err
+	}
+	switch len(drivers) {
+	case 0:
+		return "", NoSuchObjectError
+	case 1:
+		return drivers[0], nil
+	default:
+		return "", fmt.Errorf("more than one pool has the given ID")
+	}
+}
+
 // Update storage pool.
 func (c *Cluster) StoragePoolUpdate(poolName, description string, poolConfig map[string]string) error {
 	poolID, _, err := c.StoragePoolGet(poolName)
@@ -732,21 +749,20 @@ func (c *Cluster) StoragePoolVolumeUpdate(volumeName string, volumeType int, poo
 		return err
 	}
 
-	err = StorageVolumeConfigClear(tx, volumeID)
-	if err != nil {
-		tx.Rollback()
-		return err
-	}
+	err = storagePoolVolumeReplicateIfCeph(tx, volumeID, volumeName, volumeType, poolID, func(volumeID int64) error {
+		err = StorageVolumeConfigClear(tx, volumeID)
+		if err != nil {
+			return err
+		}
 
-	err = StorageVolumeConfigAdd(tx, volumeID, volumeConfig)
-	if err != nil {
-		tx.Rollback()
-		return err
-	}
+		err = StorageVolumeConfigAdd(tx, volumeID, volumeConfig)
+		if err != nil {
+			return err
+		}
 
-	err = StorageVolumeDescriptionUpdate(tx, volumeID, volumeDescription)
+		return StorageVolumeDescriptionUpdate(tx, volumeID, volumeDescription)
+	})
 	if err != nil {
-		tx.Rollback()
 		return err
 	}
 
@@ -760,12 +776,21 @@ func (c *Cluster) StoragePoolVolumeDelete(volumeName string, volumeType int, poo
 		return err
 	}
 
-	_, err = exec(c.db, "DELETE FROM storage_volumes WHERE id=?", volumeID)
+	tx, err := begin(c.db)
 	if err != nil {
 		return err
 	}
 
-	return nil
+	err = storagePoolVolumeReplicateIfCeph(tx, volumeID, volumeName, volumeType, poolID, func(volumeID int64) error {
+		_, err = tx.Exec("DELETE FROM storage_volumes WHERE id=?", volumeID)
+		return err
+	})
+	if err != nil {
+		tx.Rollback()
+		return err
+	}
+
+	return TxCommit(tx)
 }
 
 // Rename storage volume attached to a given storage pool.
@@ -780,7 +805,10 @@ func (c *Cluster) StoragePoolVolumeRename(oldVolumeName string, newVolumeName st
 		return err
 	}
 
-	_, err = tx.Exec("UPDATE storage_volumes SET name=? WHERE id=? AND type=?", newVolumeName, volumeID, volumeType)
+	err = storagePoolVolumeReplicateIfCeph(tx, volumeID, oldVolumeName, volumeType, poolID, func(volumeID int64) error {
+		_, err = tx.Exec("UPDATE storage_volumes SET name=? WHERE id=? AND type=?", newVolumeName, volumeID, volumeType)
+		return err
+	})
 	if err != nil {
 		tx.Rollback()
 		return err
@@ -789,6 +817,34 @@ func (c *Cluster) StoragePoolVolumeRename(oldVolumeName string, newVolumeName st
 	return TxCommit(tx)
 }
 
+// This a convenience to replicate a certain volume change to all nodes if the
+// underlying driver is ceph.
+func storagePoolVolumeReplicateIfCeph(tx *sql.Tx, volumeID int64, volumeName string, volumeType int, poolID int64, f func(int64) error) error {
+	driver, err := storagePoolDriverGet(tx, poolID)
+	if err != nil {
+		return err
+	}
+	volumeIDs := []int64{volumeID}
+
+	// If this is a ceph volume, we want to duplicate the change across the
+	// the rows for all other nodes.
+	if driver == "ceph" {
+		volumeIDs, err = storageVolumeIDsGet(tx, volumeName, volumeType, poolID)
+		if err != nil {
+			return err
+		}
+	}
+
+	for _, volumeID := range volumeIDs {
+		err := f(volumeID)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 // Create new storage volume attached to a given storage pool.
 func (c *Cluster) StoragePoolVolumeCreate(volumeName, volumeDescription string, volumeType int, poolID int64, volumeConfig map[string]string) (int64, error) {
 	tx, err := begin(c.db)
@@ -796,23 +852,47 @@ func (c *Cluster) StoragePoolVolumeCreate(volumeName, volumeDescription string,
 		return -1, err
 	}
 
-	result, err := tx.Exec("INSERT INTO storage_volumes (storage_pool_id, node_id, type, name, description) VALUES (?, ?, ?, ?, ?)",
-		poolID, c.nodeID, volumeType, volumeName, volumeDescription)
+	nodeIDs := []int{int(c.nodeID)}
+	driver, err := storagePoolDriverGet(tx, poolID)
 	if err != nil {
 		tx.Rollback()
 		return -1, err
 	}
-
-	volumeID, err := result.LastInsertId()
-	if err != nil {
-		tx.Rollback()
-		return -1, err
+	// If the driver is ceph, create a volume entry for each node.
+	if driver == "ceph" {
+		nodeIDs, err = query.SelectIntegers(tx, "SELECT id FROM nodes")
+		if err != nil {
+			tx.Rollback()
+			return -1, err
+		}
 	}
 
-	err = StorageVolumeConfigAdd(tx, volumeID, volumeConfig)
-	if err != nil {
-		tx.Rollback()
-		return -1, err
+	var thisVolumeID int64
+	for _, nodeID := range nodeIDs {
+		result, err := tx.Exec(`
+INSERT INTO storage_volumes (storage_pool_id, node_id, type, name, description) VALUES (?, ?, ?, ?, ?)
+`,
+			poolID, nodeID, volumeType, volumeName, volumeDescription)
+		if err != nil {
+			tx.Rollback()
+			return -1, err
+		}
+
+		volumeID, err := result.LastInsertId()
+		if err != nil {
+			tx.Rollback()
+			return -1, err
+		}
+		if int64(nodeID) == c.nodeID {
+			// Return the ID of the volume created on this node.
+			thisVolumeID = volumeID
+		}
+
+		err = StorageVolumeConfigAdd(tx, volumeID, volumeConfig)
+		if err != nil {
+			tx.Rollback()
+			return -1, err
+		}
 	}
 
 	err = TxCommit(tx)
@@ -820,7 +900,7 @@ func (c *Cluster) StoragePoolVolumeCreate(volumeName, volumeDescription string,
 		return -1, err
 	}
 
-	return volumeID, nil
+	return thisVolumeID, nil
 }
 
 // StoragePoolVolumeGetTypeID returns the ID of a storage volume on a given
diff --git a/lxd/db/storage_pools_test.go b/lxd/db/storage_pools_test.go
index b4a06081d..d29b2ad61 100644
--- a/lxd/db/storage_pools_test.go
+++ b/lxd/db/storage_pools_test.go
@@ -72,19 +72,63 @@ func TestStoragePoolsCreatePending_NonExistingNode(t *testing.T) {
 }
 
 // If a pool with the given name already exists but has different driver, an
-// error is returned.
-func TestStoragePoolsCreatePending_DriverMismatch(t *testing.T) {
-	tx, cleanup := db.NewTestClusterTx(t)
+// error is returned. Likewise, if volume is updated or deleted, it's udpated
+// or deleted on all nodes.
+func TestStoragePoolVolume_Ceph(t *testing.T) {
+	cluster, cleanup := db.NewTestCluster(t)
 	defer cleanup()
 
-	_, err := tx.NodeAdd("buzz", "1.2.3.4:666")
+	// Create a second no (beyond the default one).
+	err := cluster.Transaction(func(tx *db.ClusterTx) error {
+		_, err := tx.NodeAdd("n1", "1.2.3.4:666")
+		return err
+	})
 	require.NoError(t, err)
-	_, err = tx.NodeAdd("rusp", "5.6.7.8:666")
+
+	poolID, err := cluster.StoragePoolCreate("p1", "", "ceph", nil)
 	require.NoError(t, err)
 
-	err = tx.StoragePoolCreatePending("buzz", "pool1", "dir", map[string]string{})
+	config := map[string]string{"k": "v"}
+	volumeID, err := cluster.StoragePoolVolumeCreate("v1", "", 1, poolID, config)
+	require.NoError(t, err)
+
+	// The returned volume ID is the one of the volume created on the local
+	// node (node 1).
+	thisVolumeID, _, err := cluster.StoragePoolVolumeGetType("v1", 1, poolID, 1)
 	require.NoError(t, err)
+	assert.Equal(t, volumeID, thisVolumeID)
 
-	err = tx.StoragePoolCreatePending("rusp", "pool1", "zfs", map[string]string{})
-	require.EqualError(t, err, "pool already exists with a different driver")
+	// Another volume was created for the second node.
+	_, volume, err := cluster.StoragePoolVolumeGetType("v1", 1, poolID, 2)
+	require.NoError(t, err)
+	assert.NotNil(t, volume)
+	assert.Equal(t, config, volume.Config)
+
+	// Update the volume
+	config["k"] = "v2"
+	err = cluster.StoragePoolVolumeUpdate("v1", 1, poolID, "volume 1", config)
+	require.NoError(t, err)
+	for _, nodeID := range []int64{1, 2} {
+		_, volume, err := cluster.StoragePoolVolumeGetType("v1", 1, poolID, nodeID)
+		require.NoError(t, err)
+		assert.Equal(t, "volume 1", volume.Description)
+		assert.Equal(t, config, volume.Config)
+	}
+	err = cluster.StoragePoolVolumeRename("v1", "v1-new", 1, poolID)
+	require.NoError(t, err)
+	for _, nodeID := range []int64{1, 2} {
+		_, volume, err := cluster.StoragePoolVolumeGetType("v1-new", 1, poolID, nodeID)
+		require.NoError(t, err)
+		assert.NotNil(t, volume)
+	}
+	require.NoError(t, err)
+
+	// Delete the volume
+	err = cluster.StoragePoolVolumeDelete("v1-new", 1, poolID)
+	require.NoError(t, err)
+	for _, nodeID := range []int64{1, 2} {
+		_, volume, err := cluster.StoragePoolVolumeGetType("v1-new", 1, poolID, nodeID)
+		assert.Equal(t, db.NoSuchObjectError, err)
+		assert.Nil(t, volume)
+	}
 }
diff --git a/lxd/db/storage_volumes.go b/lxd/db/storage_volumes.go
index ca7d331e8..4df68a963 100644
--- a/lxd/db/storage_volumes.go
+++ b/lxd/db/storage_volumes.go
@@ -154,6 +154,22 @@ func StorageVolumeConfigClear(tx *sql.Tx, volumeID int64) error {
 	return nil
 }
 
+// Get the IDs of all volumes with the given name and type associated with the
+// given pool, regardless of their node_id column.
+func storageVolumeIDsGet(tx *sql.Tx, volumeName string, volumeType int, poolID int64) ([]int64, error) {
+	ids, err := query.SelectIntegers(tx, `
+SELECT id FROM storage_volumes WHERE name=? AND type=? AND storage_pool_id=?
+`, volumeName, volumeType, poolID)
+	if err != nil {
+		return nil, err
+	}
+	ids64 := make([]int64, len(ids))
+	for i, id := range ids {
+		ids64[i] = int64(id)
+	}
+	return ids64, nil
+}
+
 func (c *Cluster) StorageVolumeCleanupImages(fingerprints []string) error {
 	stmt := fmt.Sprintf(
 		"DELETE FROM storage_volumes WHERE type=? AND name NOT IN %s",

From 06eb2bef74143d6b37c12ff3af658b1a72eb454e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Wed, 7 Mar 2018 21:05:00 +0000
Subject: [PATCH 5/5] Honor LXD_BACKEND in clustering_storage tests

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 test/includes/clustering.sh | 54 +++++++++++++++++++++++++++++++++++++++++----
 test/suites/clustering.sh   | 25 +++++++++++++++++----
 2 files changed, 71 insertions(+), 8 deletions(-)

diff --git a/test/includes/clustering.sh b/test/includes/clustering.sh
index 1afeac8b5..63efe7ff1 100644
--- a/test/includes/clustering.sh
+++ b/test/includes/clustering.sh
@@ -123,14 +123,36 @@ spawn_lxd_and_bootstrap_cluster() {
   (
     set -e
 
-    cat <<EOF | lxd init --preseed
+    # Only dir, btrfs and ceph backends are supported for clustering
+    # tests, since zfs volumes are global.
+    driver="${LXD_BACKEND}"
+    if [ "${driver}" = "zfs" ] || [ "${driver}" = "ceph" ]; then
+      driver="dir"
+    fi
+
+    cat > "${LXD_DIR}/preseed.yaml" <<EOF
 config:
   core.trust_password: sekret
   core.https_address: 10.1.1.101:8443
   images.auto_update_interval: 0
 storage_pools:
 - name: data
-  driver: dir
+  driver: $driver
+EOF
+    if [ "${driver}" = "btrfs" ]; then
+      cat >> "${LXD_DIR}/preseed.yaml" <<EOF
+  config:
+    size: 100GB
+EOF
+    fi
+    if [ "${driver}" = "ceph" ]; then
+      cat >> "${LXD_DIR}/preseed.yaml" <<EOF
+  config:
+    volume.size: 25GB
+    ceph.osd.pg_num: 8
+EOF
+    fi
+    cat >> "${LXD_DIR}/preseed.yaml" <<EOF
 networks:
 - name: $bridge
   type: bridge
@@ -148,6 +170,7 @@ cluster:
   server_name: node1
   enabled: true
 EOF
+  cat "${LXD_DIR}/preseed.yaml" | lxd init --preseed
   )
 }
 
@@ -165,13 +188,35 @@ spawn_lxd_and_join_cluster() {
   (
     set -e
 
-    cat <<EOF | lxd init --preseed
+    # Only dir, btrfs and ceph backends are supported for clustering
+    # tests, since zfs volumes are global.
+    driver="${LXD_BACKEND}"
+    if [ "${driver}" = "zfs" ] || [ "${driver}" = "ceph" ]; then
+      driver="dir"
+    fi
+
+    cat > "${LXD_DIR}/preseed.yaml" <<EOF
 config:
   core.https_address: 10.1.1.10${index}:8443
   images.auto_update_interval: 0
 storage_pools:
 - name: data
-  driver: dir
+  driver: $driver
+EOF
+    if [ "${driver}" = "btrfs" ]; then
+      cat >> "${LXD_DIR}/preseed.yaml" <<EOF
+  config:
+    size: 100GB
+EOF
+    if [ "${driver}" = "ceph" ]; then
+      cat >> "${LXD_DIR}/preseed.yaml" <<EOF
+  config:
+    volume.size: 25GB
+    ceph.osd.pg_num: 8
+EOF
+    fi
+    fi
+    cat >> "${LXD_DIR}/preseed.yaml" <<EOF
 networks:
 - name: $bridge
   type: bridge
@@ -192,5 +237,6 @@ cluster:
   cluster_certificate: "$cert"
 cluster_password: sekret
 EOF
+  cat "${LXD_DIR}/preseed.yaml" | lxd init --preseed
   )
 }
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index b0122f3e5..8bf7b5822 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -286,18 +286,35 @@ test_clustering_storage() {
   # Trying to pass config values other than 'source' results in an error
   ! LXD_DIR="${LXD_ONE_DIR}" lxc storage create pool1 dir source=/foo size=123 --target node1
 
+  # Only dir, btrfs and ceph backends are supported for clustering
+  # tests, since zfs volumes are global.
+  driver="${LXD_BACKEND}"
+  if [ "${driver}" = "zfs" ] || [ "${driver}" = "ceph" ]; then
+    driver="dir"
+  fi
+
   # Define storage pools on the two nodes
-  LXD_DIR="${LXD_ONE_DIR}" lxc storage create pool1 dir --target node1
+  driver_config=""
+  if [ "${driver}" = "btrfs" ]; then
+      driver_config="size=20GB"
+  fi
+  if [ "${driver}" = "ceph" ]; then
+      driver_config="volume.size=25MB ceph.osd.pg_num=8"
+  fi
+
+  LXD_DIR="${LXD_ONE_DIR}" lxc storage create pool1 "${driver}" "${driver_config}" --target node1
   LXD_DIR="${LXD_TWO_DIR}" lxc storage show pool1 | grep -q node1
   ! LXD_DIR="${LXD_TWO_DIR}" lxc storage show pool1 | grep -q node2
-  LXD_DIR="${LXD_ONE_DIR}" lxc storage create pool1 dir --target node2
+  LXD_DIR="${LXD_ONE_DIR}" lxc storage create pool1 "${driver}" "${driver_config}" --target node2
   LXD_DIR="${LXD_ONE_DIR}" lxc storage show pool1 | grep status: | grep -q Pending
 
   # The source config key is not legal for the final pool creation
-  ! LXD_DIR="${LXD_ONE_DIR}" lxc storage create pool1 dir source=/foo
+  if [ "${driver}" = "dir" ]; then
+    ! LXD_DIR="${LXD_ONE_DIR}" lxc storage create pool1 dir source=/foo
+  fi
 
   # Create the storage pool
-  LXD_DIR="${LXD_TWO_DIR}" lxc storage create pool1 dir
+  LXD_DIR="${LXD_TWO_DIR}" lxc storage create pool1 "${driver}"
   LXD_DIR="${LXD_ONE_DIR}" lxc storage show pool1 | grep status: | grep -q Created
 
   # The 'source' config key is omitted when showing the cluster


More information about the lxc-devel mailing list