[lxc-devel] [lxd/master] Custom schema queries

freeekanayaka on Github lxc-bot at linuxcontainers.org
Fri Apr 27 09:39:43 UTC 2018


A non-text attachment was scrubbed...
Name: not available
Type: text/x-mailbox
Size: 406 bytes
Desc: not available
URL: <http://lists.linuxcontainers.org/pipermail/lxc-devel/attachments/20180427/c5c28b0b/attachment.bin>
-------------- next part --------------
From 97c9d706c2f70d6e418bde5c812c785d07785439 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Fri, 27 Apr 2018 09:00:29 +0000
Subject: [PATCH 1/3] Add a new Schema.File() method to load extra queries from
 a file

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/db/schema/query.go       | 28 ++++++++++++++++++++
 lxd/db/schema/schema.go      | 16 ++++++++++-
 lxd/db/schema/schema_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 106 insertions(+), 1 deletion(-)

diff --git a/lxd/db/schema/query.go b/lxd/db/schema/query.go
index 6ec5ea6c4..6a421d6b3 100644
--- a/lxd/db/schema/query.go
+++ b/lxd/db/schema/query.go
@@ -3,8 +3,12 @@ package schema
 import (
 	"database/sql"
 	"fmt"
+	"io/ioutil"
+	"os"
 
 	"github.com/lxc/lxd/lxd/db/query"
+	"github.com/lxc/lxd/shared"
+	"github.com/pkg/errors"
 )
 
 // Return whether the schema table is present in the database.
@@ -68,3 +72,27 @@ INSERT INTO schema (version, updated_at) VALUES (?, strftime("%s"))
 	_, err := tx.Exec(statement, new)
 	return err
 }
+
+// Read the given file (if it exists) and executes all queries it contains.
+func execFromFile(tx *sql.Tx, path string) error {
+	if !shared.PathExists(path) {
+		return nil
+	}
+
+	bytes, err := ioutil.ReadFile(path)
+	if err != nil {
+		return errors.Wrap(err, "failed to read file")
+	}
+
+	_, err = tx.Exec(string(bytes))
+	if err != nil {
+		return err
+	}
+
+	err = os.Remove(path)
+	if err != nil {
+		return errors.Wrap(err, "failed to remove file")
+	}
+
+	return nil
+}
diff --git a/lxd/db/schema/schema.go b/lxd/db/schema/schema.go
index 7015977c4..f8166e826 100644
--- a/lxd/db/schema/schema.go
+++ b/lxd/db/schema/schema.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/lxc/lxd/lxd/db/query"
 	"github.com/lxc/lxd/shared"
+	"github.com/pkg/errors"
 )
 
 // Schema captures the schema of a database in terms of a series of ordered
@@ -17,6 +18,7 @@ type Schema struct {
 	hook    Hook     // Optional hook to execute whenever a update gets applied
 	fresh   string   // Optional SQL statement used to create schema from scratch
 	check   Check    // Optional callback invoked before doing any update
+	path    string   // Optional path to a file containing extra queries to run
 }
 
 // Update applies a specific schema change to a database, and returns an error
@@ -111,6 +113,13 @@ func (s *Schema) Fresh(statement string) {
 	s.fresh = statement
 }
 
+// File extra queries from a file. If the file is exists, all SQL queries in it
+// will be executed transactionally at the very start of Ensure(), before
+// anything else is done.
+func (s *Schema) File(path string) {
+	s.path = path
+}
+
 // Ensure makes sure that the actual schema in the given database matches the
 // one defined by our updates.
 //
@@ -127,7 +136,12 @@ func (s *Schema) Ensure(db *sql.DB) (int, error) {
 	var current int
 	aborted := false
 	err := query.Transaction(db, func(tx *sql.Tx) error {
-		err := ensureSchemaTableExists(tx)
+		err := execFromFile(tx, s.path)
+		if err != nil {
+			return errors.Wrapf(err, "failed to execute queries from %s", s.path)
+		}
+
+		err = ensureSchemaTableExists(tx)
 		if err != nil {
 			return err
 		}
diff --git a/lxd/db/schema/schema_test.go b/lxd/db/schema/schema_test.go
index 4c225980e..b48425093 100644
--- a/lxd/db/schema/schema_test.go
+++ b/lxd/db/schema/schema_test.go
@@ -3,6 +3,7 @@ package schema_test
 import (
 	"database/sql"
 	"fmt"
+	"os"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -10,6 +11,7 @@ import (
 
 	"github.com/lxc/lxd/lxd/db/query"
 	"github.com/lxc/lxd/lxd/db/schema"
+	"github.com/lxc/lxd/shared"
 )
 
 // Create a new Schema by specifying an explicit map from versions to Update
@@ -334,6 +336,67 @@ func TestSchema_ExeciseUpdate(t *testing.T) {
 	require.EqualError(t, err, "no such column: name")
 }
 
+// A custom schema file path is given, but it does not exists. This is a no-op.
+func TestSchema_File_NotExists(t *testing.T) {
+	schema, db := newSchemaAndDB(t)
+	schema.Add(updateCreateTable)
+	schema.File("/non/existing/file/path")
+
+	_, err := schema.Ensure(db)
+	require.NoError(t, err)
+}
+
+// A custom schema file path is given, but it contains non valid SQL. An error
+// is returned an no change to the database is performed at all.
+func TestSchema_File_Garbage(t *testing.T) {
+	schema, db := newSchemaAndDB(t)
+	schema.Add(updateCreateTable)
+
+	path, err := shared.WriteTempFile("", "lxd-db-schema-", "SELECT FROM baz")
+	require.NoError(t, err)
+	defer os.Remove(path)
+
+	schema.File(path)
+
+	_, err = schema.Ensure(db)
+
+	message := fmt.Sprintf("failed to execute queries from %s: near \"FROM\": syntax error", path)
+	require.EqualError(t, err, message)
+}
+
+// A custom schema file path is given, it runs some queries that repair an
+// otherwise broken update, before the update is run.
+func TestSchema_File(t *testing.T) {
+	schema, db := newSchemaAndDB(t)
+
+	// Add an update that would insert a value into a non-existing table.
+	schema.Add(updateInsertValue)
+
+	path, err := shared.WriteTempFile("", "lxd-db-schema-",
+		`CREATE TABLE test (id INTEGER);
+INSERT INTO test VALUES (2);
+`)
+
+	require.NoError(t, err)
+	defer os.Remove(path)
+
+	schema.File(path)
+
+	_, err = schema.Ensure(db)
+	require.NoError(t, err)
+
+	// The file does not exist anymore.
+	assert.False(t, shared.PathExists(path))
+
+	// The table was created, and the extra row inserted as well.
+	tx, err := db.Begin()
+	require.NoError(t, err)
+
+	ids, err := query.SelectIntegers(tx, "SELECT id FROM test ORDER BY id")
+	require.NoError(t, err)
+	assert.Equal(t, []int{1, 2}, ids)
+}
+
 // Return a new in-memory SQLite database.
 func newDB(t *testing.T) *sql.DB {
 	db, err := sql.Open("sqlite3", ":memory:")

From 467c795740e9a8578e64f7684aa5649dc5c35ceb Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Fri, 27 Apr 2018 09:15:02 +0000
Subject: [PATCH 2/3] Add support for patch.local.sql and patch.global.sql

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 lxd/api_cluster.go             | 2 +-
 lxd/cluster/heartbeat_test.go  | 2 +-
 lxd/cluster/membership_test.go | 8 ++++----
 lxd/daemon.go                  | 3 ++-
 lxd/db/cluster/open.go         | 4 +++-
 lxd/db/cluster/open_test.go    | 6 +++---
 lxd/db/db.go                   | 6 +++---
 lxd/db/node/open.go            | 1 +
 lxd/db/testing.go              | 2 +-
 9 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 0fd89ed3b..a8cc8f978 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -318,7 +318,7 @@ func clusterPutDisable(d *Daemon) Response {
 	if err != nil {
 		return SmartError(err)
 	}
-	d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address)
+	d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address, "/unused/db/dir")
 	if err != nil {
 		return SmartError(err)
 	}
diff --git a/lxd/cluster/heartbeat_test.go b/lxd/cluster/heartbeat_test.go
index bc4e7624f..b1def2ed1 100644
--- a/lxd/cluster/heartbeat_test.go
+++ b/lxd/cluster/heartbeat_test.go
@@ -249,7 +249,7 @@ func (f *heartbeatFixture) node() (*state.State, *cluster.Gateway, string) {
 
 	var err error
 	require.NoError(f.t, state.Cluster.Close())
-	state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address)
+	state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir")
 	require.NoError(f.t, err)
 
 	f.gateways[len(f.gateways)] = gateway
diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go
index 5d3523350..b1cbc7c86 100644
--- a/lxd/cluster/membership_test.go
+++ b/lxd/cluster/membership_test.go
@@ -256,7 +256,7 @@ func TestJoin(t *testing.T) {
 	targetAddress := targetServer.Listener.Addr().String()
 	var err error
 	require.NoError(t, targetState.Cluster.Close())
-	targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress)
+	targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress, "/unused/db/dir")
 	require.NoError(t, err)
 	targetF := &membershipFixtures{t: t, state: targetState}
 	targetF.NetworkAddress(targetAddress)
@@ -283,7 +283,7 @@ func TestJoin(t *testing.T) {
 
 	address := server.Listener.Addr().String()
 	require.NoError(t, state.Cluster.Close())
-	state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address)
+	state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir")
 	require.NoError(t, err)
 
 	f := &membershipFixtures{t: t, state: state}
@@ -368,7 +368,7 @@ func FLAKY_TestPromote(t *testing.T) {
 	targetAddress := targetServer.Listener.Addr().String()
 	var err error
 	require.NoError(t, targetState.Cluster.Close())
-	targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress)
+	targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress, "/unused/db/dir")
 	require.NoError(t, err)
 	targetF := &membershipFixtures{t: t, state: targetState}
 	targetF.NetworkAddress(targetAddress)
@@ -397,7 +397,7 @@ func FLAKY_TestPromote(t *testing.T) {
 		mux.HandleFunc(path, handler)
 	}
 
-	state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address)
+	state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir")
 	require.NoError(t, err)
 
 	// Promote the node.
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 2b76ab2ec..75691226c 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -463,7 +463,8 @@ func (d *Daemon) init() error {
 
 	/* Open the cluster database */
 	for {
-		d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address)
+		dir := filepath.Join(d.os.VarDir, "database")
+		d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address, dir)
 		if err == nil {
 			break
 		}
diff --git a/lxd/db/cluster/open.go b/lxd/db/cluster/open.go
index 498e5d923..b11347bd0 100644
--- a/lxd/db/cluster/open.go
+++ b/lxd/db/cluster/open.go
@@ -3,6 +3,7 @@ package cluster
 import (
 	"database/sql"
 	"fmt"
+	"path/filepath"
 	"sync/atomic"
 
 	"github.com/CanonicalLtd/go-grpc-sql"
@@ -47,7 +48,7 @@ func Open(name string, dialer grpcsql.Dialer) (*sql.DB, error) {
 // nodes have version greater than us and we need to be upgraded), or return
 // false and no error (if some nodes have a lower version, and we need to wait
 // till they get upgraded and restarted).
-func EnsureSchema(db *sql.DB, address string) (bool, error) {
+func EnsureSchema(db *sql.DB, address string, dir string) (bool, error) {
 	someNodesAreBehind := false
 	apiExtensions := version.APIExtensionsCount()
 
@@ -86,6 +87,7 @@ func EnsureSchema(db *sql.DB, address string) (bool, error) {
 	}
 
 	schema := Schema()
+	schema.File(filepath.Join(dir, "patch.global.sql")) // Optional custom queries
 	schema.Check(check)
 
 	var initial int
diff --git a/lxd/db/cluster/open_test.go b/lxd/db/cluster/open_test.go
index 63ac1c4f7..2b74fa309 100644
--- a/lxd/db/cluster/open_test.go
+++ b/lxd/db/cluster/open_test.go
@@ -17,7 +17,7 @@ import (
 func TestEnsureSchema_NoClustered(t *testing.T) {
 	db := newDB(t)
 	addNode(t, db, "0.0.0.0", 1, 1)
-	ready, err := cluster.EnsureSchema(db, "1.2.3.4:666")
+	ready, err := cluster.EnsureSchema(db, "1.2.3.4:666", "/unused/db/dir")
 	assert.True(t, ready)
 	assert.NoError(t, err)
 }
@@ -83,7 +83,7 @@ func TestEnsureSchema_ClusterNotUpgradable(t *testing.T) {
 		subtest.Run(t, c.title, func(t *testing.T) {
 			db := newDB(t)
 			c.setup(t, db)
-			ready, err := cluster.EnsureSchema(db, "1")
+			ready, err := cluster.EnsureSchema(db, "1", "/unused/db/dir")
 			assert.Equal(t, c.ready, ready)
 			if c.error == "" {
 				assert.NoError(t, err)
@@ -125,7 +125,7 @@ func TestEnsureSchema_UpdateNodeVersion(t *testing.T) {
 			addNode(t, db, "1", schema-1, apiExtensions-1)
 
 			// Ensure the schema.
-			ready, err := cluster.EnsureSchema(db, "1")
+			ready, err := cluster.EnsureSchema(db, "1", "/unused/db/dir")
 			assert.NoError(t, err)
 			assert.Equal(t, c.ready, ready)
 
diff --git a/lxd/db/db.go b/lxd/db/db.go
index 3c540c322..f2742babc 100644
--- a/lxd/db/db.go
+++ b/lxd/db/db.go
@@ -149,13 +149,13 @@ type Cluster struct {
 // - name: Basename of the database file holding the data. Typically "db.bin".
 // - dialer: Function used to connect to the dqlite backend via gRPC SQL.
 // - address: Network address of this node (or empty string).
-// - api: Number of API extensions that this node supports.
+// - dir: Base LXD database directory (e.g. /var/lib/lxd/database)
 //
 // The address and api parameters will be used to determine if the cluster
 // database matches our version, and possibly trigger a schema update. If the
 // schema update can't be performed right now, because some nodes are still
 // behind, an Upgrading error is returned.
-func OpenCluster(name string, dialer grpcsql.Dialer, address string) (*Cluster, error) {
+func OpenCluster(name string, dialer grpcsql.Dialer, address, dir string) (*Cluster, error) {
 	db, err := cluster.Open(name, dialer)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to open database")
@@ -181,7 +181,7 @@ func OpenCluster(name string, dialer grpcsql.Dialer, address string) (*Cluster,
 		}
 	}
 
-	nodesVersionsMatch, err := cluster.EnsureSchema(db, address)
+	nodesVersionsMatch, err := cluster.EnsureSchema(db, address, dir)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to ensure schema")
 	}
diff --git a/lxd/db/node/open.go b/lxd/db/node/open.go
index 23048820f..05c556ae4 100644
--- a/lxd/db/node/open.go
+++ b/lxd/db/node/open.go
@@ -30,6 +30,7 @@ func EnsureSchema(db *sql.DB, dir string, hook schema.Hook) (int, error) {
 	backupDone := false
 
 	schema := Schema()
+	schema.File(filepath.Join(dir, "patch.local.sql")) // Optional custom queries
 	schema.Hook(func(version int, tx *sql.Tx) error {
 		if !backupDone {
 			logger.Infof("Updating the LXD database schema. Backup made as \"local.db.bak\"")
diff --git a/lxd/db/testing.go b/lxd/db/testing.go
index beb025384..e2d77cb4d 100644
--- a/lxd/db/testing.go
+++ b/lxd/db/testing.go
@@ -56,7 +56,7 @@ func NewTestCluster(t *testing.T) (*Cluster, func()) {
 	// Create an in-memory gRPC SQL server and dialer.
 	server, dialer := newGrpcServer()
 
-	cluster, err := OpenCluster(":memory:", dialer, "1")
+	cluster, err := OpenCluster(":memory:", dialer, "1", "/unused/db/dir")
 	require.NoError(t, err)
 
 	cleanup := func() {

From c12c708aa9c9e1c2065e80a884c563b63bfd3079 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanayaka at canonical.com>
Date: Fri, 27 Apr 2018 09:37:38 +0000
Subject: [PATCH 3/3] Add integration tests

Signed-off-by: Free Ekanayaka <free.ekanayaka at canonical.com>
---
 test/suites/database_update.sh | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/test/suites/database_update.sh b/test/suites/database_update.sh
index 6259d5c34..d212c739b 100644
--- a/test/suites/database_update.sh
+++ b/test/suites/database_update.sh
@@ -3,6 +3,19 @@ test_database_update(){
   mkdir -p "${LXD_MIGRATE_DIR}/database"
   MIGRATE_DB=${LXD_MIGRATE_DIR}/database/local.db
 
+  # Add some custom queries in patch.local.sql
+  cat << EOF > "${LXD_MIGRATE_DIR}/database/patch.local.sql"
+INSERT INTO certificates(fingerprint, type, name, certificate) VALUES('abc', 0, 'cert1', 'blob1');
+CREATE TABLE test (n INT);
+INSERT INTO test(n) VALUES(1);
+EOF
+
+  # Add some custom queries in patch.global.sql
+  cat << EOF > "${LXD_MIGRATE_DIR}/database/patch.global.sql"
+CREATE TABLE test (n INT);
+INSERT INTO test(n) VALUES(1);
+EOF
+
   # Create the version 1 schema as the database
   sqlite3 "${MIGRATE_DB}" > /dev/null < deps/schema1.sql
 
@@ -10,9 +23,18 @@ test_database_update(){
   spawn_lxd "${LXD_MIGRATE_DIR}" true
 
   # Assert there are enough tables.
-  expected_tables=4
+  expected_tables=5
   tables=$(sqlite3 "${MIGRATE_DB}" ".dump" | grep -c "CREATE TABLE")
   [ "${tables}" -eq "${expected_tables}" ] || { echo "FAIL: Wrong number of tables after database migration. Found: ${tables}, expected ${expected_tables}"; false; }
 
+  # Check that the custom queries were executed.
+  LXD_DIR="${LXD_MIGRATE_DIR}" lxd sql local "SELECT * FROM test" | grep -q "1"
+  LXD_DIR="${LXD_MIGRATE_DIR}" lxd sql global "SELECT * FROM certificates" | grep -q "cert1"
+  LXD_DIR="${LXD_MIGRATE_DIR}" lxd sql global "SELECT * FROM test" | grep -q "1"
+
+  # The custom patch files were deleted.
+  ! [ -e "${LXD_MIGRATE_DIR}/database/patch.local.sql" ]
+  ! [ -e "${LXD_MIGRATE_DIR}/database/patch.global.sql" ]
+
   kill_lxd "$LXD_MIGRATE_DIR"
 }


More information about the lxc-devel mailing list